windowed_stats/experimental/
inspect.rs1use derivative::Derivative;
6use fuchsia_inspect::{Inspector, Node as InspectNode};
7use fuchsia_sync::Mutex;
8use futures::FutureExt as _;
9use log::warn;
10use std::sync::Arc;
11
12use crate::experimental::clock::{Timed, Timestamp};
13use crate::experimental::series::interpolation::InterpolationKind;
14use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
15use crate::experimental::series::{SerializedBuffer, TimeMatrix, TimeMatrixFold, TimeMatrixTick};
16
17pub trait InspectSender {
18 fn inspect_time_matrix<F, P>(
25 &self,
26 name: impl Into<String>,
27 matrix: TimeMatrix<F, P>,
28 ) -> InspectedTimeMatrix<F::Sample>
29 where
30 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
31 Metadata<F>: 'static + Send + Sync,
32 F: SerialStatistic<P>,
33 F::Sample: Send,
34 P: InterpolationKind;
35
36 fn inspect_time_matrix_with_metadata<F, P>(
45 &self,
46 name: impl Into<String>,
47 matrix: TimeMatrix<F, P>,
48 metadata: impl Into<Metadata<F>>,
49 ) -> InspectedTimeMatrix<F::Sample>
50 where
51 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
52 Metadata<F>: 'static + Send + Sync,
53 F: SerialStatistic<P>,
54 F::Sample: Send,
55 P: InterpolationKind;
56}
57
58type SharedTimeMatrix = Arc<Mutex<dyn TimeMatrixTick>>;
59
60pub struct TimeMatrixClient {
61 node: InspectNode,
62}
63
64impl TimeMatrixClient {
65 pub fn new(node: InspectNode) -> Self {
72 Self { node }
73 }
74
75 fn inspect_and_record_with<F, P, R>(
76 &self,
77 name: impl Into<String>,
78 matrix: TimeMatrix<F, P>,
79 record: R,
80 ) -> InspectedTimeMatrix<F::Sample>
81 where
82 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
83 Metadata<F>: 'static + Send + Sync,
84 F: SerialStatistic<P>,
85 F::Sample: Send,
86 P: InterpolationKind,
87 R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
88 {
89 let name = name.into();
90 let matrix = Arc::new(Mutex::new(matrix));
91 self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
92 InspectedTimeMatrix::new(name, matrix)
93 }
94}
95
96impl Clone for TimeMatrixClient {
97 fn clone(&self) -> Self {
98 TimeMatrixClient { node: self.node.clone_weak() }
99 }
100}
101
102impl InspectSender for TimeMatrixClient {
103 fn inspect_time_matrix<F, P>(
104 &self,
105 name: impl Into<String>,
106 matrix: TimeMatrix<F, P>,
107 ) -> InspectedTimeMatrix<F::Sample>
108 where
109 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
110 Metadata<F>: 'static + Send + Sync,
111 F: SerialStatistic<P>,
112 F::Sample: Send,
113 P: InterpolationKind,
114 {
115 self.inspect_and_record_with(name, matrix, |_node| {})
116 }
117
118 fn inspect_time_matrix_with_metadata<F, P>(
119 &self,
120 name: impl Into<String>,
121 matrix: TimeMatrix<F, P>,
122 metadata: impl Into<Metadata<F>>,
123 ) -> InspectedTimeMatrix<F::Sample>
124 where
125 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
126 Metadata<F>: 'static + Send + Sync,
127 F: SerialStatistic<P>,
128 F::Sample: Send,
129 P: InterpolationKind,
130 {
131 let metadata = Arc::new(metadata.into());
132 self.inspect_and_record_with(name, matrix, move |node| {
133 use crate::experimental::series::metadata::Metadata;
134 metadata.record_with_parent(node);
135 })
136 }
137}
138
139#[derive(Derivative)]
140#[derivative(Debug, Clone)]
141pub struct InspectedTimeMatrix<T> {
142 name: String,
143 #[derivative(Debug = "ignore")]
144 matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
145}
146
147impl<T> InspectedTimeMatrix<T> {
148 pub(crate) fn new(
149 name: impl Into<String>,
150 matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
151 ) -> Self {
152 Self { name: name.into(), matrix }
153 }
154
155 pub fn fold(&self, sample: T) -> Result<(), FoldError> {
163 self.matrix.lock().fold(Timed::now(sample))
164 }
165
166 pub fn fold_or_log_error(&self, sample: T) {
167 if let Err(error) = self.matrix.lock().fold(Timed::now(sample)) {
168 warn!("failed to fold sample into time matrix \"{}\": {:?}", self.name, error);
169 }
170 }
171
172 pub(crate) fn fold_at(&self, sample: Timed<T>) -> Result<(), FoldError> {
179 self.matrix.lock().fold(sample)
180 }
181}
182
183fn record_lazy_time_matrix_with<F>(
189 node: &InspectNode,
190 name: impl Into<String>,
191 matrix: Arc<Mutex<dyn TimeMatrixTick + Send>>,
192 f: F,
193) where
194 F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
195{
196 let name = name.into();
197 node.record_lazy_child(name, move || {
198 let matrix = matrix.clone();
199 let f = f.clone();
200 async move {
201 let inspector = Inspector::default();
202 let result = matrix.lock().tick_and_get_buffers(Timestamp::now());
203 inspector.root().atomic_update(|node| {
204 if result.is_ok() {
205 f(node);
206 }
207 SerializedBuffer::write_to_inspect_or_error(result, node);
208 });
209 Ok(inspector)
210 }
211 .boxed()
212 });
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218 use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
219 use fuchsia_async as fasync;
220
221 use crate::experimental::series::SamplingProfile;
222 use crate::experimental::series::interpolation::{ConstantSample, LastSample};
223 use crate::experimental::series::metadata::BitSetMap;
224 use crate::experimental::series::statistic::{Max, Union};
225
226 #[fuchsia::test]
227 fn inspected_time_matrix_folded_sample_appears_in_inspect() {
228 let mut exec = fasync::TestExecutor::new_with_fake_time();
229 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
230
231 let inspector = Inspector::default();
232 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
233 let time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
234 SamplingProfile::highly_granular(),
235 ConstantSample::default(),
236 );
237 let inspected_matrix = client.inspect_time_matrix("time_series_1", time_matrix);
238
239 inspected_matrix.fold(15).unwrap();
240 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
241
242 assert_data_tree!(@executor exec, inspector, root: contains {
243 serve_test_node: {
244 time_series_1: {
245 "type": "gauge",
246 "data": vec![
247 1u8, 3, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 0, 10, 0, 1, 0, 0, 0, 1, 0x0f, 15, 0, 0, 0, 0, 0, 1, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
264 }
265 }
266 });
267 }
268
269 #[fuchsia::test]
270 async fn inspect_time_matrix_then_inspect_data_tree_contains_buffers() {
271 let inspector = Inspector::default();
272 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
273 let _matrix = client
274 .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());
275
276 assert_data_tree!(inspector, root: contains {
277 serve_test_node: {
278 connectivity: {
279 "type": "bitset",
280 "data": AnyBytesProperty,
281 }
282 }
283 });
284 }
285
286 #[fuchsia::test]
287 async fn inspect_time_matrix_with_metadata_then_inspect_data_tree_contains_metadata() {
288 let inspector = Inspector::default();
289 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
290 let _matrix = client.inspect_time_matrix_with_metadata(
291 "engine",
292 TimeMatrix::<Union<u64>, LastSample>::default(),
293 BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]),
294 );
295
296 assert_data_tree!(inspector, root: contains {
297 serve_test_node: {
298 engine: {
299 "type": "bitset",
300 "data": AnyBytesProperty,
301 metadata: {
302 index: {
303 "0": "check",
304 "1": "oil",
305 "2": "battery",
306 "3": "coolant",
307 }
308 }
309 }
310 }
311 });
312 }
313}