1use derivative::Derivative;
6use fuchsia_inspect::{Inspector, Node as InspectNode};
7use fuchsia_sync::Mutex;
8use futures::FutureExt as _;
9use log::warn;
10use std::sync::Arc;
11
12use crate::experimental::clock::{Timed, Timestamp};
13use crate::experimental::series::interpolation::InterpolationKind;
14use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic};
15use crate::experimental::series::{SerializedBuffer, TimeMatrix, TimeMatrixFold, TimeMatrixTick};
16
17pub trait InspectSender {
18 fn inspect_time_matrix<F, P>(
25 &self,
26 name: impl Into<String>,
27 matrix: TimeMatrix<F, P>,
28 ) -> InspectedTimeMatrix<F::Sample>
29 where
30 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
31 Metadata<F>: 'static + Send + Sync,
32 F: SerialStatistic<P>,
33 F::Sample: Send,
34 P: InterpolationKind;
35
36 fn inspect_time_matrix_with_metadata<F, P>(
45 &self,
46 name: impl Into<String>,
47 matrix: TimeMatrix<F, P>,
48 metadata: impl Into<Metadata<F>>,
49 ) -> InspectedTimeMatrix<F::Sample>
50 where
51 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
52 Metadata<F>: 'static + Send + Sync,
53 F: SerialStatistic<P>,
54 F::Sample: Send,
55 P: InterpolationKind;
56}
57
58type SharedTimeMatrix = Arc<Mutex<dyn TimeMatrixTick>>;
59
60pub struct TimeMatrixClient {
61 node: InspectNode,
62}
63
64impl TimeMatrixClient {
65 pub fn new(node: InspectNode) -> Self {
72 Self { node }
73 }
74
75 fn inspect_and_record_with<F, P, R>(
76 &self,
77 name: impl Into<String>,
78 matrix: TimeMatrix<F, P>,
79 record: R,
80 ) -> InspectedTimeMatrix<F::Sample>
81 where
82 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
83 Metadata<F>: 'static + Send + Sync,
84 F: SerialStatistic<P>,
85 F::Sample: Send,
86 P: InterpolationKind,
87 R: 'static + Clone + Fn(&InspectNode) + Send + Sync,
88 {
89 let name = name.into();
90 let matrix = Arc::new(Mutex::new(matrix));
91 self::record_lazy_time_matrix_with(&self.node, &name, matrix.clone(), record);
92 InspectedTimeMatrix::new(name, matrix)
93 }
94}
95
96impl Clone for TimeMatrixClient {
97 fn clone(&self) -> Self {
98 TimeMatrixClient { node: self.node.clone_weak() }
99 }
100}
101
102impl InspectSender for TimeMatrixClient {
103 fn inspect_time_matrix<F, P>(
104 &self,
105 name: impl Into<String>,
106 matrix: TimeMatrix<F, P>,
107 ) -> InspectedTimeMatrix<F::Sample>
108 where
109 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
110 Metadata<F>: 'static + Send + Sync,
111 F: SerialStatistic<P>,
112 F::Sample: Send,
113 P: InterpolationKind,
114 {
115 self.inspect_and_record_with(name, matrix, |_node| {})
116 }
117
118 fn inspect_time_matrix_with_metadata<F, P>(
119 &self,
120 name: impl Into<String>,
121 matrix: TimeMatrix<F, P>,
122 metadata: impl Into<Metadata<F>>,
123 ) -> InspectedTimeMatrix<F::Sample>
124 where
125 TimeMatrix<F, P>: 'static + TimeMatrixFold<F::Sample> + Send,
126 Metadata<F>: 'static + Send + Sync,
127 F: SerialStatistic<P>,
128 F::Sample: Send,
129 P: InterpolationKind,
130 {
131 let metadata = Arc::new(metadata.into());
132 self.inspect_and_record_with(name, matrix, move |node| {
133 use crate::experimental::series::metadata::Metadata;
134
135 node.record_child("metadata", |node| {
136 metadata.record(node);
137 })
138 })
139 }
140}
141
142#[derive(Derivative)]
143#[derivative(Debug, Clone)]
144pub struct InspectedTimeMatrix<T> {
145 name: String,
146 #[derivative(Debug = "ignore")]
147 matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
148}
149
150impl<T> InspectedTimeMatrix<T> {
151 pub(crate) fn new(
152 name: impl Into<String>,
153 matrix: Arc<Mutex<dyn TimeMatrixFold<T> + Send>>,
154 ) -> Self {
155 Self { name: name.into(), matrix }
156 }
157
158 pub fn fold(&self, sample: T) -> Result<(), FoldError> {
166 self.matrix.lock().fold(Timed::now(sample))
167 }
168
169 pub fn fold_or_log_error(&self, sample: T) {
170 if let Err(error) = self.matrix.lock().fold(Timed::now(sample)) {
171 warn!("failed to fold sample into time matrix \"{}\": {:?}", self.name, error);
172 }
173 }
174
175 pub(crate) fn fold_at(&self, sample: Timed<T>) -> Result<(), FoldError> {
182 self.matrix.lock().fold(sample)
183 }
184}
185
186fn record_lazy_time_matrix_with<F>(
192 node: &InspectNode,
193 name: impl Into<String>,
194 matrix: Arc<Mutex<dyn TimeMatrixTick + Send>>,
195 f: F,
196) where
197 F: 'static + Clone + Fn(&InspectNode) + Send + Sync,
198{
199 let name = name.into();
200 node.record_lazy_child(name, move || {
201 let matrix = matrix.clone();
202 let f = f.clone();
203 async move {
204 let inspector = Inspector::default();
205 let result = matrix.lock().tick_and_get_buffers(Timestamp::now());
206 inspector.root().atomic_update(|node| {
207 if result.is_ok() {
208 f(node);
209 }
210 SerializedBuffer::write_to_inspect_or_error(result, node);
211 });
212 Ok(inspector)
213 }
214 .boxed()
215 });
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use diagnostics_assertions::{AnyBytesProperty, assert_data_tree};
222 use fuchsia_async as fasync;
223
224 use crate::experimental::series::SamplingProfile;
225 use crate::experimental::series::interpolation::{ConstantSample, LastSample};
226 use crate::experimental::series::metadata::BitSetMap;
227 use crate::experimental::series::statistic::{Max, Union};
228
229 #[fuchsia::test]
230 fn inspected_time_matrix_folded_sample_appears_in_inspect() {
231 let mut exec = fasync::TestExecutor::new_with_fake_time();
232 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
233
234 let inspector = Inspector::default();
235 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
236 let time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
237 SamplingProfile::highly_granular(),
238 ConstantSample::default(),
239 );
240 let inspected_matrix = client.inspect_time_matrix("time_series_1", time_matrix);
241
242 inspected_matrix.fold(15).unwrap();
243 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
244
245 assert_data_tree!(@executor exec, inspector, root: contains {
246 serve_test_node: {
247 time_series_1: {
248 "type": "gauge",
249 "data": vec![
250 1u8, 3, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 0, 10, 0, 1, 0, 0, 0, 1, 0x0f, 15, 0, 0, 0, 0, 0, 1, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
267 }
268 }
269 });
270 }
271
272 #[fuchsia::test]
273 async fn inspect_time_matrix_then_inspect_data_tree_contains_buffers() {
274 let inspector = Inspector::default();
275 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
276 let _matrix = client
277 .inspect_time_matrix("connectivity", TimeMatrix::<Union<u64>, LastSample>::default());
278
279 assert_data_tree!(inspector, root: contains {
280 serve_test_node: {
281 connectivity: {
282 "type": "bitset",
283 "data": AnyBytesProperty,
284 }
285 }
286 });
287 }
288
289 #[fuchsia::test]
290 async fn inspect_time_matrix_with_metadata_then_inspect_data_tree_contains_metadata() {
291 let inspector = Inspector::default();
292 let client = TimeMatrixClient::new(inspector.root().create_child("serve_test_node"));
293 let _matrix = client.inspect_time_matrix_with_metadata(
294 "engine",
295 TimeMatrix::<Union<u64>, LastSample>::default(),
296 BitSetMap::from_ordered(["check", "oil", "battery", "coolant"]),
297 );
298
299 assert_data_tree!(inspector, root: contains {
300 serve_test_node: {
301 engine: {
302 "type": "bitset",
303 "data": AnyBytesProperty,
304 metadata: {
305 index: {
306 "0": "check",
307 "1": "oil",
308 "2": "battery",
309 "3": "coolant",
310 }
311 }
312 }
313 }
314 });
315 }
316}