windowed_stats/experimental/
serve.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use fuchsia_inspect::{Inspector, Node as InspectNode};
6use fuchsia_sync::Mutex as SyncMutex;
7use futures::channel::mpsc;
8use futures::lock::Mutex as AsyncMutex;
9use futures::{Future, FutureExt as _, StreamExt as _, select};
10use log::{error, info, warn};
11use std::sync::Arc;
12use {async_channel as mpmc, fuchsia_async as fasync};
13
14use crate::experimental::clock::{Timed, Timestamp};
15use crate::experimental::series::interpolation::InterpolationKind;
16use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
17use crate::experimental::series::{Interpolator, MatrixSampler, SerializedBuffer, TimeMatrix};
18use crate::experimental::vec1::Vec1;
19
20// TODO(https://fxbug.dev/375489301): It is not possible to inject a mock time matrix into this
21//                                    function. Refactor the function so that a unit test can
22//                                    assert that interpolation occurs after an interval of time.
23/// Creates a client and server for interpolating and recording time matrices to the given [Inspect
24/// node][`Node`].
25///
26/// The client end can be used to instrument and send a [`TimeMatrix`] to the server. The server
27/// end must be polled to incorporate and interpolate time matrices.
28///
29/// [`Node`]: fuchsia_inspect::Node
30/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
31pub fn serve_time_matrix_inspection(
32    node: InspectNode,
33) -> (TimeMatrixClient, impl Future<Output = Result<(), anyhow::Error>>) {
34    /// The buffer capacity of the MPSC channel through which time matrices are sent from clients
35    /// to the server future.
36    const TIME_MATRIX_SENDER_BUFFER_SIZE: usize = 250;
37
38    /// The duration between interpolating data in inspected time matrices.
39    const INTERPOLATION_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_minutes(5);
40
41    let (sender, mut receiver) = mpsc::channel::<SharedTimeMatrix>(TIME_MATRIX_SENDER_BUFFER_SIZE);
42
43    let client = TimeMatrixClient::new(sender, node.clone_weak());
44    let server = async move {
45        let _node = node;
46        let mut matrices = vec![];
47
48        let mut interpolation = fasync::Interval::new(INTERPOLATION_PERIOD);
49        loop {
50            select! {
51                // Incorporate time matrices received from the client.
52                matrix = receiver.next() => {
53                    match matrix {
54                        Some(matrix) => {
55                            matrices.push(matrix);
56                        }
57                        None => {
58                            info!("time matrix inspection terminated.");
59                        }
60                    }
61                }
62                // Periodically fold buffered samples into and interpolate time matrices.
63                _ = interpolation.next() => {
64                    // TODO(https://fxbug.dev/375255877): Log more information, such as the name
65                    //                                    associated with the matrix.
66                    for matrix in matrices.iter() {
67                        let mut matrix = matrix.lock().await;
68                        if let Err(error) = matrix.fold_buffered_samples() {
69                            warn!("failed to fold samples into time matrix: {:?}", error);
70                        }
71                        // Querying the current timestamp for each matrix like this introduces a
72                        // bias: the more recently a matrix has been pushed into `matrices`, the
73                        // more recent the timestamp of its interpolation. However, folding
74                        // buffered samples may take a non-trivial amount of time and a sample may
75                        // arrive as a buffer is being drained. The current timestamp must be
76                        // queried after the drain is complete to guarantee that it is more recent
77                        // than any timestamp associated with a sample.
78                        if let Err(error) = matrix.interpolate(Timestamp::now()) {
79                            warn!("failed to interpolate time matrix: {:?}", error);
80                        }
81                    }
82                }
83            }
84        }
85    };
86    (client, server)
87}
88
89pub trait ServedTimeMatrix: Interpolator + Send {
90    fn fold_buffered_samples(&mut self) -> Result<(), FoldError>;
91}
92
93pub struct BufferedSampler<T, M>
94where
95    M: MatrixSampler<T>,
96{
97    receiver: mpmc::Receiver<Timed<T>>,
98    matrix: M,
99}
100
101impl<T, M> BufferedSampler<T, M>
102where
103    M: MatrixSampler<T>,
104{
105    pub fn from_time_matrix(matrix: M) -> (mpmc::Sender<Timed<T>>, Self) {
106        /// The buffer capacity of the MPMC channel through which timed samples are sent to
107        /// `BufferedSampler`s.
108        const TIMED_SAMPLE_SENDER_BUFFER_SIZE: usize = 1024;
109
110        let (sender, receiver) = mpmc::bounded(TIMED_SAMPLE_SENDER_BUFFER_SIZE);
111        (sender, BufferedSampler { receiver, matrix })
112    }
113}
114
115impl<T, M> Interpolator for BufferedSampler<T, M>
116where
117    M: MatrixSampler<T>,
118{
119    fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
120        self.matrix.interpolate(timestamp)
121    }
122
123    fn interpolate_and_get_buffers(
124        &mut self,
125        timestamp: Timestamp,
126    ) -> Result<SerializedBuffer, FoldError> {
127        self.matrix.interpolate_and_get_buffers(timestamp)
128    }
129}
130
131impl<T, M> ServedTimeMatrix for BufferedSampler<T, M>
132where
133    T: Send,
134    M: MatrixSampler<T> + Send,
135{
136    fn fold_buffered_samples(&mut self) -> Result<(), FoldError> {
137        let mut errors = vec![];
138        loop {
139            match self.receiver.try_recv() {
140                Ok(sample) => {
141                    if let Err(error) = self.matrix.fold(sample) {
142                        errors.push(error);
143                    }
144                }
145                Err(error) => {
146                    return match error {
147                        mpmc::TryRecvError::Closed => Err(FoldError::Buffer),
148                        mpmc::TryRecvError::Empty => match Vec1::try_from(errors) {
149                            Ok(errors) => Err(FoldError::Flush(errors)),
150                            _ => Ok(()),
151                        },
152                    };
153                }
154            }
155        }
156    }
157}
158
159pub trait InspectSender {
160    /// Sends a [`TimeMatrix`] to the client's inspection server.
161    ///
162    /// See [`inspect_time_matrix_with_metadata`].
163    ///
164    /// [`inspect_time_matrix_with_metadata`]: crate::experimental::serve::TimeMatrixClient::inspect_time_matrix_with_metadata
165    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
166    fn inspect_time_matrix<F, P>(
167        &self,
168        name: impl Into<String>,
169        matrix: TimeMatrix<F, P>,
170    ) -> InspectedTimeMatrix<F::Sample>
171    where
172        TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
173        Metadata<F>: 'static + Send + Sync,
174        F: SerialStatistic<P>,
175        F::Sample: Send,
176        P: InterpolationKind;
177
178    /// Sends a [`TimeMatrix`] to the client's inspection server.
179    ///
180    /// This function lazily records the given [`TimeMatrix`] to Inspect. The server end
181    /// periodically interpolates the matrix and records data as needed. The returned
182    /// [handle][`InspectedTimeMatrix`] can be used to fold samples into the matrix.
183    ///
184    /// [`InspectedTimeMatrix`]: crate::experimental::serve::InspectedTimeMatrix
185    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
186    fn inspect_time_matrix_with_metadata<F, P>(
187        &self,
188        name: impl Into<String>,
189        matrix: TimeMatrix<F, P>,
190        metadata: impl Into<Metadata<F>>,
191    ) -> InspectedTimeMatrix<F::Sample>
192    where
193        TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
194        Metadata<F>: 'static + Send + Sync,
195        F: SerialStatistic<P>,
196        F::Sample: Send,
197        P: InterpolationKind;
198}
199
200type SharedTimeMatrix = Arc<AsyncMutex<dyn ServedTimeMatrix>>;
201
202pub struct TimeMatrixClient {
203    // TODO(https://fxbug.dev/432324973): Synchronizing the sender end of a channel like this is an
204    //                                    anti-pattern. Consider removing the mutex. See the linked
205    //                                    bug for discussion of the ramifications.
206    sender: Arc<SyncMutex<mpsc::Sender<SharedTimeMatrix>>>,
207    node: InspectNode,
208}
209
210impl TimeMatrixClient {
211    fn new(sender: mpsc::Sender<SharedTimeMatrix>, node: InspectNode) -> Self {
212        Self { sender: Arc::new(SyncMutex::new(sender)), node }
213    }
214
215    fn inspect_and_record_with<F, P, R>(
216        &self,
217        name: impl Into<String>,
218        matrix: TimeMatrix<F, P>,
219        record: R,
220    ) -> InspectedTimeMatrix<F::Sample>
221    where
222        TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
223        Metadata<F>: 'static + Send + Sync,
224        F: SerialStatistic<P>,
225        F::Sample: Send,
226        P: InterpolationKind,
227        R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
228    {
229        let name = name.into();
230        let (sender, matrix) = BufferedSampler::from_time_matrix(matrix);
231        let matrix = Arc::new(AsyncMutex::new(matrix));
232        self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
233        if let Err(error) = self.sender.lock().try_send(matrix) {
234            error!("failed to send time matrix \"{}\" to inspection server: {:?}", name, error);
235        }
236        InspectedTimeMatrix::new(name, sender)
237    }
238}
239
240impl Clone for TimeMatrixClient {
241    fn clone(&self) -> Self {
242        TimeMatrixClient { sender: self.sender.clone(), node: self.node.clone_weak() }
243    }
244}
245
246impl InspectSender for TimeMatrixClient {
247    fn inspect_time_matrix<F, P>(
248        &self,
249        name: impl Into<String>,
250        matrix: TimeMatrix<F, P>,
251    ) -> InspectedTimeMatrix<F::Sample>
252    where
253        TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
254        Metadata<F>: 'static + Send + Sync,
255        F: SerialStatistic<P>,
256        F::Sample: Send,
257        P: InterpolationKind,
258    {
259        self.inspect_and_record_with(name, matrix, |_node| {})
260    }
261
262    fn inspect_time_matrix_with_metadata<F, P>(
263        &self,
264        name: impl Into<String>,
265        matrix: TimeMatrix<F, P>,
266        metadata: impl Into<Metadata<F>>,
267    ) -> InspectedTimeMatrix<F::Sample>
268    where
269        TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
270        Metadata<F>: 'static + Send + Sync,
271        F: SerialStatistic<P>,
272        F::Sample: Send,
273        P: InterpolationKind,
274    {
275        let metadata = Arc::new(metadata.into());
276        self.inspect_and_record_with(name, matrix, move |node| {
277            use crate::experimental::series::metadata::Metadata;
278
279            node.record_child("metadata", |node| {
280                metadata.record(node);
281            })
282        })
283    }
284}
285
286#[derive(Debug, Clone)]
287pub struct InspectedTimeMatrix<T> {
288    name: String,
289    sender: mpmc::Sender<Timed<T>>,
290}
291
292impl<T> InspectedTimeMatrix<T> {
293    pub(crate) fn new(name: impl Into<String>, sender: mpmc::Sender<Timed<T>>) -> Self {
294        Self { name: name.into(), sender }
295    }
296
297    pub fn fold(&self, sample: Timed<T>) -> Result<(), FoldError> {
298        // TODO(https://fxbug.dev/432323121): Place the data that could not be sent into the
299        //                                    channel into the error. See `FoldError`.
300        self.sender.try_send(sample).map_err(|_| FoldError::Buffer)
301    }
302
303    pub fn fold_or_log_error(&self, sample: Timed<T>) {
304        if let Err(error) = self.sender.try_send(sample) {
305            warn!("failed to buffer sample for time matrix \"{}\": {:?}", self.name, error);
306        }
307    }
308}
309
310/// Records a lazy child node in the given node that records buffers and metadata for the given
311/// time matrix.
312///
313/// The function `f` is passed a node to record arbitrary data after the data semantic and buffers
314/// of the time matrix have been recorded using that same node.
315fn record_lazy_time_matrix_with<F>(
316    node: &InspectNode,
317    name: impl Into<String>,
318    matrix: Arc<AsyncMutex<dyn ServedTimeMatrix + Send>>,
319    f: F,
320) where
321    F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
322{
323    let name = name.into();
324    node.record_lazy_child(name, move || {
325        let matrix = matrix.clone();
326        let f = f.clone();
327        async move {
328            let inspector = Inspector::default();
329            {
330                let mut matrix = matrix.lock().await;
331                if let Err(error) = matrix
332                    .fold_buffered_samples()
333                    .and_then(|_| matrix.interpolate_and_get_buffers(Timestamp::now()))
334                    .map(|buffer| {
335                        inspector.root().atomic_update(|node| {
336                            node.record_string("type", buffer.data_semantic);
337                            node.record_bytes("data", buffer.data);
338                            f(node);
339                        })
340                    })
341                {
342                    inspector.root().record_string("type", format!("error: {:?}", error));
343                }
344            }
345            Ok(inspector)
346        }
347        .boxed()
348    });
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::mem;
    use std::pin::pin;

    use crate::experimental::series::interpolation::LastSample;
    use crate::experimental::series::metadata::BitSetMap;
    use crate::experimental::series::statistic::Union;

    /// A matrix sent without metadata shows up in the data tree with its type and data.
    #[fuchsia::test]
    async fn serve_time_matrix_inspection_then_inspect_data_tree_contains_buffers() {
        let inspector = Inspector::default();
        let served_node = inspector.root().create_child("serve_test_node");
        let (client, _server) = serve_time_matrix_inspection(served_node);
        // The handle is kept bound for the duration of the test.
        let time_matrix = TimeMatrix::<Union<u64>, LastSample>::default();
        let _matrix = client.inspect_time_matrix("connectivity", time_matrix);

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                connectivity: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                }
            }
        });
    }

    /// A matrix sent with metadata additionally shows up with a `metadata` child node.
    #[fuchsia::test]
    async fn serve_time_matrix_inspection_with_metadata_then_inspect_data_tree_contains_metadata() {
        let inspector = Inspector::default();
        let served_node = inspector.root().create_child("serve_test_node");
        let (client, _server) = serve_time_matrix_inspection(served_node);
        // The handle is kept bound for the duration of the test.
        let _matrix = client.inspect_time_matrix_with_metadata(
            "engine",
            TimeMatrix::<Union<u64>, LastSample>::default(),
            BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]),
        );

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                engine: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                    metadata: {
                        index: {
                            "0": "check",
                            "1": "oil",
                            "2": "battery",
                            "3": "coolant",
                        }
                    }
                }
            }
        });
    }

    /// Dropping the only client must not cause the server future to complete.
    #[test]
    fn drop_time_matrix_client_then_server_continues_execution() {
        let mut executor = fasync::TestExecutor::new_with_fake_time();

        let inspector = Inspector::default();
        let served_node = inspector.root().create_child("serve_test_node");
        let (client, server) = serve_time_matrix_inspection(served_node);
        let mut server = pin!(server);

        mem::drop(client);

        // The server future should continue execution even if its associated client is dropped.
        if let Poll::Ready(_) = executor.run_until_stalled(&mut server) {
            panic!("time matrix inspection server terminated unexpectedly");
        }
    }
}