fuchsia_inspect_auto_persist/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_diagnostics_persist::PersistResult;
6use futures::channel::mpsc;
7use futures::{Future, StreamExt};
8use injectable_time::{MonotonicInstant, TimeSource};
9use log::{error, info};
10use std::ops::{Deref, DerefMut};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14pub type PersistenceReqSender = mpsc::Sender<String>;
15
16/// Wrapper around an Inspect node T so that after the node is accessed (and written to),
17/// the corresponding Data Persistence tag would be sent through a channel so that it
18/// can be forwarded to the Data Persistence Service.
19pub struct AutoPersist<T> {
20    inspect_node: T,
21    persistence_tag: String,
22    persistence_req_sender: PersistenceReqSender,
23    sender_is_blocked: Arc<AtomicBool>,
24}
25
26impl<T> AutoPersist<T> {
27    pub fn new(
28        inspect_node: T,
29        persistence_tag: &str,
30        persistence_req_sender: PersistenceReqSender,
31    ) -> Self {
32        Self {
33            inspect_node,
34            persistence_tag: persistence_tag.to_string(),
35            persistence_req_sender,
36            sender_is_blocked: Arc::new(AtomicBool::new(false)),
37        }
38    }
39
40    /// Return a guard that derefs to `inspect_node`. When the guard is dropped,
41    /// `persistence_tag` is sent via the `persistence_req_sender`.
42    pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
43        AutoPersistGuard {
44            inspect_node: &mut self.inspect_node,
45            persistence_tag: &self.persistence_tag,
46            persistence_req_sender: &mut self.persistence_req_sender,
47            sender_is_blocked: Arc::clone(&self.sender_is_blocked),
48        }
49    }
50}
51
52pub struct AutoPersistGuard<'a, T> {
53    inspect_node: &'a mut T,
54    persistence_tag: &'a str,
55    persistence_req_sender: &'a mut PersistenceReqSender,
56    sender_is_blocked: Arc<AtomicBool>,
57}
58
59impl<T> Deref for AutoPersistGuard<'_, T> {
60    type Target = T;
61
62    fn deref(&self) -> &Self::Target {
63        self.inspect_node
64    }
65}
66
67impl<T> DerefMut for AutoPersistGuard<'_, T> {
68    fn deref_mut(&mut self) -> &mut Self::Target {
69        self.inspect_node
70    }
71}
72
73impl<T> Drop for AutoPersistGuard<'_, T> {
74    fn drop(&mut self) {
75        if self.persistence_req_sender.try_send(self.persistence_tag.to_string()).is_err() {
76            // If sender has not been blocked before, set bool to true and log error message
77            if self
78                .sender_is_blocked
79                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
80                .is_ok()
81            {
82                error!(
83                    "PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting"
84                );
85            }
86        } else {
87            // If sender has been blocked before, set bool to false and log message
88            if self
89                .sender_is_blocked
90                .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
91                .is_ok()
92            {
93                info!("PersistenceReqSender recovered and resumed sending");
94            }
95        }
96    }
97}
98
99fn log_at_most_once_per_min_factory(
100    time_source: impl TimeSource,
101    mut log_fn: impl FnMut(String),
102) -> impl FnMut(String) {
103    let mut last_logged = None;
104    move |message| {
105        let now = zx::MonotonicInstant::from_nanos(time_source.now());
106        let should_log = match last_logged {
107            Some(last_logged) => (now - last_logged) >= zx::MonotonicDuration::from_minutes(1),
108            None => true,
109        };
110        if should_log {
111            log_fn(message);
112            last_logged.replace(now);
113        }
114    }
115}
116
117// arbitrary value
118const DEFAULT_BUFFER_SIZE: usize = 100;
119
120/// Create a sender for sending Persistence tag, and a Future representing a sending thread
121/// that forwards that tag to the Data Persistence service.
122///
123/// If the sending thread fails to forward a tag, or the Persistence Service returns an error
124/// code, an error will be logged. However, an error is only logged at most once per minute
125/// to avoid log spam.
126pub fn create_persistence_req_sender(
127    persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
128) -> (PersistenceReqSender, impl Future<Output = ()>) {
129    let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
130    let fut = async move {
131        let persistence_proxy = persistence_proxy.clone();
132        let mut log_error =
133            log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
134        while let Some(tag_name) = receiver.next().await {
135            let resp = persistence_proxy.persist(&tag_name).await;
136            match resp {
137                Ok(PersistResult::Queued) => continue,
138                Ok(other) => log_error(format!(
139                    "Persistence Service returned an error for tag {tag_name}: {other:?}"
140                )),
141                Err(e) => log_error(format!(
142                    "Failed to send request to Persistence Service for tag {tag_name}: {e}"
143                )),
144            }
145        }
146    };
147    (sender, fut)
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use fidl::endpoints::create_proxy_and_stream;
154    use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
155    use fuchsia_async as fasync;
156    use fuchsia_inspect::Inspector;
157    use futures::task::Poll;
158    use std::cell::RefCell;
159    use std::pin::pin;
160    use std::rc::Rc;
161
162    #[fuchsia::test]
163    fn test_auto_persist() {
164        let (sender, mut receiver) = mpsc::channel::<String>(100);
165        let inspector = Inspector::default();
166        let node = inspector.root().create_child("node");
167        let mut auto_persist_node = AutoPersist::new(node, "some-tag", sender);
168
169        // There should be no message on the receiver end yet
170        assert!(receiver.try_next().is_err());
171
172        {
173            let _guard = auto_persist_node.get_mut();
174        }
175
176        match receiver.try_next() {
177            Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
178            _ => panic!("expect message in receiver"),
179        }
180    }
181
182    #[fuchsia::test]
183    fn test_create_persistence_req_sender() {
184        let mut exec = fasync::TestExecutor::new();
185        let (persistence_proxy, mut persistence_stream) =
186            create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
187        let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
188
189        let mut req_forwarder_fut = pin!(req_forwarder_fut);
190
191        // Nothing has happened yet, so these futures should be Pending
192        match exec.run_until_stalled(&mut req_forwarder_fut) {
193            Poll::Pending => (),
194            other => panic!("unexpected variant: {other:?}"),
195        };
196        match exec.run_until_stalled(&mut persistence_stream.next()) {
197            Poll::Pending => (),
198            other => panic!("unexpected variant: {other:?}"),
199        };
200
201        assert!(req_sender.try_send("some-tag".to_string()).is_ok());
202
203        // req_forwarder_fut still Pending because it's a loop
204        match exec.run_until_stalled(&mut req_forwarder_fut) {
205            Poll::Pending => (),
206            other => panic!("unexpected variant: {other:?}"),
207        };
208        // There should be a message in the stream now
209        match exec.run_until_stalled(&mut persistence_stream.next()) {
210            Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
211                assert_eq!(tag, "some-tag")
212            }
213            other => panic!("unexpected variant: {other:?}"),
214        };
215    }
216
217    #[derive(Debug)]
218    struct FakeTimeSource {
219        now: Rc<RefCell<zx::MonotonicInstant>>,
220    }
221
222    impl TimeSource for FakeTimeSource {
223        fn now(&self) -> i64 {
224            self.now.borrow().into_nanos()
225        }
226    }
227
228    #[fuchsia::test]
229    fn test_log_at_most_once_per_min_factory() {
230        let log_count = Rc::new(RefCell::new(0));
231        let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
232        let fake_time_source = FakeTimeSource { now: now.clone() };
233        let mut log =
234            log_at_most_once_per_min_factory(fake_time_source, |_| *log_count.borrow_mut() += 1);
235
236        log("message 1".to_string());
237        assert_eq!(*log_count.borrow(), 1);
238
239        // No time has passed, so log_count shouldn't increase
240        log("message 2".to_string());
241        assert_eq!(*log_count.borrow(), 1);
242
243        {
244            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
245        }
246
247        // Not enough time has passed, so log_count shouldn't increase
248        log("message 3".to_string());
249        assert_eq!(*log_count.borrow(), 1);
250
251        {
252            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
253        }
254
255        // Enough time has passed, so log_count should increase
256        log("message 3".to_string());
257        assert_eq!(*log_count.borrow(), 2);
258    }
259}