1use fuchsia_inspect::{Inspector, Node as InspectNode};
6use fuchsia_sync::Mutex as SyncMutex;
7use futures::channel::mpsc;
8use futures::lock::Mutex as AsyncMutex;
9use futures::{Future, FutureExt as _, StreamExt as _, select};
10use log::{error, info, warn};
11use std::sync::Arc;
12use {async_channel as mpmc, fuchsia_async as fasync};
13
14use crate::experimental::clock::{Timed, Timestamp};
15use crate::experimental::series::interpolation::InterpolationKind;
16use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
17use crate::experimental::series::{Interpolator, MatrixSampler, SerializedBuffer, TimeMatrix};
18use crate::experimental::vec1::Vec1;
19
20pub fn serve_time_matrix_inspection(
32 node: InspectNode,
33) -> (TimeMatrixClient, impl Future<Output = Result<(), anyhow::Error>>) {
34 const TIME_MATRIX_SENDER_BUFFER_SIZE: usize = 250;
37
38 const INTERPOLATION_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(5);
40
41 let (sender, mut receiver) = mpsc::channel::<SharedTimeMatrix>(TIME_MATRIX_SENDER_BUFFER_SIZE);
42
43 let client = TimeMatrixClient::new(sender, node.clone_weak());
44 let server = async move {
45 let _node = node;
46 let mut matrices = vec![];
47
48 let mut interpolation = fasync::Interval::new(INTERPOLATION_PERIOD);
49 loop {
50 select! {
51 matrix = receiver.next() => {
53 match matrix {
54 Some(matrix) => {
55 matrices.push(matrix);
56 }
57 None => {
58 info!("time matrix inspection terminated.");
59 }
60 }
61 }
62 _ = interpolation.next() => {
64 for matrix in matrices.iter() {
67 let mut matrix = matrix.lock().await;
68 if let Err(error) = matrix.fold_buffered_samples() {
69 warn!("failed to fold samples into time matrix: {:?}", error);
70 }
71 if let Err(error) = matrix.interpolate(Timestamp::now()) {
79 warn!("failed to interpolate time matrix: {:?}", error);
80 }
81 }
82 }
83 }
84 }
85 };
86 (client, server)
87}
88
89pub trait ServedTimeMatrix: Interpolator + Send {
90 fn fold_buffered_samples(&mut self) -> Result<(), FoldError>;
91}
92
93pub struct BufferedSampler<T, M>
94where
95 M: MatrixSampler<T>,
96{
97 receiver: mpmc::Receiver<Timed<T>>,
98 matrix: M,
99}
100
101impl<T, M> BufferedSampler<T, M>
102where
103 M: MatrixSampler<T>,
104{
105 pub fn from_time_matrix(matrix: M) -> (mpmc::Sender<Timed<T>>, Self) {
106 const TIMED_SAMPLE_SENDER_BUFFER_SIZE: usize = 1024;
109
110 let (sender, receiver) = mpmc::bounded(TIMED_SAMPLE_SENDER_BUFFER_SIZE);
111 (sender, BufferedSampler { receiver, matrix })
112 }
113}
114
115impl<T, M> Interpolator for BufferedSampler<T, M>
116where
117 M: MatrixSampler<T>,
118{
119 fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
120 self.matrix.interpolate(timestamp)
121 }
122
123 fn interpolate_and_get_buffers(
124 &mut self,
125 timestamp: Timestamp,
126 ) -> Result<SerializedBuffer, FoldError> {
127 self.matrix.interpolate_and_get_buffers(timestamp)
128 }
129}
130
131impl<T, M> ServedTimeMatrix for BufferedSampler<T, M>
132where
133 T: Send,
134 M: MatrixSampler<T> + Send,
135{
136 fn fold_buffered_samples(&mut self) -> Result<(), FoldError> {
137 let mut errors = vec![];
138 loop {
139 match self.receiver.try_recv() {
140 Ok(sample) => {
141 if let Err(error) = self.matrix.fold(sample) {
142 errors.push(error);
143 }
144 }
145 Err(error) => {
146 return match error {
147 mpmc::TryRecvError::Closed => Err(FoldError::Buffer),
148 mpmc::TryRecvError::Empty => match Vec1::try_from(errors) {
149 Ok(errors) => Err(FoldError::Flush(errors)),
150 _ => Ok(()),
151 },
152 };
153 }
154 }
155 }
156 }
157}
158
159pub trait InspectSender {
160 fn inspect_time_matrix<F, P>(
167 &self,
168 name: impl Into<String>,
169 matrix: TimeMatrix<F, P>,
170 ) -> InspectedTimeMatrix<F::Sample>
171 where
172 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
173 Metadata<F>: 'static + Send + Sync,
174 F: SerialStatistic<P>,
175 F::Sample: Send,
176 P: InterpolationKind;
177
178 fn inspect_time_matrix_with_metadata<F, P>(
187 &self,
188 name: impl Into<String>,
189 matrix: TimeMatrix<F, P>,
190 metadata: impl Into<Metadata<F>>,
191 ) -> InspectedTimeMatrix<F::Sample>
192 where
193 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
194 Metadata<F>: 'static + Send + Sync,
195 F: SerialStatistic<P>,
196 F::Sample: Send,
197 P: InterpolationKind;
198}
199
200type SharedTimeMatrix = Arc<AsyncMutex<dyn ServedTimeMatrix>>;
201
202pub struct TimeMatrixClient {
203 sender: Arc<SyncMutex<mpsc::Sender<SharedTimeMatrix>>>,
207 node: InspectNode,
208}
209
210impl TimeMatrixClient {
211 fn new(sender: mpsc::Sender<SharedTimeMatrix>, node: InspectNode) -> Self {
212 Self { sender: Arc::new(SyncMutex::new(sender)), node }
213 }
214
215 fn inspect_and_record_with<F, P, R>(
216 &self,
217 name: impl Into<String>,
218 matrix: TimeMatrix<F, P>,
219 record: R,
220 ) -> InspectedTimeMatrix<F::Sample>
221 where
222 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
223 Metadata<F>: 'static + Send + Sync,
224 F: SerialStatistic<P>,
225 F::Sample: Send,
226 P: InterpolationKind,
227 R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
228 {
229 let name = name.into();
230 let (sender, matrix) = BufferedSampler::from_time_matrix(matrix);
231 let matrix = Arc::new(AsyncMutex::new(matrix));
232 self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
233 if let Err(error) = self.sender.lock().try_send(matrix) {
234 error!("failed to send time matrix \"{}\" to inspection server: {:?}", name, error);
235 }
236 InspectedTimeMatrix::new(name, sender)
237 }
238}
239
240impl Clone for TimeMatrixClient {
241 fn clone(&self) -> Self {
242 TimeMatrixClient { sender: self.sender.clone(), node: self.node.clone_weak() }
243 }
244}
245
246impl InspectSender for TimeMatrixClient {
247 fn inspect_time_matrix<F, P>(
248 &self,
249 name: impl Into<String>,
250 matrix: TimeMatrix<F, P>,
251 ) -> InspectedTimeMatrix<F::Sample>
252 where
253 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
254 Metadata<F>: 'static + Send + Sync,
255 F: SerialStatistic<P>,
256 F::Sample: Send,
257 P: InterpolationKind,
258 {
259 self.inspect_and_record_with(name, matrix, |_node| {})
260 }
261
262 fn inspect_time_matrix_with_metadata<F, P>(
263 &self,
264 name: impl Into<String>,
265 matrix: TimeMatrix<F, P>,
266 metadata: impl Into<Metadata<F>>,
267 ) -> InspectedTimeMatrix<F::Sample>
268 where
269 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
270 Metadata<F>: 'static + Send + Sync,
271 F: SerialStatistic<P>,
272 F::Sample: Send,
273 P: InterpolationKind,
274 {
275 let metadata = Arc::new(metadata.into());
276 self.inspect_and_record_with(name, matrix, move |node| {
277 use crate::experimental::series::metadata::Metadata;
278
279 node.record_child("metadata", |node| {
280 metadata.record(node);
281 })
282 })
283 }
284}
285
286#[derive(Debug, Clone)]
287pub struct InspectedTimeMatrix<T> {
288 name: String,
289 sender: mpmc::Sender<Timed<T>>,
290}
291
292impl<T> InspectedTimeMatrix<T> {
293 pub(crate) fn new(name: impl Into<String>, sender: mpmc::Sender<Timed<T>>) -> Self {
294 Self { name: name.into(), sender }
295 }
296
297 pub fn fold(&self, sample: Timed<T>) -> Result<(), FoldError> {
298 self.sender.try_send(sample).map_err(|_| FoldError::Buffer)
301 }
302
303 pub fn fold_or_log_error(&self, sample: Timed<T>) {
304 if let Err(error) = self.sender.try_send(sample) {
305 warn!("failed to buffer sample for time matrix \"{}\": {:?}", self.name, error);
306 }
307 }
308}
309
310fn record_lazy_time_matrix_with<F>(
316 node: &InspectNode,
317 name: impl Into<String>,
318 matrix: Arc<AsyncMutex<dyn ServedTimeMatrix + Send>>,
319 f: F,
320) where
321 F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
322{
323 let name = name.into();
324 node.record_lazy_child(name, move || {
325 let matrix = matrix.clone();
326 let f = f.clone();
327 async move {
328 let inspector = Inspector::default();
329 {
330 let mut matrix = matrix.lock().await;
331 if let Err(error) = matrix
332 .fold_buffered_samples()
333 .and_then(|_| matrix.interpolate_and_get_buffers(Timestamp::now()))
334 .map(|buffer| {
335 inspector.root().atomic_update(|node| {
336 node.record_string("type", buffer.data_semantic);
337 node.record_bytes("data", buffer.data);
338 f(node);
339 })
340 })
341 {
342 inspector.root().record_string("type", format!("error: {:?}", error));
343 }
344 }
345 Ok(inspector)
346 }
347 .boxed()
348 });
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::mem;
    use std::pin::pin;

    use crate::experimental::series::interpolation::LastSample;
    use crate::experimental::series::metadata::BitSetMap;
    use crate::experimental::series::statistic::Union;

    /// An inspected matrix appears in the data tree with its `type` and `data` properties.
    #[fuchsia::test]
    async fn serve_time_matrix_inspection_then_inspect_data_tree_contains_buffers() {
        let inspector = Inspector::default();
        let node = inspector.root().create_child("serve_test_node");
        let (client, _server) = serve_time_matrix_inspection(node);

        // The handle stays bound for the whole test so the sample channel remains open.
        let matrix = TimeMatrix::<Union<u64>, LastSample>::default();
        let _connectivity = client.inspect_time_matrix("connectivity", matrix);

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                connectivity: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                }
            }
        });
    }

    /// An inspected matrix with metadata also exposes a `metadata` child in the data tree.
    #[fuchsia::test]
    async fn serve_time_matrix_inspection_with_metadata_then_inspect_data_tree_contains_metadata() {
        let inspector = Inspector::default();
        let node = inspector.root().create_child("serve_test_node");
        let (client, _server) = serve_time_matrix_inspection(node);

        let matrix = TimeMatrix::<Union<u64>, LastSample>::default();
        let labels = BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]);
        let _engine = client.inspect_time_matrix_with_metadata("engine", matrix, labels);

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                engine: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                    metadata: {
                        index: {
                            "0": "check",
                            "1": "oil",
                            "2": "battery",
                            "3": "coolant",
                        }
                    }
                }
            }
        });
    }

    /// Dropping the only client must not cause the server future to complete.
    #[test]
    fn drop_time_matrix_client_then_server_continues_execution() {
        let mut executor = fasync::TestExecutor::new_with_fake_time();

        let inspector = Inspector::default();
        let node = inspector.root().create_child("serve_test_node");
        let (client, server) = serve_time_matrix_inspection(node);
        let mut server = pin!(server);

        mem::drop(client);

        assert!(
            matches!(executor.run_until_stalled(&mut server), Poll::Pending),
            "time matrix inspection server terminated unexpectedly"
        );
    }
}