windowed_stats/experimental/
inspect.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use derivative::Derivative;
6use fuchsia_inspect::{Inspector, Node as InspectNode};
7use fuchsia_sync::Mutex;
8use futures::FutureExt as _;
9use log::warn;
10use std::sync::Arc;
11
12use crate::experimental::clock::{Timed, Timestamp};
13use crate::experimental::series::interpolation::InterpolationKind;
14use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
15use crate::experimental::series::{SerializedBuffer, TimeMatrix, TimeMatrixFold, TimeMatrixTick};
16
17pub trait InspectSender {
18    /// Sends a [`TimeMatrix`] to the client's inspection server.
19    ///
20    /// See [`inspect_time_matrix_with_metadata`].
21    ///
22    /// [`inspect_time_matrix_with_metadata`]: crate::experimental::serve::TimeMatrixClient::inspect_time_matrix_with_metadata
23    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
24    fn inspect_time_matrix<F, P>(
25        &self,
26        name: impl Into<String>,
27        matrix: TimeMatrix<F, P>,
28    ) -> InspectedTimeMatrix<F::Sample>
29    where
30        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
31        Metadata<F>: 'static + Send + Sync,
32        F: SerialStatistic<P>,
33        F::Sample: Send,
34        P: InterpolationKind;
35
36    /// Sends a [`TimeMatrix`] to the client's inspection server.
37    ///
38    /// This function lazily records the given [`TimeMatrix`] to Inspect. The server end
39    /// periodically interpolates the matrix and records data as needed. The returned
40    /// [handle][`InspectedTimeMatrix`] can be used to fold samples into the matrix.
41    ///
42    /// [`InspectedTimeMatrix`]: crate::experimental::serve::InspectedTimeMatrix
43    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
44    fn inspect_time_matrix_with_metadata<F, P>(
45        &self,
46        name: impl Into<String>,
47        matrix: TimeMatrix<F, P>,
48        metadata: impl Into<Metadata<F>>,
49    ) -> InspectedTimeMatrix<F::Sample>
50    where
51        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
52        Metadata<F>: 'static + Send + Sync,
53        F: SerialStatistic<P>,
54        F::Sample: Send,
55        P: InterpolationKind;
56}
57
58type SharedTimeMatrix = Arc<Mutex<dyn TimeMatrixTick>>;
59
60pub struct TimeMatrixClient {
61    node: InspectNode,
62}
63
64impl TimeMatrixClient {
65    /// Create a new TimeMatrixClient that holds a given Inspect Node
66    ///
67    /// Note: If TimeMatrixClient is constructed with a weak reference to Inspect
68    /// Node, then the original Node needs to be preserved for time series
69    /// data shows up in Inspect. If TimeMatrixClient is constructed with the original
70    /// Inspect node, then the TimeMatrixClient itself needs to be preserved.
71    pub fn new(node: InspectNode) -> Self {
72        Self { node }
73    }
74
75    fn inspect_and_record_with<F, P, R>(
76        &self,
77        name: impl Into<String>,
78        matrix: TimeMatrix<F, P>,
79        record: R,
80    ) -> InspectedTimeMatrix<F::Sample>
81    where
82        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
83        Metadata<F>: 'static + Send + Sync,
84        F: SerialStatistic<P>,
85        F::Sample: Send,
86        P: InterpolationKind,
87        R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
88    {
89        let name = name.into();
90        let matrix = Arc::new(Mutex::new(matrix));
91        self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
92        InspectedTimeMatrix::new(name, matrix)
93    }
94}
95
96impl Clone for TimeMatrixClient {
97    fn clone(&self) -> Self {
98        TimeMatrixClient { node: self.node.clone_weak() }
99    }
100}
101
102impl InspectSender for TimeMatrixClient {
103    fn inspect_time_matrix<F, P>(
104        &self,
105        name: impl Into<String>,
106        matrix: TimeMatrix<F, P>,
107    ) -> InspectedTimeMatrix<F::Sample>
108    where
109        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
110        Metadata<F>: 'static + Send + Sync,
111        F: SerialStatistic<P>,
112        F::Sample: Send,
113        P: InterpolationKind,
114    {
115        self.inspect_and_record_with(name, matrix, |_node| {})
116    }
117
118    fn inspect_time_matrix_with_metadata<F, P>(
119        &self,
120        name: impl Into<String>,
121        matrix: TimeMatrix<F, P>,
122        metadata: impl Into<Metadata<F>>,
123    ) -> InspectedTimeMatrix<F::Sample>
124    where
125        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
126        Metadata<F>: 'static + Send + Sync,
127        F: SerialStatistic<P>,
128        F::Sample: Send,
129        P: InterpolationKind,
130    {
131        let metadata = Arc::new(metadata.into());
132        self.inspect_and_record_with(name, matrix, move |node| {
133            use crate::experimental::series::metadata::Metadata;
134
135            node.record_child("metadata", |node| {
136                metadata.record(node);
137            })
138        })
139    }
140}
141
142#[derive(Derivative)]
143#[derivative(Debug, Clone)]
144pub struct InspectedTimeMatrix<T> {
145    name: String,
146    #[derivative(Debug = "ignore")]
147    matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
148}
149
150impl<T> InspectedTimeMatrix<T> {
151    pub(crate) fn new(
152        name: impl Into<String>,
153        matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
154    ) -> Self {
155        Self { name: name.into(), matrix }
156    }
157
158    /// Folding a sample to the time series.
159    ///
160    /// Note: the timestamp for the time series is generated only after acquiring a lock.
161    /// If the lock is already being held by another thread/process that reads the Inspect
162    /// data, the timestamp generated may fall on the time interval after.
163    ///
164    /// TODO(https://fxbug.dev/457421826) - Fix issue with delayed timestamp
165    pub fn fold(&self, sample: T) -> Result<(), FoldError> {
166        self.matrix.lock().fold(Timed::now(sample))
167    }
168
169    pub fn fold_or_log_error(&self, sample: T) {
170        if let Err(error) = self.matrix.lock().fold(Timed::now(sample)) {
171            warn!("failed to fold sample into time matrix \"{}\": {:?}", self.name, error);
172        }
173    }
174
175    /// Fold a sample with the given timestamp.
176    ///
177    /// This API was only added to avoid breaking changes to the `serve` API, which is
178    /// currently unused.
179    ///
180    /// TODO(https://fxbug.dev/457423931) - Remove this API
181    pub(crate) fn fold_at(&self, sample: Timed<T>) -> Result<(), FoldError> {
182        self.matrix.lock().fold(sample)
183    }
184}
185
186/// Records a lazy child node in the given node that records buffers and metadata for the given
187/// time matrix.
188///
189/// The function `f` is passed a node to record arbitrary data after the data semantic and buffers
190/// of the time matrix have been recorded using that same node.
191fn record_lazy_time_matrix_with<F>(
192    node: &InspectNode,
193    name: impl Into<String>,
194    matrix: Arc<Mutex<dyn TimeMatrixTick + Send>>,
195    f: F,
196) where
197    F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
198{
199    let name = name.into();
200    node.record_lazy_child(name, move || {
201        let matrix = matrix.clone();
202        let f = f.clone();
203        async move {
204            let inspector = Inspector::default();
205            let result = matrix.lock().tick_and_get_buffers(Timestamp::now());
206            inspector.root().atomic_update(|node| {
207                if result.is_ok() {
208                    f(node);
209                }
210                SerializedBuffer::write_to_inspect_or_error(result, node);
211            });
212            Ok(inspector)
213        }
214        .boxed()
215    });
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
    use fuchsia_async as fasync;

    use crate::experimental::series::SamplingProfile;
    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
    use crate::experimental::series::metadata::BitSetMap;
    use crate::experimental::series::statistic::{Max, Union};

    /// Builds a client whose node is a `serve_test_node` child of the inspector's root.
    fn new_test_client(inspector: &Inspector) -> TimeMatrixClient {
        TimeMatrixClient::new(inspector.root().create_child("serve_test_node"))
    }

    /// A sample folded through the handle shows up in the serialized buffers read from Inspect.
    #[fuchsia::test]
    fn inspected_time_matrix_folded_sample_appears_in_inspect() {
        let mut executor = fasync::TestExecutor::new_with_fake_time();
        executor.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));

        let inspector = Inspector::default();
        let client = new_test_client(&inspector);
        let handle = client.inspect_time_matrix(
            "time_series_1",
            TimeMatrix::<Max<u64>, ConstantSample>::new(
                SamplingProfile::highly_granular(),
                ConstantSample::default(),
            ),
        );

        // Fold at t = 3s, then read the Inspect data at t = 10s.
        handle.fold(15).unwrap();
        executor.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));

        assert_data_tree!(@executor executor, inspector, root: contains {
            serve_test_node: {
                time_series_1: {
                    "type": "gauge",
                    "data": vec![
                        1u8, // version number
                        3, 0, 0, 0, // created timestamp
                        10, 0, 0, 0, // last timestamp
                        1, 0, // type: simple8b RLE; subtype: unsigned
                        16, 0, // series 1: length in bytes
                        10, 0, // series 1 granularity: 10s
                        1, 0, // number of selector elements and value blocks
                        0, 0,    // head selector index
                        1,    // number of values in last block
                        0x0f, // RLE selector
                        15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
                        7, 0, // series 2: length in bytes
                        60, 0, // series 2 granularity: 60s
                        0, 0, // number of selector elements and value blocks
                        0, 0, // head selector index
                        0, // number of values in last block
                    ]
                }
            }
        });
    }

    /// Registering a matrix without metadata exposes its type and data buffers in Inspect.
    #[fuchsia::test]
    async fn inspect_time_matrix_then_inspect_data_tree_contains_buffers() {
        let inspector = Inspector::default();
        let client = new_test_client(&inspector);
        // The handle is kept alive for the duration of the test.
        let _handle = client
            .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                connectivity: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                }
            }
        });
    }

    /// Registering a matrix with metadata additionally exposes a `metadata` child node.
    #[fuchsia::test]
    async fn inspect_time_matrix_with_metadata_then_inspect_data_tree_contains_metadata() {
        let inspector = Inspector::default();
        let client = new_test_client(&inspector);
        let bit_names = BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]);
        // The handle is kept alive for the duration of the test.
        let _handle = client.inspect_time_matrix_with_metadata(
            "engine",
            TimeMatrix::<Union<u64>, LastSample>::default(),
            bit_names,
        );

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                engine: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                    metadata: {
                        index: {
                            "0": "check",
                            "1": "oil",
                            "2": "battery",
                            "3": "coolant",
                        }
                    }
                }
            }
        });
    }
}