fuchsia_inspect_auto_persist/
lib.rs1use fidl_fuchsia_diagnostics_persist::PersistResult;
6use futures::channel::mpsc;
7use futures::{Future, StreamExt};
8use injectable_time::{MonotonicInstant, TimeSource};
9use log::{error, info};
10use std::ops::{Deref, DerefMut};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14pub type PersistenceReqSender = mpsc::Sender<String>;
15
16pub struct AutoPersist<T> {
20 inspect_node: T,
21 persistence_tag: String,
22 persistence_req_sender: PersistenceReqSender,
23 sender_is_blocked: Arc<AtomicBool>,
24}
25
26impl<T> AutoPersist<T> {
27 pub fn new(
28 inspect_node: T,
29 persistence_tag: &str,
30 persistence_req_sender: PersistenceReqSender,
31 ) -> Self {
32 Self {
33 inspect_node,
34 persistence_tag: persistence_tag.to_string(),
35 persistence_req_sender,
36 sender_is_blocked: Arc::new(AtomicBool::new(false)),
37 }
38 }
39
40 pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
43 AutoPersistGuard {
44 inspect_node: &mut self.inspect_node,
45 persistence_tag: &self.persistence_tag,
46 persistence_req_sender: &mut self.persistence_req_sender,
47 sender_is_blocked: Arc::clone(&self.sender_is_blocked),
48 }
49 }
50}
51
52pub struct AutoPersistGuard<'a, T> {
53 inspect_node: &'a mut T,
54 persistence_tag: &'a str,
55 persistence_req_sender: &'a mut PersistenceReqSender,
56 sender_is_blocked: Arc<AtomicBool>,
57}
58
59impl<T> Deref for AutoPersistGuard<'_, T> {
60 type Target = T;
61
62 fn deref(&self) -> &Self::Target {
63 self.inspect_node
64 }
65}
66
67impl<T> DerefMut for AutoPersistGuard<'_, T> {
68 fn deref_mut(&mut self) -> &mut Self::Target {
69 self.inspect_node
70 }
71}
72
73impl<T> Drop for AutoPersistGuard<'_, T> {
74 fn drop(&mut self) {
75 if self.persistence_req_sender.try_send(self.persistence_tag.to_string()).is_err() {
76 if self
78 .sender_is_blocked
79 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
80 .is_ok()
81 {
82 error!(
83 "PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting"
84 );
85 }
86 } else {
87 if self
89 .sender_is_blocked
90 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
91 .is_ok()
92 {
93 info!("PersistenceReqSender recovered and resumed sending");
94 }
95 }
96 }
97}
98
99fn log_at_most_once_per_min_factory(
100 time_source: impl TimeSource,
101 mut log_fn: impl FnMut(String),
102) -> impl FnMut(String) {
103 let mut last_logged = None;
104 move |message| {
105 let now = zx::MonotonicInstant::from_nanos(time_source.now());
106 let should_log = match last_logged {
107 Some(last_logged) => (now - last_logged) >= zx::MonotonicDuration::from_minutes(1),
108 None => true,
109 };
110 if should_log {
111 log_fn(message);
112 last_logged.replace(now);
113 }
114 }
115}
116
117const DEFAULT_BUFFER_SIZE: usize = 100;
119
120pub fn create_persistence_req_sender(
127 persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
128) -> (PersistenceReqSender, impl Future<Output = ()>) {
129 let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
130 let fut = async move {
131 let persistence_proxy = persistence_proxy.clone();
132 let mut log_error =
133 log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
134 while let Some(tag_name) = receiver.next().await {
135 let resp = persistence_proxy.persist(&tag_name).await;
136 match resp {
137 Ok(PersistResult::Queued) => continue,
138 Ok(other) => log_error(format!(
139 "Persistence Service returned an error for tag {tag_name}: {other:?}"
140 )),
141 Err(e) => log_error(format!(
142 "Failed to send request to Persistence Service for tag {tag_name}: {e}"
143 )),
144 }
145 }
146 };
147 (sender, fut)
148}
149
150#[cfg(test)]
151mod tests {
152 use super::*;
153 use fidl::endpoints::create_proxy_and_stream;
154 use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
155 use fuchsia_async as fasync;
156 use fuchsia_inspect::Inspector;
157 use futures::task::Poll;
158 use std::cell::RefCell;
159 use std::pin::pin;
160 use std::rc::Rc;
161
162 #[fuchsia::test]
163 fn test_auto_persist() {
164 let (sender, mut receiver) = mpsc::channel::<String>(100);
165 let inspector = Inspector::default();
166 let node = inspector.root().create_child("node");
167 let mut auto_persist_node = AutoPersist::new(node, "some-tag", sender);
168
169 assert!(receiver.try_next().is_err());
171
172 {
173 let _guard = auto_persist_node.get_mut();
174 }
175
176 match receiver.try_next() {
177 Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
178 _ => panic!("expect message in receiver"),
179 }
180 }
181
182 #[fuchsia::test]
183 fn test_create_persistence_req_sender() {
184 let mut exec = fasync::TestExecutor::new();
185 let (persistence_proxy, mut persistence_stream) =
186 create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
187 let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
188
189 let mut req_forwarder_fut = pin!(req_forwarder_fut);
190
191 match exec.run_until_stalled(&mut req_forwarder_fut) {
193 Poll::Pending => (),
194 other => panic!("unexpected variant: {other:?}"),
195 };
196 match exec.run_until_stalled(&mut persistence_stream.next()) {
197 Poll::Pending => (),
198 other => panic!("unexpected variant: {other:?}"),
199 };
200
201 assert!(req_sender.try_send("some-tag".to_string()).is_ok());
202
203 match exec.run_until_stalled(&mut req_forwarder_fut) {
205 Poll::Pending => (),
206 other => panic!("unexpected variant: {other:?}"),
207 };
208 match exec.run_until_stalled(&mut persistence_stream.next()) {
210 Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
211 assert_eq!(tag, "some-tag")
212 }
213 other => panic!("unexpected variant: {other:?}"),
214 };
215 }
216
217 #[derive(Debug)]
218 struct FakeTimeSource {
219 now: Rc<RefCell<zx::MonotonicInstant>>,
220 }
221
222 impl TimeSource for FakeTimeSource {
223 fn now(&self) -> i64 {
224 self.now.borrow().into_nanos()
225 }
226 }
227
228 #[fuchsia::test]
229 fn test_log_at_most_once_per_min_factory() {
230 let log_count = Rc::new(RefCell::new(0));
231 let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
232 let fake_time_source = FakeTimeSource { now: now.clone() };
233 let mut log =
234 log_at_most_once_per_min_factory(fake_time_source, |_| *log_count.borrow_mut() += 1);
235
236 log("message 1".to_string());
237 assert_eq!(*log_count.borrow(), 1);
238
239 log("message 2".to_string());
241 assert_eq!(*log_count.borrow(), 1);
242
243 {
244 *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
245 }
246
247 log("message 3".to_string());
249 assert_eq!(*log_count.borrow(), 1);
250
251 {
252 *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
253 }
254
255 log("message 3".to_string());
257 assert_eq!(*log_count.borrow(), 2);
258 }
259}