windowed_stats/experimental/series/mod.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24 BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25 Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28 ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32 FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::{Capacity, decode};
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
38/// A [`TimeMatrix`] type that can be advanced forward in time.
39///
40/// This trait provides tick operations for `[TimeMatrix`] types. Ticking a [`TimeMatrix`] causes
41/// sample interpolation within and aggregation propagation across [`SamplingInterval`]s.
42///
43/// Importantly, this trait is `dyn` compatible and type erased; it can be used to tick a
44/// [`TimeMatrix`] regardless of its input type parameters (sample type, interpolation type, etc.).
45///
46/// See also the [`TimeMatrixFold`] subtrait.
47pub trait TimeMatrixTick {
48 fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;
49
50 fn tick_and_get_buffers(&mut self, timestamp: Timestamp)
51 -> Result<SerializedBuffer, FoldError>;
52}
53
54/// A [`TimeMatrix`] type that can sample data.
55///
56/// This trait provides fold operations for `TimeMatrix` types. Folding samples updates
57/// aggregations and advances a [`TimeMatrix`] forward in time.
58///
59/// See also the [`TimeMatrixTick`] supertrait. This trait supports both ticking and sampling, but
60/// is not completely type erased: the sample input type parameter `T` is needed.
61pub trait TimeMatrixFold<T>: TimeMatrixTick {
62 fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
63}
64
65/// A type that describes the semantics of data folded by `Sampler`s.
66///
67/// Data semantics determine how statistics are interpreted and time series are aggregated and
68/// buffered.
69pub trait DataSemantic {
70 type Metadata: Metadata;
71
72 fn display() -> impl Display;
73}
74
75/// A continually increasing value.
76///
77/// Counters are analogous to an odometer in a vehicle.
78#[derive(Debug)]
79pub enum Counter {}
80
81impl BufferStrategy<u64, LastSample> for Counter {
82 type Buffer = DeltaSimple8bRle;
83}
84
85impl DataSemantic for Counter {
86 type Metadata = ();
87
88 fn display() -> impl Display {
89 "counter"
90 }
91}
92
93/// A fluctuating value.
94///
95/// Gauges are analogous to a speedometer in a vehicle.
96#[derive(Debug)]
97pub enum Gauge {}
98
99impl<P> BufferStrategy<f32, P> for Gauge
100where
101 P: InterpolationKind,
102{
103 type Buffer = Uncompressed<f32>;
104}
105
106impl BufferStrategy<i64, ConstantSample> for Gauge {
107 type Buffer = ZigzagSimple8bRle;
108}
109
110impl BufferStrategy<i64, LastSample> for Gauge {
111 type Buffer = DeltaZigzagSimple8bRle<i64>;
112}
113
114impl BufferStrategy<u64, ConstantSample> for Gauge {
115 type Buffer = Simple8bRle;
116}
117
118impl BufferStrategy<u64, LastSample> for Gauge {
119 type Buffer = DeltaZigzagSimple8bRle<u64>;
120}
121
122impl DataSemantic for Gauge {
123 type Metadata = ();
124
125 fn display() -> impl Display {
126 "gauge"
127 }
128}
129
130/// A semantic like `Gauge` that avoids [`DeltaZigzagSimple8bRle`] until we fix
131/// some other issues.
132///
133/// TODO(https://fxbug.dev/436253782): Delete this type when the viewer can
134/// decode `DeltaZigzagSimple8bRle`.
135///
136/// OR
137///
138/// TODO(https://fxbug.dev/457443158): Delete this type when
139/// `ConstantAggregation` is introduced and netstack's time series is changed to
140/// `TimeMatrix<Diff<u64>, ConstantAggregation>`.
141///
142/// Whichever happens first.
143pub enum GaugeForceSimple8bRle {}
144
145impl<T: Into<u64>> BufferStrategy<T, LastSample> for GaugeForceSimple8bRle {
146 type Buffer = Simple8bRle;
147}
148
149impl DataSemantic for GaugeForceSimple8bRle {
150 type Metadata = ();
151
152 fn display() -> impl Display {
153 "gauge"
154 }
155}
156
157// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
158// "bit set", but there are notably some instances of "bitset",
159// which instead suggests `Bitset` and `bitset`.
160/// A set of Boolean values.
161///
162/// Bit sets are analogous to indicator lamps in a vehicle.
163#[derive(Debug)]
164pub enum BitSet {}
165
166impl<A, P> BufferStrategy<A, P> for BitSet
167where
168 Simple8bRle: RingBuffer<A>,
169 P: InterpolationKind,
170{
171 type Buffer = Simple8bRle;
172}
173
174impl DataSemantic for BitSet {
175 type Metadata = BitSetIndex;
176
177 fn display() -> impl Display {
178 "bitset"
179 }
180}
181
182/// A buffer of serialized data from a time series.
183#[derive(Clone, Debug)]
184struct SerializedTimeSeries {
185 interval: SamplingInterval,
186 data: Vec<u8>,
187}
188
189impl SerializedTimeSeries {
190 /// Gets the sampling interval for the aggregations in the buffer.
191 pub fn interval(&self) -> &SamplingInterval {
192 &self.interval
193 }
194
195 /// Gets the serialized data.
196 pub fn data(&self) -> &[u8] {
197 self.data.as_slice()
198 }
199}
200
201/// An unbuffered statistical time series specification.
202///
203/// This type samples and interpolates timed data and produces aggregations per its statistic and
204/// sampling interval. It is a specification insofar that it does **not** buffer the series of
205/// aggregations.
206#[derive(Clone, Debug)]
207struct TimeSeries<F>
208where
209 F: Statistic,
210{
211 interval: SamplingInterval,
212 statistic: F,
213}
214
215impl<F> TimeSeries<F>
216where
217 F: Statistic,
218{
219 pub fn new(interval: SamplingInterval) -> Self
220 where
221 F: Default,
222 {
223 TimeSeries { interval, statistic: F::default() }
224 }
225
226 pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
227 TimeSeries { interval, statistic }
228 }
229
230 /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
231 /// aggregations.
232 ///
233 /// The returned iterator performs the computation and so it must be consumed to change the
234 /// state of the statistic.
235 ///
236 /// [`Tick`]: crate::experimental::clock::Tick
237 #[must_use]
238 fn interpolate_and_get_aggregations<'i, P>(
239 &'i mut self,
240 interpolation: &'i mut P,
241 tick: Tick,
242 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243 where
244 P: Interpolation<F::Sample>,
245 {
246 self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
247 move |expiration| {
248 expiration
249 .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
250 .transpose()
251 },
252 )
253 }
254
255 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
256 /// and gets the aggregations.
257 ///
258 /// The returned iterator performs the computation and so it must be consumed to change the
259 /// state of the statistic.
260 ///
261 /// [`Tick`]: crate::experimental::clock::Tick
262 #[must_use]
263 fn fold_and_get_aggregations<'i, P>(
264 &'i mut self,
265 interpolation: &'i mut P,
266 tick: Tick,
267 sample: F::Sample,
268 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
269 where
270 P: Interpolation<F::Sample>,
271 {
272 self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
273 expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
274 })
275 }
276
277 /// Gets the sampling interval of the series.
278 pub fn interval(&self) -> &SamplingInterval {
279 &self.interval
280 }
281}
282
283impl<F, R, A> TimeSeries<PostAggregation<F, R>>
284where
285 F: Default + Statistic,
286 R: Clone + Fn(F::Aggregation) -> A,
287 A: Clone,
288{
289 pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
290 TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
291 }
292}
293
294/// A buffered round-robin statistical time series.
295///
296/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
297/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
298/// the buffer.
299#[derive(Derivative)]
300#[derivative(
301 Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
302 Debug(bound = "F: Debug,
303 F::Buffer: Debug,
304 P::Output<F::Sample>: Debug,")
305)]
306struct BufferedTimeSeries<F, P>
307where
308 F: SerialStatistic<P>,
309 P: InterpolationKind,
310{
311 buffer: F::Buffer,
312 interpolation: P::Output<F::Sample>,
313 series: TimeSeries<F>,
314}
315
316impl<F, P> BufferedTimeSeries<F, P>
317where
318 F: SerialStatistic<P>,
319 P: InterpolationKind,
320{
321 pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
322 let buffer = F::buffer(&series.interval);
323 BufferedTimeSeries { buffer, interpolation, series }
324 }
325
326 /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
327 /// aggregations.
328 ///
329 /// # Errors
330 ///
331 /// Returns an error if sampling fails.
332 ///
333 /// [`Tick`]: crate::experimental::clock::Tick
334 fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
335 for aggregation in
336 self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
337 {
338 let (count, aggregation) = aggregation?;
339 if count.get() == 1 {
340 self.buffer.push(aggregation);
341 } else {
342 self.buffer.fill(aggregation, count);
343 }
344 }
345 Ok(())
346 }
347
348 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
349 /// and buffers the aggregations.
350 ///
351 /// # Errors
352 ///
353 /// Returns an error if sampling fails.
354 ///
355 /// [`Tick`]: crate::experimental::clock::Tick
356 fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
357 for aggregation in
358 self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
359 {
360 let (count, aggregation) = aggregation?;
361 if count.get() == 1 {
362 self.buffer.push(aggregation);
363 } else {
364 self.buffer.fill(aggregation, count);
365 }
366 }
367 Ok(())
368 }
369
370 pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
371 let mut data = vec![];
372 self.buffer.serialize(&mut data)?;
373 Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
374 }
375}
376
377/// A buffer of data from time matrix.
378#[derive(Clone, Debug, PartialEq)]
379pub struct SerializedBuffer {
380 pub data_semantic: String,
381 pub data: Vec<u8>,
382}
383
384impl SerializedBuffer {
385 /// Records the current state of this `TimeMatrix` into `node`.
386 pub fn write_to_inspect(self, node: &fuchsia_inspect::Node) {
387 let Self { data_semantic, data } = self;
388 node.record_string("type", data_semantic);
389 node.record_bytes("data", data);
390 }
391
392 /// Records an attempt at retrieving a serialized buffer to inspect.
393 pub fn write_to_inspect_or_error<E: Debug>(
394 result: Result<Self, E>,
395 node: &fuchsia_inspect::Node,
396 ) {
397 match result {
398 Ok(b) => b.write_to_inspect(node),
399 Err(e) => node.record_string("type", format!("error: {:?}", e)),
400 }
401 }
402}
403
404/// One or more statistical round-robin time series.
405///
406/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
407/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
408/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
409/// time matrix must be the same, but the sampling intervals can and should differ.
410#[derive(Derivative)]
411#[derivative(
412 Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
413 Debug(bound = "F: Debug,
414 F::Buffer: Debug,
415 P::Output<F::Sample>: Debug,")
416)]
417pub struct TimeMatrix<F, P>
418where
419 F: SerialStatistic<P>,
420 P: InterpolationKind,
421{
422 created: Timestamp,
423 last: ObservationTime,
424 buffers: Vec1<BufferedTimeSeries<F, P>>,
425}
426
427impl<F, P> TimeMatrix<F, P>
428where
429 F: SerialStatistic<P>,
430 P: InterpolationKind,
431{
432 fn from_series_with<Q>(
433 created: Timestamp,
434 series: impl Into<Vec1<TimeSeries<F>>>,
435 mut interpolation: Q,
436 ) -> Self
437 where
438 Q: FnMut() -> P::Output<F::Sample>,
439 {
440 let buffers =
441 series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
442 TimeMatrix { created, last: ObservationTime::at(created), buffers }
443 }
444
445 /// Constructs a time matrix with the given sampling profile and interpolation.
446 ///
447 /// Statistics are default initialized.
448 pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
449 where
450 F: Default,
451 {
452 Self::new_at(Timestamp::now(), profile, interpolation)
453 }
454
455 pub(crate) fn new_at(
456 timestamp: Timestamp,
457 profile: impl Into<SamplingProfile>,
458 interpolation: P::Output<F::Sample>,
459 ) -> Self
460 where
461 F: Default,
462 {
463 let sampling_intervals = profile.into().into_sampling_intervals();
464 TimeMatrix::from_series_with(
465 timestamp,
466 sampling_intervals.map_into(TimeSeries::new),
467 || interpolation.clone(),
468 )
469 }
470
471 /// Constructs a time matrix with the given statistic.
472 pub fn with_statistic(
473 profile: impl Into<SamplingProfile>,
474 interpolation: P::Output<F::Sample>,
475 statistic: F,
476 ) -> Self {
477 let sampling_intervals = profile.into().into_sampling_intervals();
478 TimeMatrix::from_series_with(
479 Timestamp::now(),
480 sampling_intervals
481 .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
482 || interpolation.clone(),
483 )
484 }
485
486 /// Folds the given sample and interpolations and gets the aggregation buffers.
487 ///
488 /// To fold a sample without serializing buffers, use [`Sampler::fold`].
489 ///
490 /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
491 pub fn fold_and_get_buffers(
492 &mut self,
493 sample: Timed<F::Sample>,
494 ) -> Result<SerializedBuffer, FoldError> {
495 self.fold(sample)?;
496 let series_buffers = self
497 .buffers
498 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
499 .map_err::<FoldError, _>(From::from)?;
500 self.serialize(series_buffers).map_err(From::from)
501 }
502
503 fn serialize(
504 &self,
505 series_buffers: Vec1<SerializedTimeSeries>,
506 ) -> io::Result<SerializedBuffer> {
507 use crate::experimental::clock::DurationExt;
508 use byteorder::{LittleEndian, WriteBytesExt};
509 use std::io::Write;
510
511 let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
512 let end_timestamp =
513 u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
514
515 let mut buffer = vec![];
516 buffer.write_u8(1)?; // Version number.
517 buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
518 buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
519 // time.
520 encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
521
522 for series in series_buffers {
523 const GRANULARITY_FIELD_LEN: usize = 2;
524 let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
525 let granularity =
526 u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
527
528 buffer.write_u16::<LittleEndian>(len)?;
529 buffer.write_u16::<LittleEndian>(granularity)?;
530 buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
531 }
532 Ok(SerializedBuffer {
533 data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
534 data: buffer,
535 })
536 }
537}
538
539impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
540where
541 PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
542 F: Default + SerialStatistic<P>,
543 R: Clone + Fn(F::Aggregation) -> A,
544 P: InterpolationKind,
545 A: Clone,
546{
547 /// Constructs a time matrix with the default statistic and given transform for
548 /// post-aggregation.
549 pub fn with_transform(
550 profile: impl Into<SamplingProfile>,
551 interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
552 transform: R,
553 ) -> Self
554 where
555 R: Clone,
556 {
557 let sampling_intervals = profile.into().into_sampling_intervals();
558 TimeMatrix::from_series_with(
559 Timestamp::now(),
560 sampling_intervals
561 .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
562 || interpolation.clone(),
563 )
564 }
565}
566
567impl<F, P> Default for TimeMatrix<F, P>
568where
569 F: Default + SerialStatistic<P>,
570 P: InterpolationKind,
571 P::Output<F::Sample>: Default,
572{
573 fn default() -> Self {
574 TimeMatrix::new(SamplingProfile::default(), P::Output::default())
575 }
576}
577
578impl<F, P> TimeMatrixFold<F::Sample> for TimeMatrix<F, P>
579where
580 F: SerialStatistic<P>,
581 P: InterpolationKind,
582{
583 fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
584 let (timestamp, sample) = sample.into();
585 let tick = self.last.tick(timestamp, true)?;
586 Ok(for buffer in self.buffers.iter_mut() {
587 buffer.fold(tick, sample.clone())?;
588 })
589 }
590}
591
592impl<F, P> TimeMatrixTick for TimeMatrix<F, P>
593where
594 F: SerialStatistic<P>,
595 P: InterpolationKind,
596{
597 fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
598 let tick = self.last.tick(timestamp.into(), false)?;
599 Ok(for buffer in self.buffers.iter_mut() {
600 buffer.interpolate(tick)?;
601 })
602 }
603
604 fn tick_and_get_buffers(
605 &mut self,
606 timestamp: Timestamp,
607 ) -> Result<SerializedBuffer, FoldError> {
608 self.tick(timestamp)?;
609 let series_buffers = self
610 .buffers
611 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
612 .map_err::<FoldError, _>(From::from)?;
613 self.serialize(series_buffers).map_err(From::from)
614 }
615}
616
617#[cfg(test)]
618mod tests {
619 use fuchsia_async as fasync;
620
621 use crate::experimental::clock::{Timed, Timestamp};
622 use crate::experimental::series::interpolation::{ConstantSample, LastSample};
623 use crate::experimental::series::statistic::{
624 ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
625 };
626 use crate::experimental::series::{
627 SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
628 };
629
630 fn fold_and_interpolate_f32(matrix: &mut impl TimeMatrixFold<f32>) {
631 matrix.fold(Timed::now(0.0)).unwrap();
632 matrix.fold(Timed::now(1.0)).unwrap();
633 matrix.fold(Timed::now(2.0)).unwrap();
634 matrix.tick(Timestamp::now()).unwrap();
635 }
636
637 // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
638 // outputs of a `TimeMatrix`.
639 // This "test" is considered successful as long as it builds.
640 #[test]
641 fn static_test_define_time_matrix() {
642 type Mean<T> = ArithmeticMean<T>;
643 type MeanTransform<T, F> = Transform<Mean<T>, F>;
644
645 let _exec = fasync::TestExecutor::new_with_fake_time();
646
647 // Arithmetic mean time matrices.
648 let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
649 let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
650 SamplingProfile::balanced(),
651 LastSample::or(0.0f32),
652 );
653 let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
654 SamplingProfile::granular(),
655 ConstantSample::default(),
656 Mean::<f32>::default(),
657 );
658
659 // Discrete arithmetic mean time matrices.
660 let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
661 SamplingProfile::highly_granular(),
662 LastSample::or(0.0f32),
663 |aggregation| aggregation.ceil() as i64,
664 );
665 fold_and_interpolate_f32(&mut matrix);
666 // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
667 // constructors. This is as raw as it gets.
668 let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
669 SamplingProfile::default(),
670 ConstantSample::default(),
671 PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
672 aggregation.ceil() as i64
673 }),
674 );
675 fold_and_interpolate_f32(&mut matrix);
676 }
677
678 // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
679 // outputs of a `TimeMatrix`.
680 // This "test" is considered successful as long as it builds.
681 #[test]
682 fn static_test_supported_statistic_and_interpolation_combinations() {
683 let _exec = fasync::TestExecutor::new_with_fake_time();
684
685 let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
686 let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
687 let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
688 let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
689 let _ = TimeMatrix::<Max<u64>, LastSample>::default();
690 let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
691 let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
692 let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
693 let _ = TimeMatrix::<Union<u64>, LastSample>::default();
694 }
695
696 #[test]
697 fn time_matrix_with_uncompressed_buffer() {
698 let exec = fasync::TestExecutor::new_with_fake_time();
699 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
700 let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
701 SamplingProfile::highly_granular(),
702 ConstantSample::default(),
703 );
704 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
705 assert_eq!(
706 buffer.data,
707 vec![
708 1, // version number
709 3, 0, 0, 0, // created timestamp
710 3, 0, 0, 0, // last timestamp
711 0, 0, // type: uncompressed; subtype: f32
712 4, 0, // series 1: length in bytes
713 10, 0, // series 1 granularity: 10s
714 0, 0, // number of elements
715 4, 0, // series 2: length in bytes
716 60, 0, // series 2 granularity: 60s
717 0, 0, // number of elements
718 ]
719 );
720
721 time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
722 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
723 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
724 assert_eq!(
725 buffer.data,
726 vec![
727 1, // version number
728 3, 0, 0, 0, // created timestamp
729 10, 0, 0, 0, // last timestamp
730 0, 0, // type: uncompressed; subtype: f32
731 8, 0, // series 1: length in bytes
732 10, 0, // series 1 granularity: 10s
733 1, 0, // number of elements
734 42, 0, 0, 0, // item 1
735 4, 0, // series 2: length in bytes
736 60, 0, // series 2 granularity: 60s
737 0, 0, // number of elements
738 ]
739 );
740
741 // Advance several time steps to test ring buffer's `fill`
742 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
743 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
744 assert_eq!(
745 buffer.data,
746 vec![
747 1, // version number
748 3, 0, 0, 0, // created timestamp
749 50, 0, 0, 0, // last timestamp
750 0, 0, // type: uncompressed; subtype: f32
751 24, 0, // series 1: length in bytes
752 10, 0, // series 1 granularity: 10s
753 5, 0, // number of elements
754 42, 0, 0, 0, // item 1
755 0, 0, 0, 0, // item 2
756 0, 0, 0, 0, // item 3
757 0, 0, 0, 0, // item 4
758 0, 0, 0, 0, // item 5
759 4, 0, // series 2: length in bytes
760 60, 0, // series 2 granularity: 60s
761 0, 0, // number of elements
762 ]
763 );
764 }
765
766 #[test]
767 fn time_matrix_with_simple8b_rle_buffer() {
768 let exec = fasync::TestExecutor::new_with_fake_time();
769 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
770 let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
771 SamplingProfile::highly_granular(),
772 ConstantSample::default(),
773 );
774 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
775 assert_eq!(
776 buffer.data,
777 vec![
778 1, // version number
779 3, 0, 0, 0, // created timestamp
780 3, 0, 0, 0, // last timestamp
781 1, 0, // type: simple8b RLE; subtype: unsigned
782 7, 0, // series 1: length in bytes
783 10, 0, // series 1 granularity: 10s
784 0, 0, // number of selector elements and value blocks
785 0, 0, // head selector index
786 0, // number of values in last block
787 7, 0, // series 2: length in bytes
788 60, 0, // series 2 granularity: 60s
789 0, 0, // number of selector elements and value blocks
790 0, 0, // head selector index
791 0, // number of values in last block
792 ]
793 );
794
795 time_matrix.fold(Timed::now(15)).unwrap();
796 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
797 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
798 assert_eq!(
799 buffer.data,
800 vec![
801 1, // version number
802 3, 0, 0, 0, // created timestamp
803 10, 0, 0, 0, // last timestamp
804 1, 0, // type: simple8b RLE; subtype: unsigned
805 16, 0, // series 1: length in bytes
806 10, 0, // series 1 granularity: 10s
807 1, 0, // number of selector elements and value blocks
808 0, 0, // head selector index
809 1, // number of values in last block
810 0x0f, // RLE selector
811 15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
812 7, 0, // series 2: length in bytes
813 60, 0, // series 2 granularity: 60s
814 0, 0, // number of selector elements and value blocks
815 0, 0, // head selector index
816 0, // number of values in last block
817 ]
818 );
819
820 // Advance several time steps to test ring buffer's `fill`
821 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
822 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
823 assert_eq!(
824 buffer.data,
825 vec![
826 1, // version number
827 3, 0, 0, 0, // created timestamp
828 50, 0, 0, 0, // last timestamp
829 1, 0, // type: simple8b RLE; subtype: unsigned
830 16, 0, // series 1: length in bytes
831 10, 0, // series 1 granularity: 10s
832 1, 0, // number of selector elements and value blocks
833 0, 0, // head selector index
834 5, // number of values in last block
835 0x03, // 4-bit selector
836 0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
837 7, 0, // series 2: length in bytes
838 60, 0, // series 2 granularity: 60s
839 0, 0, // number of selector elements and value blocks
840 0, 0, // head selector index
841 0, // number of values in last block
842 ]
843 );
844 }
845
846 #[test]
847 fn time_matrix_with_zigzag_simple8b_rle_buffer() {
848 let exec = fasync::TestExecutor::new_with_fake_time();
849 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
850 let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
851 SamplingProfile::highly_granular(),
852 ConstantSample::default(),
853 );
854 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
855 assert_eq!(
856 buffer.data,
857 vec![
858 1, // version number
859 3, 0, 0, 0, // created timestamp
860 3, 0, 0, 0, // last timestamp
861 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
862 7, 0, // series 1: length in bytes
863 10, 0, // series 1 granularity: 10s
864 0, 0, // number of selector elements and value blocks
865 0, 0, // head selector index
866 0, // number of values in last block
867 7, 0, // series 2: length in bytes
868 60, 0, // series 2 granularity: 60s
869 0, 0, // number of selector elements and value blocks
870 0, 0, // head selector index
871 0, // number of values in last block
872 ]
873 );
874
875 time_matrix.fold(Timed::now(-8)).unwrap();
876 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
877 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
878 assert_eq!(
879 buffer.data,
880 vec![
881 1, // version number
882 3, 0, 0, 0, // created timestamp
883 10, 0, 0, 0, // last timestamp
884 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
885 16, 0, // series 1: length in bytes
886 10, 0, // series 1 granularity: 10s
887 1, 0, // number of selector elements and value blocks
888 0, 0, // head selector index
889 1, // number of values in last block
890 0x0f, // RLE selector
891 15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
892 7, 0, // series 2: length in bytes
893 60, 0, // series 2 granularity: 60s
894 0, 0, // number of selector elements and value blocks
895 0, 0, // head selector index
896 0, // number of values in last block
897 ]
898 );
899
900 // Advance several time steps to test ring buffer's `fill`
901 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
902 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
903 assert_eq!(
904 buffer.data,
905 vec![
906 1, // version number
907 3, 0, 0, 0, // created timestamp
908 50, 0, 0, 0, // last timestamp
909 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
910 16, 0, // series 1: length in bytes
911 10, 0, // series 1 granularity: 10s
912 1, 0, // number of selector elements and value blocks
913 0, 0, // head selector index
914 5, // number of values in last block
915 0x03, // 4-bit selector
916 0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
917 7, 0, // series 2: length in bytes
918 60, 0, // series 2 granularity: 60s
919 0, 0, // number of selector elements and value blocks
920 0, 0, // head selector index
921 0, // number of values in last block
922 ]
923 );
924 }
925
926 #[test]
927 fn time_matrix_with_delta_simple8b_rle_buffer() {
928 let exec = fasync::TestExecutor::new_with_fake_time();
929 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
930 let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
931 SamplingProfile::highly_granular(),
932 LastSample::or(0),
933 );
934 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
935 assert_eq!(
936 buffer.data,
937 vec![
938 1, // version number
939 3, 0, 0, 0, // created timestamp
940 3, 0, 0, 0, // last timestamp
941 2, 0, // type: delta simple8b RLE; subtype: unsigned
942 7, 0, // series 1: length in bytes
943 10, 0, // series 1 granularity: 10s
944 0, 0, // number of base value + selector elements or value blocks
945 0, 0, // head selector index
946 0, // number of values in last block
947 7, 0, // series 2: length in bytes
948 60, 0, // series 2 granularity: 60s
949 0, 0, // number of base value + selector elements or value blocks
950 0, 0, // head selector index
951 0, // number of values in last block
952 ]
953 );
954
955 time_matrix.fold(Timed::now(42)).unwrap();
956 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
957 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
958 assert_eq!(
959 buffer.data,
960 vec![
961 1, // version number
962 3, 0, 0, 0, // created timestamp
963 10, 0, 0, 0, // last timestamp
964 2, 0, // type: delta simple8b RLE; subtype: unsigned
965 15, 0, // series 1: length in bytes
966 10, 0, // series 1 granularity: 10s
967 1, 0, // number of base value + selector elements or value blocks
968 0, 0, // head selector index
969 0, // number of values in last block
970 42, 0, 0, 0, 0, 0, 0, 0, // base value
971 7, 0, // series 2: length in bytes
972 60, 0, // series 2 granularity: 60s
973 0, 0, // number of base value + selector elements or value blocks
974 0, 0, // head selector index
975 0, // number of values in last block
976 ]
977 );
978
979 time_matrix.fold(Timed::now(57)).unwrap();
980 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
981 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
982 assert_eq!(
983 buffer.data,
984 vec![
985 1, // version number
986 3, 0, 0, 0, // created timestamp
987 20, 0, 0, 0, // last timestamp
988 2, 0, // type: delta simple8b RLE; subtype: unsigned
989 24, 0, // series 1: length in bytes
990 10, 0, // series 1 granularity: 10s
991 2, 0, // number of base value + selector elements or value blocks
992 0, 0, // head selector index
993 1, // number of values in last block
994 42, 0, 0, 0, 0, 0, 0, 0, // base value
995 0x0f, // RLE selector
996 15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
997 7, 0, // series 2: length in bytes
998 60, 0, // series 2 granularity: 60s
999 0, 0, // number of base value + selector elements or value blocks
1000 0, 0, // head selector index
1001 0, // number of values in last block
1002 ]
1003 );
1004
1005 // Advance several time steps to test ring buffer's `fill`
1006 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1007 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1008 assert_eq!(
1009 buffer.data,
1010 vec![
1011 1, // version number
1012 3, 0, 0, 0, // created timestamp
1013 50, 0, 0, 0, // last timestamp
1014 2, 0, // type: delta simple8b RLE; subtype: unsigned
1015 24, 0, // series 1: length in bytes
1016 10, 0, // series 1 granularity: 10s
1017 2, 0, // number of base value + selector elements or value blocks
1018 0, 0, // head selector index
1019 4, // number of values in last block
1020 42, 0, 0, 0, 0, 0, 0, 0, // base value
1021 0x03, // 4-bit selector
1022 0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
1023 7, 0, // series 2: length in bytes
1024 60, 0, // series 2 granularity: 60s
1025 0, 0, // number of base value + selector elements or value blocks
1026 0, 0, // head selector index
1027 0, // number of values in last block
1028 ]
1029 );
1030 }
1031
1032 #[test]
1033 fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
1034 let exec = fasync::TestExecutor::new_with_fake_time();
1035 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1036 let mut time_matrix = TimeMatrix::<Max<i64>, LastSample>::new(
1037 SamplingProfile::highly_granular(),
1038 LastSample::or(0),
1039 );
1040 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1041 assert_eq!(
1042 buffer.data,
1043 vec![
1044 1, // version number
1045 3, 0, 0, 0, // created timestamp
1046 3, 0, 0, 0, // last timestamp
1047 2, 1, // type: delta simple8b RLE; subtype: signed
1048 7, 0, // series 1: length in bytes
1049 10, 0, // series 1 granularity: 10s
1050 0, 0, // number of base value + selector elements or value blocks
1051 0, 0, // head selector index
1052 0, // number of values in last block
1053 7, 0, // series 2: length in bytes
1054 60, 0, // series 2 granularity: 60s
1055 0, 0, // number of base value + selector elements or value blocks
1056 0, 0, // head selector index
1057 0, // number of values in last block
1058 ]
1059 );
1060
1061 time_matrix.fold(Timed::now(42)).unwrap();
1062 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1063 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1064 assert_eq!(
1065 buffer.data,
1066 vec![
1067 1, // version number
1068 3, 0, 0, 0, // created timestamp
1069 10, 0, 0, 0, // last timestamp
1070 2, 1, // type: delta simple8b RLE; subtype: signed
1071 15, 0, // series 1: length in bytes
1072 10, 0, // series 1 granularity: 10s
1073 1, 0, // number of base value + selector elements or value blocks
1074 0, 0, // head selector index
1075 0, // number of values in last block
1076 42, 0, 0, 0, 0, 0, 0, 0, // base value
1077 7, 0, // series 2: length in bytes
1078 60, 0, // series 2 granularity: 60s
1079 0, 0, // number of base value + selector elements or value blocks
1080 0, 0, // head selector index
1081 0, // number of values in last block
1082 ]
1083 );
1084
1085 time_matrix.fold(Timed::now(34)).unwrap();
1086 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1087 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1088 assert_eq!(
1089 buffer.data,
1090 vec![
1091 1, // version number
1092 3, 0, 0, 0, // created timestamp
1093 20, 0, 0, 0, // last timestamp
1094 2, 1, // type: delta simple8b RLE; subtype: signed
1095 24, 0, // series 1: length in bytes
1096 10, 0, // series 1 granularity: 10s
1097 2, 0, // number of base value + selector elements or value blocks
1098 0, 0, // head selector index
1099 1, // number of values in last block
1100 42, 0, 0, 0, 0, 0, 0, 0, // base value
1101 0x0f, // RLE selector
1102 15, 0, 0, 0, 0, 0, 1, 0, // value -8 (delta) encoded as 15, appearing 1 time
1103 7, 0, // series 2: length in bytes
1104 60, 0, // series 2 granularity: 60s
1105 0, 0, // number of base value + selector elements or value blocks
1106 0, 0, // head selector index
1107 0, // number of values in last block
1108 ]
1109 );
1110
1111 // Advance several time steps to test ring buffer's `fill`
1112 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1113 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1114 assert_eq!(
1115 buffer.data,
1116 vec![
1117 1, // version number
1118 3, 0, 0, 0, // created timestamp
1119 50, 0, 0, 0, // last timestamp
1120 2, 1, // type: delta simple8b RLE; subtype: signed
1121 24, 0, // series 1: length in bytes
1122 10, 0, // series 1 granularity: 10s
1123 2, 0, // number of base value + selector elements or value blocks
1124 0, 0, // head selector index
1125 4, // number of values in last block
1126 42, 0, 0, 0, 0, 0, 0, 0, // base value
1127 0x03, // 4-bit selector
1128 0x0f, 0, 0, 0, 0, 0, 0, 0, // diff values -8 (encoded as 15), 0, 0, 0
1129 7, 0, // series 2: length in bytes
1130 60, 0, // series 2 granularity: 60s
1131 0, 0, // number of base value + selector elements or value blocks
1132 0, 0, // head selector index
1133 0, // number of values in last block
1134 ]
1135 );
1136 }
1137
1138 #[test]
1139 fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
1140 let exec = fasync::TestExecutor::new_with_fake_time();
1141 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1142 let mut time_matrix = TimeMatrix::<Max<u64>, LastSample>::new(
1143 SamplingProfile::highly_granular(),
1144 LastSample::or(0),
1145 );
1146 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1147 assert_eq!(
1148 buffer.data,
1149 vec![
1150 1, // version number
1151 3, 0, 0, 0, // created timestamp
1152 3, 0, 0, 0, // last timestamp
1153 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1154 7, 0, // series 1: length in bytes
1155 10, 0, // series 1 granularity: 10s
1156 0, 0, // number of base value + selector elements or value blocks
1157 0, 0, // head selector index
1158 0, // number of values in last block
1159 7, 0, // series 2: length in bytes
1160 60, 0, // series 2 granularity: 60s
1161 0, 0, // number of base value + selector elements or value blocks
1162 0, 0, // head selector index
1163 0, // number of values in last block
1164 ]
1165 );
1166
1167 time_matrix.fold(Timed::now(1)).unwrap();
1168 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1169 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1170 assert_eq!(
1171 buffer.data,
1172 vec![
1173 1, // version number
1174 3, 0, 0, 0, // created timestamp
1175 10, 0, 0, 0, // last timestamp
1176 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1177 15, 0, // series 1: length in bytes
1178 10, 0, // series 1 granularity: 10s
1179 1, 0, // number of base value + selector elements or value blocks
1180 0, 0, // head selector index
1181 0, // number of values in last block
1182 1, 0, 0, 0, 0, 0, 0, 0, // base value
1183 7, 0, // series 2: length in bytes
1184 60, 0, // series 2 granularity: 60s
1185 0, 0, // number of base value + selector elements or value blocks
1186 0, 0, // head selector index
1187 0, // number of values in last block
1188 ]
1189 );
1190
1191 time_matrix.fold(Timed::now(u64::MAX)).unwrap();
1192 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1193 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1194 assert_eq!(
1195 buffer.data,
1196 vec![
1197 1, // version number
1198 3, 0, 0, 0, // created timestamp
1199 20, 0, 0, 0, // last timestamp
1200 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1201 24, 0, // series 1: length in bytes
1202 10, 0, // series 1 granularity: 10s
1203 2, 0, // number of base value + selector elements or value blocks
1204 0, 0, // head selector index
1205 1, // number of values in last block
1206 1, 0, 0, 0, 0, 0, 0, 0, // base value
1207 0x0f, // RLE selector
1208 3, 0, 0, 0, 0, 0, 1, 0, // value -2 (delta) encoded as 3, appearing 1 time
1209 7, 0, // series 2: length in bytes
1210 60, 0, // series 2 granularity: 60s
1211 0, 0, // number of base value + selector elements or value blocks
1212 0, 0, // head selector index
1213 0, // number of values in last block
1214 ]
1215 );
1216
1217 // Advance several time steps to test ring buffer's `fill`
1218 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1219 let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1220 assert_eq!(
1221 buffer.data,
1222 vec![
1223 1, // version number
1224 3, 0, 0, 0, // created timestamp
1225 50, 0, 0, 0, // last timestamp
1226 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1227 24, 0, // series 1: length in bytes
1228 10, 0, // series 1 granularity: 10s
1229 2, 0, // number of base value + selector elements or value blocks
1230 0, 0, // head selector index
1231 4, // number of values in last block
1232 1, 0, 0, 0, 0, 0, 0, 0, // base value
1233 0x01, // 2-bit selector
1234 3, 0, 0, 0, 0, 0, 0, 0, // diff values -2 (encoded as 3), 0, 0, 0
1235 7, 0, // series 2: length in bytes
1236 60, 0, // series 2 granularity: 60s
1237 0, 0, // number of base value + selector elements or value blocks
1238 0, 0, // head selector index
1239 0, // number of values in last block
1240 ]
1241 );
1242 }
1243}