fuchsia_inspect_auto_persist/
lib.rs
1use fidl_fuchsia_diagnostics_persist::PersistResult;
6use futures::channel::mpsc;
7use futures::{Future, StreamExt};
8use injectable_time::{MonotonicInstant, TimeSource};
9use log::{error, info};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14pub type PersistenceReqSender = mpsc::Sender<String>;
15
16pub struct AutoPersist<T> {
20 inspect_node: T,
21 persistence_tag: String,
22 persistence_req_sender: PersistenceReqSender,
23 sender_is_blocked: Arc<AtomicBool>,
24}
25
26impl<T> AutoPersist<T> {
27 pub fn new(
28 inspect_node: T,
29 persistence_tag: &str,
30 persistence_req_sender: PersistenceReqSender,
31 ) -> Self {
32 Self {
33 inspect_node,
34 persistence_tag: persistence_tag.to_string(),
35 persistence_req_sender,
36 sender_is_blocked: Arc::new(AtomicBool::new(false)),
37 }
38 }
39
40 pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
43 AutoPersistGuard {
44 inspect_node: &mut self.inspect_node,
45 persistence_tag: &self.persistence_tag,
46 persistence_req_sender: &mut self.persistence_req_sender,
47 sender_is_blocked: Arc::clone(&self.sender_is_blocked),
48 }
49 }
50}
51
52pub struct AutoPersistGuard<'a, T> {
53 inspect_node: &'a mut T,
54 persistence_tag: &'a str,
55 persistence_req_sender: &'a mut PersistenceReqSender,
56 sender_is_blocked: Arc<AtomicBool>,
57}
58
59impl<T> Deref for AutoPersistGuard<'_, T> {
60 type Target = T;
61
62 fn deref(&self) -> &Self::Target {
63 self.inspect_node
64 }
65}
66
67impl<T> DerefMut for AutoPersistGuard<'_, T> {
68 fn deref_mut(&mut self) -> &mut Self::Target {
69 self.inspect_node
70 }
71}
72
73impl<T> Drop for AutoPersistGuard<'_, T> {
74 fn drop(&mut self) {
75 if self.persistence_req_sender.try_send(self.persistence_tag.to_string()).is_err() {
76 if self
78 .sender_is_blocked
79 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
80 .is_ok()
81 {
82 error!("PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting");
83 }
84 } else {
85 if self
87 .sender_is_blocked
88 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
89 .is_ok()
90 {
91 info!("PersistenceReqSender recovered and resumed sending");
92 }
93 }
94 }
95}
96
97fn log_at_most_once_per_min_factory(
98 time_source: impl TimeSource,
99 mut log_fn: impl FnMut(String),
100) -> impl FnMut(String) {
101 let mut last_logged = None;
102 move |message| {
103 let now = zx::MonotonicInstant::from_nanos(time_source.now());
104 let should_log = match last_logged {
105 Some(last_logged) => (now - last_logged) >= zx::MonotonicDuration::from_minutes(1),
106 None => true,
107 };
108 if should_log {
109 log_fn(message);
110 last_logged.replace(now);
111 }
112 }
113}
114
115const DEFAULT_BUFFER_SIZE: usize = 100;
117
118pub fn create_persistence_req_sender(
125 persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
126) -> (PersistenceReqSender, impl Future<Output = ()>) {
127 let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
128 let fut = async move {
129 let persistence_proxy = persistence_proxy.clone();
130 let mut log_error =
131 log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
132 while let Some(tag_name) = receiver.next().await {
133 let resp = persistence_proxy.persist(&tag_name).await;
134 match resp {
135 Ok(PersistResult::Queued) => continue,
136 Ok(other) => log_error(format!(
137 "Persistence Service returned an error for tag {}: {:?}",
138 tag_name, other
139 )),
140 Err(e) => log_error(format!(
141 "Failed to send request to Persistence Service for tag {}: {}",
142 tag_name, e
143 )),
144 }
145 }
146 };
147 (sender, fut)
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
    use fuchsia_async as fasync;
    use fuchsia_inspect::Inspector;
    use futures::task::Poll;
    use std::cell::RefCell;
    use std::pin::pin;
    use std::rc::Rc;

    /// Dropping a guard must enqueue the persistence tag on the channel.
    #[fuchsia::test]
    fn test_auto_persist() {
        let (sender, mut receiver) = mpsc::channel::<String>(100);
        let inspector = Inspector::default();
        let mut auto_persist_node =
            AutoPersist::new(inspector.root().create_child("node"), "some-tag", sender);

        // No guard has been dropped yet, so the channel has nothing to yield.
        assert!(receiver.try_next().is_err());

        // Acquire a guard and release it right away.
        drop(auto_persist_node.get_mut());

        match receiver.try_next() {
            Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
            _ => panic!("expect message in receiver"),
        }
    }

    /// A tag pushed into the sender must reach the persistence service as a `Persist` request.
    #[fuchsia::test]
    fn test_create_persistence_req_sender() {
        let mut exec = fasync::TestExecutor::new();
        let (persistence_proxy, mut persistence_stream) =
            create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
        let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
        let mut req_forwarder_fut = pin!(req_forwarder_fut);

        // Before anything is sent, both the forwarder and the service stream are idle.
        let forwarder_state = exec.run_until_stalled(&mut req_forwarder_fut);
        assert!(forwarder_state.is_pending(), "unexpected variant: {:?}", forwarder_state);
        let stream_state = exec.run_until_stalled(&mut persistence_stream.next());
        assert!(stream_state.is_pending(), "unexpected variant: {:?}", stream_state);

        assert!(req_sender.try_send("some-tag".to_string()).is_ok());

        // The forwarder picks up the tag and is then left waiting on the service response.
        let forwarder_state = exec.run_until_stalled(&mut req_forwarder_fut);
        assert!(forwarder_state.is_pending(), "unexpected variant: {:?}", forwarder_state);

        // The service side observes the forwarded request.
        match exec.run_until_stalled(&mut persistence_stream.next()) {
            Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
                assert_eq!(tag, "some-tag")
            }
            other => panic!("unexpected variant: {:?}", other),
        };
    }

    /// Time source whose current instant is controlled by the test through a shared cell.
    #[derive(Debug)]
    struct FakeTimeSource {
        now: Rc<RefCell<zx::MonotonicInstant>>,
    }

    impl TimeSource for FakeTimeSource {
        fn now(&self) -> i64 {
            self.now.borrow().into_nanos()
        }
    }

    /// Messages are forwarded only when a full minute has passed since the last forwarded one.
    #[fuchsia::test]
    fn test_log_at_most_once_per_min_factory() {
        let log_count = Rc::new(RefCell::new(0));
        let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
        let advance_secs =
            |secs| *now.borrow_mut() += zx::MonotonicDuration::from_seconds(secs);
        let mut log = log_at_most_once_per_min_factory(
            FakeTimeSource { now: Rc::clone(&now) },
            |_| *log_count.borrow_mut() += 1,
        );

        // The very first message always goes through.
        log("message 1".to_string());
        assert_eq!(*log_count.borrow(), 1);

        // No time has passed: suppressed.
        log("message 2".to_string());
        assert_eq!(*log_count.borrow(), 1);

        // Only 30 seconds have passed: still suppressed.
        advance_secs(30);
        log("message 3".to_string());
        assert_eq!(*log_count.borrow(), 1);

        // A full minute has now elapsed since the first log: forwarded.
        advance_secs(30);
        log("message 3".to_string());
        assert_eq!(*log_count.borrow(), 2);
    }
}