fuchsia_inspect_auto_persist/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_diagnostics_persist::PersistResult;
6use futures::channel::mpsc;
7use futures::{Future, StreamExt};
8use injectable_time::{MonotonicInstant, TimeSource};
9use log::{error, info};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14pub type PersistenceReqSender = mpsc::Sender<String>;
15
16/// Wrapper around an Inspect node T so that after the node is accessed (and written to),
17/// the corresponding Data Persistence tag would be sent through a channel so that it
18/// can be forwarded to the Data Persistence Service.
19pub struct AutoPersist<T> {
20    inspect_node: T,
21    persistence_tag: String,
22    persistence_req_sender: PersistenceReqSender,
23    sender_is_blocked: Arc<AtomicBool>,
24}
25
26impl<T> AutoPersist<T> {
27    pub fn new(
28        inspect_node: T,
29        persistence_tag: &str,
30        persistence_req_sender: PersistenceReqSender,
31    ) -> Self {
32        Self {
33            inspect_node,
34            persistence_tag: persistence_tag.to_string(),
35            persistence_req_sender,
36            sender_is_blocked: Arc::new(AtomicBool::new(false)),
37        }
38    }
39
40    /// Return a guard that derefs to `inspect_node`. When the guard is dropped,
41    /// `persistence_tag` is sent via the `persistence_req_sender`.
42    pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
43        AutoPersistGuard {
44            inspect_node: &mut self.inspect_node,
45            persistence_tag: &self.persistence_tag,
46            persistence_req_sender: &mut self.persistence_req_sender,
47            sender_is_blocked: Arc::clone(&self.sender_is_blocked),
48        }
49    }
50}
51
52pub struct AutoPersistGuard<'a, T> {
53    inspect_node: &'a mut T,
54    persistence_tag: &'a str,
55    persistence_req_sender: &'a mut PersistenceReqSender,
56    sender_is_blocked: Arc<AtomicBool>,
57}
58
59impl<T> Deref for AutoPersistGuard<'_, T> {
60    type Target = T;
61
62    fn deref(&self) -> &Self::Target {
63        self.inspect_node
64    }
65}
66
67impl<T> DerefMut for AutoPersistGuard<'_, T> {
68    fn deref_mut(&mut self) -> &mut Self::Target {
69        self.inspect_node
70    }
71}
72
73impl<T> Drop for AutoPersistGuard<'_, T> {
74    fn drop(&mut self) {
75        if self.persistence_req_sender.try_send(self.persistence_tag.to_string()).is_err() {
76            // If sender has not been blocked before, set bool to true and log error message
77            if self
78                .sender_is_blocked
79                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
80                .is_ok()
81            {
82                error!("PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting");
83            }
84        } else {
85            // If sender has been blocked before, set bool to false and log message
86            if self
87                .sender_is_blocked
88                .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
89                .is_ok()
90            {
91                info!("PersistenceReqSender recovered and resumed sending");
92            }
93        }
94    }
95}
96
97fn log_at_most_once_per_min_factory(
98    time_source: impl TimeSource,
99    mut log_fn: impl FnMut(String),
100) -> impl FnMut(String) {
101    let mut last_logged = None;
102    move |message| {
103        let now = zx::MonotonicInstant::from_nanos(time_source.now());
104        let should_log = match last_logged {
105            Some(last_logged) => (now - last_logged) >= zx::MonotonicDuration::from_minutes(1),
106            None => true,
107        };
108        if should_log {
109            log_fn(message);
110            last_logged.replace(now);
111        }
112    }
113}
114
115// arbitrary value
116const DEFAULT_BUFFER_SIZE: usize = 100;
117
118/// Create a sender for sending Persistence tag, and a Future representing a sending thread
119/// that forwards that tag to the Data Persistence service.
120///
121/// If the sending thread fails to forward a tag, or the Persistence Service returns an error
122/// code, an error will be logged. However, an error is only logged at most once per minute
123/// to avoid log spam.
124pub fn create_persistence_req_sender(
125    persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
126) -> (PersistenceReqSender, impl Future<Output = ()>) {
127    let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
128    let fut = async move {
129        let persistence_proxy = persistence_proxy.clone();
130        let mut log_error =
131            log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
132        while let Some(tag_name) = receiver.next().await {
133            let resp = persistence_proxy.persist(&tag_name).await;
134            match resp {
135                Ok(PersistResult::Queued) => continue,
136                Ok(other) => log_error(format!(
137                    "Persistence Service returned an error for tag {}: {:?}",
138                    tag_name, other
139                )),
140                Err(e) => log_error(format!(
141                    "Failed to send request to Persistence Service for tag {}: {}",
142                    tag_name, e
143                )),
144            }
145        }
146    };
147    (sender, fut)
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use fidl::endpoints::create_proxy_and_stream;
154    use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
155    use fuchsia_async as fasync;
156    use fuchsia_inspect::Inspector;
157    use futures::task::Poll;
158    use std::cell::RefCell;
159    use std::pin::pin;
160    use std::rc::Rc;
161
162    #[fuchsia::test]
163    fn test_auto_persist() {
164        let (sender, mut receiver) = mpsc::channel::<String>(100);
165        let inspector = Inspector::default();
166        let node = inspector.root().create_child("node");
167        let mut auto_persist_node = AutoPersist::new(node, "some-tag", sender);
168
169        // There should be no message on the receiver end yet
170        assert!(receiver.try_next().is_err());
171
172        {
173            let _guard = auto_persist_node.get_mut();
174        }
175
176        match receiver.try_next() {
177            Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
178            _ => panic!("expect message in receiver"),
179        }
180    }
181
182    #[fuchsia::test]
183    fn test_create_persistence_req_sender() {
184        let mut exec = fasync::TestExecutor::new();
185        let (persistence_proxy, mut persistence_stream) =
186            create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
187        let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
188
189        let mut req_forwarder_fut = pin!(req_forwarder_fut);
190
191        // Nothing has happened yet, so these futures should be Pending
192        match exec.run_until_stalled(&mut req_forwarder_fut) {
193            Poll::Pending => (),
194            other => panic!("unexpected variant: {:?}", other),
195        };
196        match exec.run_until_stalled(&mut persistence_stream.next()) {
197            Poll::Pending => (),
198            other => panic!("unexpected variant: {:?}", other),
199        };
200
201        assert!(req_sender.try_send("some-tag".to_string()).is_ok());
202
203        // req_forwarder_fut still Pending because it's a loop
204        match exec.run_until_stalled(&mut req_forwarder_fut) {
205            Poll::Pending => (),
206            other => panic!("unexpected variant: {:?}", other),
207        };
208        // There should be a message in the stream now
209        match exec.run_until_stalled(&mut persistence_stream.next()) {
210            Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
211                assert_eq!(tag, "some-tag")
212            }
213            other => panic!("unexpected variant: {:?}", other),
214        };
215    }
216
217    #[derive(Debug)]
218    struct FakeTimeSource {
219        now: Rc<RefCell<zx::MonotonicInstant>>,
220    }
221
222    impl TimeSource for FakeTimeSource {
223        fn now(&self) -> i64 {
224            self.now.borrow().into_nanos()
225        }
226    }
227
228    #[fuchsia::test]
229    fn test_log_at_most_once_per_min_factory() {
230        let log_count = Rc::new(RefCell::new(0));
231        let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
232        let fake_time_source = FakeTimeSource { now: now.clone() };
233        let mut log =
234            log_at_most_once_per_min_factory(fake_time_source, |_| *log_count.borrow_mut() += 1);
235
236        log("message 1".to_string());
237        assert_eq!(*log_count.borrow(), 1);
238
239        // No time has passed, so log_count shouldn't increase
240        log("message 2".to_string());
241        assert_eq!(*log_count.borrow(), 1);
242
243        {
244            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
245        }
246
247        // Not enough time has passed, so log_count shouldn't increase
248        log("message 3".to_string());
249        assert_eq!(*log_count.borrow(), 1);
250
251        {
252            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
253        }
254
255        // Enough time has passed, so log_count should increase
256        log("message 3".to_string());
257        assert_eq!(*log_count.borrow(), 2);
258    }
259}