Skip to main content

windowed_stats/experimental/
inspect.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use derivative::Derivative;
6use fuchsia_inspect::{Inspector, Node as InspectNode};
7use fuchsia_sync::Mutex;
8use futures::FutureExt as _;
9use log::warn;
10use std::sync::Arc;
11
12use crate::experimental::clock::{Timed, Timestamp};
13use crate::experimental::series::interpolation::InterpolationKind;
14use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
15use crate::experimental::series::{SerializedBuffer, TimeMatrix, TimeMatrixFold, TimeMatrixTick};
16
/// An interface for sending [`TimeMatrix`] values to an inspection server.
///
/// Each `inspect_*` method consumes a matrix and returns an [`InspectedTimeMatrix`] handle for
/// that matrix.
///
/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
pub trait InspectSender {
    /// Sends a [`TimeMatrix`] to the client's inspection server.
    ///
    /// This accepts the same `name` and `matrix` arguments as
    /// [`inspect_time_matrix_with_metadata`], but takes no metadata argument.
    ///
    /// See [`inspect_time_matrix_with_metadata`].
    ///
    /// [`inspect_time_matrix_with_metadata`]: crate::experimental::serve::TimeMatrixClient::inspect_time_matrix_with_metadata
    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
    fn inspect_time_matrix<F, P>(
        &self,
        name: impl Into<String>,
        matrix: TimeMatrix<F, P>,
    ) -> InspectedTimeMatrix<F::Sample>
    where
        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
        Metadata<F>: 'static + Send + Sync,
        F: SerialStatistic<P>,
        F::Sample: Send,
        P: InterpolationKind;

    /// Sends a [`TimeMatrix`] to the client's inspection server.
    ///
    /// This function lazily records the given [`TimeMatrix`] to Inspect. The server end
    /// periodically interpolates the matrix and records data as needed. The returned
    /// [handle][`InspectedTimeMatrix`] can be used to fold samples into the matrix.
    ///
    /// [`InspectedTimeMatrix`]: crate::experimental::serve::InspectedTimeMatrix
    /// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
    fn inspect_time_matrix_with_metadata<F, P>(
        &self,
        name: impl Into<String>,
        matrix: TimeMatrix<F, P>,
        metadata: impl Into<Metadata<F>>,
    ) -> InspectedTimeMatrix<F::Sample>
    where
        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
        Metadata<F>: 'static + Send + Sync,
        F: SerialStatistic<P>,
        F::Sample: Send,
        P: InterpolationKind;

    /// Clones the client and scopes it to a child node with the given name.
    fn clone_with_child(&self, name: &str) -> Self;
}
60
// A reference-counted, mutex-guarded, type-erased time matrix that can be ticked.
//
// NOTE(review): nothing visible in this file refers to this alias, and unlike the `matrix`
// parameter of `record_lazy_time_matrix_with` it carries no `Send` bound. Confirm whether it is
// still needed.
type SharedTimeMatrix = Arc<Mutex<dyn TimeMatrixTick>>;
62
/// An [`InspectSender`] that records time matrices beneath an Inspect node.
pub struct TimeMatrixClient {
    /// The node under which a lazy child is recorded for each inspected time matrix.
    node: InspectNode,
}
66
67impl TimeMatrixClient {
68    /// Create a new TimeMatrixClient that holds a given Inspect Node
69    ///
70    /// Note: If TimeMatrixClient is constructed with a weak reference to Inspect
71    /// Node, then the original Node needs to be preserved for time series
72    /// data shows up in Inspect. If TimeMatrixClient is constructed with the original
73    /// Inspect node, then the TimeMatrixClient itself needs to be preserved.
74    pub fn new(node: InspectNode) -> Self {
75        Self { node }
76    }
77
78    fn inspect_and_record_with<F, P, R>(
79        &self,
80        name: impl Into<String>,
81        matrix: TimeMatrix<F, P>,
82        record: R,
83    ) -> InspectedTimeMatrix<F::Sample>
84    where
85        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
86        Metadata<F>: 'static + Send + Sync,
87        F: SerialStatistic<P>,
88        F::Sample: Send,
89        P: InterpolationKind,
90        R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
91    {
92        let name = name.into();
93        let matrix = Arc::new(Mutex::new(matrix));
94        self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
95        InspectedTimeMatrix::new(name, matrix)
96    }
97}
98
99impl Clone for TimeMatrixClient {
100    fn clone(&self) -> Self {
101        TimeMatrixClient { node: self.node.clone_weak() }
102    }
103}
104
105impl InspectSender for TimeMatrixClient {
106    fn inspect_time_matrix<F, P>(
107        &self,
108        name: impl Into<String>,
109        matrix: TimeMatrix<F, P>,
110    ) -> InspectedTimeMatrix<F::Sample>
111    where
112        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
113        Metadata<F>: 'static + Send + Sync,
114        F: SerialStatistic<P>,
115        F::Sample: Send,
116        P: InterpolationKind,
117    {
118        self.inspect_and_record_with(name, matrix, |_node| {})
119    }
120
121    fn inspect_time_matrix_with_metadata<F, P>(
122        &self,
123        name: impl Into<String>,
124        matrix: TimeMatrix<F, P>,
125        metadata: impl Into<Metadata<F>>,
126    ) -> InspectedTimeMatrix<F::Sample>
127    where
128        TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
129        Metadata<F>: 'static + Send + Sync,
130        F: SerialStatistic<P>,
131        F::Sample: Send,
132        P: InterpolationKind,
133    {
134        let metadata = Arc::new(metadata.into());
135        self.inspect_and_record_with(name, matrix, move |node| {
136            use crate::experimental::series::metadata::Metadata;
137            metadata.record_with_parent(node);
138        })
139    }
140
141    fn clone_with_child(&self, name: &str) -> Self {
142        Self { node: self.node.create_child(name) }
143    }
144}
145
/// A handle to a [`TimeMatrix`] that has been sent for inspection.
///
/// Samples are folded into the underlying matrix through this handle. The matrix is held behind
/// an `Arc`, so clones of the handle refer to the same matrix.
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct InspectedTimeMatrix<T> {
    /// The name given when the matrix was inspected; included in logged fold errors.
    name: String,
    /// The shared, type-erased matrix. It is omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
}
153
154impl<T> InspectedTimeMatrix<T> {
155    pub(crate) fn new(
156        name: impl Into<String>,
157        matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
158    ) -> Self {
159        Self { name: name.into(), matrix }
160    }
161
162    /// Folding a sample to the time series.
163    ///
164    /// Note: the timestamp for the time series is generated only after acquiring a lock.
165    /// If the lock is already being held by another thread/process that reads the Inspect
166    /// data, the timestamp generated may fall on the time interval after.
167    ///
168    /// TODO(https://fxbug.dev/457421826) - Fix issue with delayed timestamp
169    pub fn fold(&self, sample: T) -> Result<(), FoldError> {
170        self.matrix.lock().fold(Timed::now(sample))
171    }
172
173    pub fn fold_or_log_error(&self, sample: T) {
174        if let Err(error) = self.matrix.lock().fold(Timed::now(sample)) {
175            warn!("failed to fold sample into time matrix \"{}\": {:?}", self.name, error);
176        }
177    }
178
179    /// Fold a sample with the given timestamp.
180    ///
181    /// This API was only added to avoid breaking changes to the `serve` API, which is
182    /// currently unused.
183    ///
184    /// TODO(https://fxbug.dev/457423931) - Remove this API
185    pub(crate) fn fold_at(&self, sample: Timed<T>) -> Result<(), FoldError> {
186        self.matrix.lock().fold(sample)
187    }
188}
189
190/// Records a lazy child node in the given node that records buffers and metadata for the given
191/// time matrix.
192///
193/// The function `f` is passed a node to record arbitrary data after the data semantic and buffers
194/// of the time matrix have been recorded using that same node.
195fn record_lazy_time_matrix_with<F>(
196    node: &InspectNode,
197    name: impl Into<String>,
198    matrix: Arc<Mutex<dyn TimeMatrixTick + Send>>,
199    f: F,
200) where
201    F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
202{
203    let name = name.into();
204    node.record_lazy_child(name, move || {
205        let matrix = matrix.clone();
206        let f = f.clone();
207        async move {
208            let inspector = Inspector::default();
209            let result = matrix.lock().tick_and_get_buffers(Timestamp::now());
210            inspector.root().atomic_update(|node| {
211                if result.is_ok() {
212                    f(node);
213                }
214                SerializedBuffer::write_to_inspect_or_error(result, node);
215            });
216            Ok(inspector)
217        }
218        .boxed()
219    });
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
    use fuchsia_async as fasync;

    use crate::experimental::series::SamplingProfile;
    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
    use crate::experimental::series::metadata::BitSetMap;
    use crate::experimental::series::statistic::{Max, Union};

    /// Folds one sample with the fake clock at 3s, then reads Inspect with the fake clock at
    /// 10s and checks the exact serialized bytes of the time matrix.
    #[fuchsia::test]
    fn inspected_time_matrix_folded_sample_appears_in_inspect() {
        // Fake time makes the timestamps embedded in the serialized data deterministic.
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));

        let inspector = Inspector::default();
        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
        let time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        let inspected_matrix = client.inspect_time_matrix("time_series_1", time_matrix);

        inspected_matrix.fold(15).unwrap();
        // Advance the clock before the data tree is read below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));

        assert_data_tree!(@executor exec, inspector, root: contains {
            serve_test_node: {
                time_series_1: {
                    "type": "gauge",
                    "data": vec![
                        1u8, // version number
                        3, 0, 0, 0, // created timestamp
                        10, 0, 0, 0, // last timestamp
                        1, 0, // type: simple8b RLE; subtype: unsigned
                        16, 0, // series 1: length in bytes
                        10, 0, // series 1 granularity: 10s
                        1, 0, // number of selector elements and value blocks
                        0, 0,    // head selector index
                        1,    // number of values in last block
                        0x0f, // RLE selector
                        15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
                        7, 0, // series 2: length in bytes
                        60, 0, // series 2 granularity: 60s
                        0, 0, // number of selector elements and value blocks
                        0, 0, // head selector index
                        0, // number of values in last block
                    ]
                }
            }
        });
    }

    /// Inspecting a matrix, without folding any samples, exposes `type` and `data` properties
    /// under a child named after the matrix.
    #[fuchsia::test]
    async fn inspect_time_matrix_then_inspect_data_tree_contains_buffers() {
        let inspector = Inspector::default();
        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
        let _matrix = client
            .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                connectivity: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                }
            }
        });
    }

    /// Inspecting a matrix with a `BitSetMap` additionally exposes a `metadata` child that maps
    /// each index to its label.
    #[fuchsia::test]
    async fn inspect_time_matrix_with_metadata_then_inspect_data_tree_contains_metadata() {
        let inspector = Inspector::default();
        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
        let _matrix = client.inspect_time_matrix_with_metadata(
            "engine",
            TimeMatrix::<Union<u64>, LastSample>::default(),
            BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]),
        );

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                engine: {
                    "type": "bitset",
                    "data": AnyBytesProperty,
                    metadata: {
                        index: {
                            "0": "check",
                            "1": "oil",
                            "2": "battery",
                            "3": "coolant",
                        }
                    }
                }
            }
        });
    }

    /// A client obtained from `clone_with_child` records its matrices beneath the named child
    /// of the original client's node.
    #[fuchsia::test]
    async fn inspected_time_matrix_clone_with_child_properly_scoped() {
        let inspector = Inspector::default();
        let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
        let child_client = client.clone_with_child("child");
        let _matrix = child_client
            .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());

        assert_data_tree!(inspector, root: contains {
            serve_test_node: {
                child: {
                    connectivity: {
                        "type": "bitset",
                        "data": AnyBytesProperty,
                    }
                }
            }
        });
    }
}