windowed_stats/experimental/
inspect.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use derivative::Derivative;
6use fuchsia_inspect::{Inspector, Node as InspectNode};
7use fuchsia_sync::Mutex;
8use futures::FutureExt as _;
9use log::warn;
10use std::sync::Arc;
11
12use crate::experimental::clock::{Timed, Timestamp};
13use crate::experimental::series::interpolation::InterpolationKind;
14use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
15use crate::experimental::series::{SerializedBuffer, TimeMatrix, TimeMatrixFold, TimeMatrixTick};
16
17pub trait InspectSender {
18    /// Sends a [`TimeMatrix`] to the client's inspection server.
19    ///
20    /// See [`inspect_time_matrix_with_metadata`].
21    ///
22    /// [`inspect_time_matrix_with_metadata`]: crate::experimental::serve::TimeMatrixClient::inspect_time_matrix_with_metadata
23    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
24    fn inspect_time_matrix<F, P>(
25        &self,
26        name: impl Into<String>,
27        matrix: TimeMatrix<F, P>,
28    ) -> InspectedTimeMatrix<F::Sample>
29    where
30        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
31        Metadata<F>: 'static + Send + Sync,
32        F: SerialStatistic<P>,
33        F::Sample: Send,
34        P: InterpolationKind;
35
36    /// Sends a [`TimeMatrix`] to the client's inspection server.
37    ///
38    /// This function lazily records the given [`TimeMatrix`] to Inspect. The server end
39    /// periodically interpolates the matrix and records data as needed. The returned
40    /// [handle][`InspectedTimeMatrix`] can be used to fold samples into the matrix.
41    ///
42    /// [`InspectedTimeMatrix`]: crate::experimental::serve::InspectedTimeMatrix
43    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
44    fn inspect_time_matrix_with_metadata<F, P>(
45        &self,
46        name: impl Into<String>,
47        matrix: TimeMatrix<F, P>,
48        metadata: impl Into<Metadata<F>>,
49    ) -> InspectedTimeMatrix<F::Sample>
50    where
51        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
52        Metadata<F>: 'static + Send + Sync,
53        F: SerialStatistic<P>,
54        F::Sample: Send,
55        P: InterpolationKind;
56}
57
58type SharedTimeMatrix = Arc<Mutex<dyn TimeMatrixTick>>;
59
60pub struct TimeMatrixClient {
61    node: InspectNode,
62}
63
64impl TimeMatrixClient {
65    /// Create a new TimeMatrixClient that holds a given Inspect Node
66    ///
67    /// Note: If TimeMatrixClient is constructed with a weak reference to Inspect
68    /// Node, then the original Node needs to be preserved for time series
69    /// data shows up in Inspect. If TimeMatrixClient is constructed with the original
70    /// Inspect node, then the TimeMatrixClient itself needs to be preserved.
71    pub fn new(node: InspectNode) -> Self {
72        Self { node }
73    }
74
75    fn inspect_and_record_with<F, P, R>(
76        &self,
77        name: impl Into<String>,
78        matrix: TimeMatrix<F, P>,
79        record: R,
80    ) -> InspectedTimeMatrix<F::Sample>
81    where
82        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
83        Metadata<F>: 'static + Send + Sync,
84        F: SerialStatistic<P>,
85        F::Sample: Send,
86        P: InterpolationKind,
87        R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
88    {
89        let name = name.into();
90        let matrix = Arc::new(Mutex::new(matrix));
91        self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
92        InspectedTimeMatrix::new(name, matrix)
93    }
94}
95
96impl Clone for TimeMatrixClient {
97    fn clone(&self) -> Self {
98        TimeMatrixClient { node: self.node.clone_weak() }
99    }
100}
101
102impl InspectSender for TimeMatrixClient {
103    fn inspect_time_matrix<F, P>(
104        &self,
105        name: impl Into<String>,
106        matrix: TimeMatrix<F, P>,
107    ) -> InspectedTimeMatrix<F::Sample>
108    where
109        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
110        Metadata<F>: 'static + Send + Sync,
111        F: SerialStatistic<P>,
112        F::Sample: Send,
113        P: InterpolationKind,
114    {
115        self.inspect_and_record_with(name, matrix, |_node| {})
116    }
117
118    fn inspect_time_matrix_with_metadata<F, P>(
119        &self,
120        name: impl Into<String>,
121        matrix: TimeMatrix<F, P>,
122        metadata: impl Into<Metadata<F>>,
123    ) -> InspectedTimeMatrix<F::Sample>
124    where
125        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
126        Metadata<F>: 'static + Send + Sync,
127        F: SerialStatistic<P>,
128        F::Sample: Send,
129        P: InterpolationKind,
130    {
131        let metadata = Arc::new(metadata.into());
132        self.inspect_and_record_with(name, matrix, move |node| {
133            use crate::experimental::series::metadata::Metadata;
134            metadata.record_with_parent(node);
135        })
136    }
137}
138
139#[derive(Derivative)]
140#[derivative(Debug, Clone)]
141pub struct InspectedTimeMatrix<T> {
142    name: String,
143    #[derivative(Debug = "ignore")]
144    matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
145}
146
147impl<T> InspectedTimeMatrix<T> {
148    pub(crate) fn new(
149        name: impl Into<String>,
150        matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
151    ) -> Self {
152        Self { name: name.into(), matrix }
153    }
154
155    /// Folding a sample to the time series.
156    ///
157    /// Note: the timestamp for the time series is generated only after acquiring a lock.
158    /// If the lock is already being held by another thread/process that reads the Inspect
159    /// data, the timestamp generated may fall on the time interval after.
160    ///
161    /// TODO(https://fxbug.dev/457421826) - Fix issue with delayed timestamp
162    pub fn fold(&self, sample: T) -> Result<(), FoldError> {
163        self.matrix.lock().fold(Timed::now(sample))
164    }
165
166    pub fn fold_or_log_error(&self, sample: T) {
167        if let Err(error) = self.matrix.lock().fold(Timed::now(sample)) {
168            warn!("failed to fold sample into time matrix \"{}\": {:?}", self.name, error);
169        }
170    }
171
172    /// Fold a sample with the given timestamp.
173    ///
174    /// This API was only added to avoid breaking changes to the `serve` API, which is
175    /// currently unused.
176    ///
177    /// TODO(https://fxbug.dev/457423931) - Remove this API
178    pub(crate) fn fold_at(&self, sample: Timed<T>) -> Result<(), FoldError> {
179        self.matrix.lock().fold(sample)
180    }
181}
182
183/// Records a lazy child node in the given node that records buffers and metadata for the given
184/// time matrix.
185///
186/// The function `f` is passed a node to record arbitrary data after the data semantic and buffers
187/// of the time matrix have been recorded using that same node.
188fn record_lazy_time_matrix_with<F>(
189    node: &InspectNode,
190    name: impl Into<String>,
191    matrix: Arc<Mutex<dyn TimeMatrixTick + Send>>,
192    f: F,
193) where
194    F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
195{
196    let name = name.into();
197    node.record_lazy_child(name, move || {
198        let matrix = matrix.clone();
199        let f = f.clone();
200        async move {
201            let inspector = Inspector::default();
202            let result = matrix.lock().tick_and_get_buffers(Timestamp::now());
203            inspector.root().atomic_update(|node| {
204                if result.is_ok() {
205                    f(node);
206                }
207                SerializedBuffer::write_to_inspect_or_error(result, node);
208            });
209            Ok(inspector)
210        }
211        .boxed()
212    });
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
219    use fuchsia_async as fasync;
220
221    use crate::experimental::series::SamplingProfile;
222    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
223    use crate::experimental::series::metadata::BitSetMap;
224    use crate::experimental::series::statistic::{Max, Union};
225
226    #[fuchsia::test]
227    fn inspected_time_matrix_folded_sample_appears_in_inspect() {
228        let mut exec = fasync::TestExecutor::new_with_fake_time();
229        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
230
231        let inspector = Inspector::default();
232        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
233        let time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
234            SamplingProfile::highly_granular(),
235            ConstantSample::default(),
236        );
237        let inspected_matrix = client.inspect_time_matrix("time_series_1", time_matrix);
238
239        inspected_matrix.fold(15).unwrap();
240        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
241
242        assert_data_tree!(@executor exec, inspector, root: contains {
243            serve_test_node: {
244                time_series_1: {
245                    "type": "gauge",
246                    "data": vec![
247                        1u8, // version number
248                        3, 0, 0, 0, // created timestamp
249                        10, 0, 0, 0, // last timestamp
250                        1, 0, // type: simple8b RLE; subtype: unsigned
251                        16, 0, // series 1: length in bytes
252                        10, 0, // series 1 granularity: 10s
253                        1, 0, // number of selector elements and value blocks
254                        0, 0,    // head selector index
255                        1,    // number of values in last block
256                        0x0f, // RLE selector
257                        15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
258                        7, 0, // series 2: length in bytes
259                        60, 0, // series 2 granularity: 60s
260                        0, 0, // number of selector elements and value blocks
261                        0, 0, // head selector index
262                        0, // number of values in last block
263                    ]
264                }
265            }
266        });
267    }
268
269    #[fuchsia::test]
270    async fn inspect_time_matrix_then_inspect_data_tree_contains_buffers() {
271        let inspector = Inspector::default();
272        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
273        let _matrix = client
274            .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());
275
276        assert_data_tree!(inspector, root: contains {
277            serve_test_node: {
278                connectivity: {
279                    "type": "bitset",
280                    "data": AnyBytesProperty,
281                }
282            }
283        });
284    }
285
286    #[fuchsia::test]
287    async fn inspect_time_matrix_with_metadata_then_inspect_data_tree_contains_metadata() {
288        let inspector = Inspector::default();
289        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
290        let _matrix = client.inspect_time_matrix_with_metadata(
291            "engine",
292            TimeMatrix::<Union<u64>, LastSample>::default(),
293            BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]),
294        );
295
296        assert_data_tree!(inspector, root: contains {
297            serve_test_node: {
298                engine: {
299                    "type": "bitset",
300                    "data": AnyBytesProperty,
301                    metadata: {
302                        index: {
303                            "0": "check",
304                            "1": "oil",
305                            "2": "battery",
306                            "3": "coolant",
307                        }
308                    }
309                }
310            }
311        });
312    }
313}