windowed_stats/experimental/series/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24    BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25    Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28    ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32    FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::Capacity;
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
/// A [`TimeMatrix`] type that can be advanced forward in time.
///
/// This trait provides tick operations for [`TimeMatrix`] types. Ticking a [`TimeMatrix`] causes
/// sample interpolation within and aggregation propagation across [`SamplingInterval`]s.
///
/// Importantly, this trait is `dyn` compatible and type erased; it can be used to tick a
/// [`TimeMatrix`] regardless of its input type parameters (sample type, interpolation type, etc.).
///
/// See also the [`TimeMatrixFold`] subtrait.
pub trait TimeMatrixTick {
    /// Advances the matrix to the given timestamp without folding a sample.
    ///
    /// # Errors
    ///
    /// Returns an error if the tick cannot be applied.
    fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;

    /// Advances the matrix to the given timestamp and gets its serialized aggregation buffers.
    ///
    /// # Errors
    ///
    /// Returns an error if the tick cannot be applied or the buffers cannot be serialized.
    fn tick_and_get_buffers(&mut self, timestamp: Timestamp)
    -> Result<SerializedBuffer, FoldError>;
}
53
/// A [`TimeMatrix`] type that can sample data.
///
/// This trait provides fold operations for `TimeMatrix` types. Folding samples updates
/// aggregations and advances a [`TimeMatrix`] forward in time.
///
/// See also the [`TimeMatrixTick`] supertrait. This trait supports both ticking and sampling, but
/// is not completely type erased: the sample input type parameter `T` is needed.
pub trait TimeMatrixFold<T>: TimeMatrixTick {
    /// Folds the given timed sample into the matrix.
    ///
    /// # Errors
    ///
    /// Returns an error if the sample cannot be folded.
    fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
}
64
/// A type that describes the semantics of data folded by `Sampler`s.
///
/// Data semantics determine how statistics are interpreted and time series are aggregated and
/// buffered.
pub trait DataSemantic {
    /// The metadata type associated with data of this semantic.
    ///
    /// Semantics in this module use `()` when they have no metadata.
    type Metadata: Metadata;

    /// Gets a displayable name for the semantic.
    ///
    /// A [`TimeMatrix`] formats this name into [`SerializedBuffer::data_semantic`] when it is
    /// serialized.
    fn display() -> impl Display;
}
74
/// A continually increasing value.
///
/// Counters are analogous to an odometer in a vehicle.
///
/// This type has no variants and so cannot be instantiated; it is only used at the type level.
#[derive(Debug)]
pub enum Counter {}

// `u64` counter aggregations interpolated with `LastSample` are buffered by `DeltaSimple8bRle`.
impl BufferStrategy<u64, LastSample> for Counter {
    type Buffer = DeltaSimple8bRle;
}

impl DataSemantic for Counter {
    // Counters have no metadata.
    type Metadata = ();

    /// Displays as `counter`.
    fn display() -> impl Display {
        "counter"
    }
}
92
/// A fluctuating value.
///
/// Gauges are analogous to a speedometer in a vehicle.
///
/// This type has no variants and so cannot be instantiated; it is only used at the type level.
#[derive(Debug)]
pub enum Gauge {}

// `f32` gauge aggregations are buffered uncompressed for every kind of interpolation.
impl<P> BufferStrategy<f32, P> for Gauge
where
    P: InterpolationKind,
{
    type Buffer = Uncompressed<f32>;
}

// Integer gauge aggregations select a buffer by both signedness and interpolation: the
// `ConstantSample` implementations use the non-delta encodings and the `LastSample`
// implementations use the delta encodings.
impl BufferStrategy<i64, ConstantSample> for Gauge {
    type Buffer = ZigzagSimple8bRle;
}

impl BufferStrategy<i64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<i64>;
}

impl BufferStrategy<u64, ConstantSample> for Gauge {
    type Buffer = Simple8bRle;
}

impl BufferStrategy<u64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<u64>;
}

impl DataSemantic for Gauge {
    // Gauges have no metadata.
    type Metadata = ();

    /// Displays as `gauge`.
    fn display() -> impl Display {
        "gauge"
    }
}
129
/// A semantic like `Gauge` that avoids [`DeltaZigzagSimple8bRle`] until we fix
/// some other issues.
///
/// This type has no variants and so cannot be instantiated; it is only used at the type level.
/// Like the other data semantics in this module, it implements `Debug`.
///
/// TODO(https://fxbug.dev/436253782): Delete this type when the viewer can
/// decode `DeltaZigzagSimple8bRle`.
///
/// OR
///
/// TODO(https://fxbug.dev/457443158): Delete this type when
/// `ConstantAggregation` is introduced and netstack's time series is changed to
/// `TimeMatrix<Diff<u64>, ConstantAggregation>`.
///
/// Whichever happens first.
#[derive(Debug)]
pub enum GaugeForceSimple8bRle {}
144
// Any sample type that converts into `u64` is buffered by `Simple8bRle` when interpolated with
// `LastSample`.
impl<T: Into<u64>> BufferStrategy<T, LastSample> for GaugeForceSimple8bRle {
    type Buffer = Simple8bRle;
}

impl DataSemantic for GaugeForceSimple8bRle {
    // This semantic has no metadata.
    type Metadata = ();

    /// Displays as `gauge`, the same name that `Gauge` displays.
    fn display() -> impl Display {
        "gauge"
    }
}
156
157// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
158//                                    "bit set", but there are notably some instances of "bitset",
159//                                    which instead suggests `Bitset` and `bitset`.
/// A set of Boolean values.
///
/// Bit sets are analogous to indicator lamps in a vehicle.
///
/// This type has no variants and so cannot be instantiated; it is only used at the type level.
#[derive(Debug)]
pub enum BitSet {}

// Bit set aggregations are buffered by `Simple8bRle` for every kind of interpolation, provided
// that `Simple8bRle` is a ring buffer of the aggregation type.
impl<A, P> BufferStrategy<A, P> for BitSet
where
    Simple8bRle: RingBuffer<A>,
    P: InterpolationKind,
{
    type Buffer = Simple8bRle;
}

impl DataSemantic for BitSet {
    // Bit sets are described by a `BitSetIndex`.
    type Metadata = BitSetIndex;

    /// Displays as `bitset`.
    fn display() -> impl Display {
        "bitset"
    }
}
181
182/// A buffer of serialized data from a time series.
183#[derive(Clone, Debug)]
184struct SerializedTimeSeries {
185    interval: SamplingInterval,
186    data: Vec<u8>,
187}
188
189impl SerializedTimeSeries {
190    /// Gets the sampling interval for the aggregations in the buffer.
191    pub fn interval(&self) -> &SamplingInterval {
192        &self.interval
193    }
194
195    /// Gets the serialized data.
196    pub fn data(&self) -> &[u8] {
197        self.data.as_slice()
198    }
199}
200
/// An unbuffered statistical time series specification.
///
/// This type samples and interpolates timed data and produces aggregations per its statistic and
/// sampling interval. It is a specification insofar that it does **not** buffer the series of
/// aggregations.
#[derive(Clone, Debug)]
struct TimeSeries<F>
where
    F: Statistic,
{
    /// The sampling interval over which the statistic is aggregated.
    interval: SamplingInterval,
    /// The statistic into which samples and interpolations are folded.
    statistic: F,
}
214
215impl<F> TimeSeries<F>
216where
217    F: Statistic,
218{
219    pub fn new(interval: SamplingInterval) -> Self
220    where
221        F: Default,
222    {
223        TimeSeries { interval, statistic: F::default() }
224    }
225
226    pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
227        TimeSeries { interval, statistic }
228    }
229
230    /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
231    /// aggregations.
232    ///
233    /// The returned iterator performs the computation and so it must be consumed to change the
234    /// state of the statistic.
235    ///
236    /// [`Tick`]: crate::experimental::clock::Tick
237    #[must_use]
238    fn interpolate_and_get_aggregations<'i, P>(
239        &'i mut self,
240        interpolation: &'i mut P,
241        tick: Tick,
242    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243    where
244        P: Interpolation<F::Sample>,
245    {
246        self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
247            move |expiration| {
248                expiration
249                    .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
250                    .transpose()
251            },
252        )
253    }
254
255    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
256    /// and gets the aggregations.
257    ///
258    /// The returned iterator performs the computation and so it must be consumed to change the
259    /// state of the statistic.
260    ///
261    /// [`Tick`]: crate::experimental::clock::Tick
262    #[must_use]
263    fn fold_and_get_aggregations<'i, P>(
264        &'i mut self,
265        interpolation: &'i mut P,
266        tick: Tick,
267        sample: F::Sample,
268    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
269    where
270        P: Interpolation<F::Sample>,
271    {
272        self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
273            expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
274        })
275    }
276
277    /// Gets the sampling interval of the series.
278    pub fn interval(&self) -> &SamplingInterval {
279        &self.interval
280    }
281}
282
283impl<F, R, A> TimeSeries<PostAggregation<F, R>>
284where
285    F: Default + Statistic,
286    R: Clone + Fn(F::Aggregation) -> A,
287    A: Clone,
288{
289    pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
290        TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
291    }
292}
293
/// A buffered round-robin statistical time series.
///
/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
/// the buffer.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
                   F::Buffer: Debug,
                   P::Output<F::Sample>: Debug,")
)]
struct BufferedTimeSeries<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The buffer into which aggregations of the series are pushed.
    buffer: F::Buffer,
    /// The interpolation state used when folding and interpolating the series.
    interpolation: P::Output<F::Sample>,
    /// The unbuffered series that produces aggregations.
    series: TimeSeries<F>,
}
315
316impl<F, P> BufferedTimeSeries<F, P>
317where
318    F: SerialStatistic<P>,
319    P: InterpolationKind,
320{
321    pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
322        let buffer = F::buffer(&series.interval);
323        BufferedTimeSeries { buffer, interpolation, series }
324    }
325
326    /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
327    /// aggregations.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if sampling fails.
332    ///
333    /// [`Tick`]: crate::experimental::clock::Tick
334    fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
335        for aggregation in
336            self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
337        {
338            let (count, aggregation) = aggregation?;
339            if count.get() == 1 {
340                self.buffer.push(aggregation);
341            } else {
342                self.buffer.fill(aggregation, count);
343            }
344        }
345        Ok(())
346    }
347
348    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
349    /// and buffers the aggregations.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if sampling fails.
354    ///
355    /// [`Tick`]: crate::experimental::clock::Tick
356    fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
357        for aggregation in
358            self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
359        {
360            let (count, aggregation) = aggregation?;
361            if count.get() == 1 {
362                self.buffer.push(aggregation);
363            } else {
364                self.buffer.fill(aggregation, count);
365            }
366        }
367        Ok(())
368    }
369
370    pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
371        let mut data = vec![];
372        self.buffer.serialize(&mut data)?;
373        Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
374    }
375}
376
377/// A buffer of data from time matrix.
378#[derive(Clone, Debug, PartialEq)]
379pub struct SerializedBuffer {
380    pub data_semantic: String,
381    pub data: Vec<u8>,
382}
383
384impl SerializedBuffer {
385    /// Records the current state of this `TimeMatrix` into `node`.
386    pub fn write_to_inspect(self, node: &fuchsia_inspect::Node) {
387        let Self { data_semantic, data } = self;
388        node.record_string("type", data_semantic);
389        node.record_bytes("data", data);
390    }
391
392    /// Records an attempt at retrieving a serialized buffer to inspect.
393    pub fn write_to_inspect_or_error<E: Debug>(
394        result: Result<Self, E>,
395        node: &fuchsia_inspect::Node,
396    ) {
397        match result {
398            Ok(b) => b.write_to_inspect(node),
399            Err(e) => node.record_string("type", format!("error: {:?}", e)),
400        }
401    }
402}
403
/// One or more statistical round-robin time series.
///
/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
/// time matrix must be the same, but the sampling intervals can and should differ.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
                   F::Buffer: Debug,
                   P::Output<F::Sample>: Debug,")
)]
pub struct TimeMatrix<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The time at which the matrix was constructed.
    created: Timestamp,
    /// The observation time of the matrix, which is ticked by every fold and tick.
    last: ObservationTime,
    /// The buffered series of the matrix, one per sampling interval.
    buffers: Vec1<BufferedTimeSeries<F, P>>,
}
426
427impl<F, P> TimeMatrix<F, P>
428where
429    F: SerialStatistic<P>,
430    P: InterpolationKind,
431{
432    fn from_series_with<Q>(series: impl Into<Vec1<TimeSeries<F>>>, mut interpolation: Q) -> Self
433    where
434        Q: FnMut() -> P::Output<F::Sample>,
435    {
436        let buffers =
437            series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
438        TimeMatrix { created: Timestamp::now(), last: ObservationTime::default(), buffers }
439    }
440
441    /// Constructs a time matrix with the given sampling profile and interpolation.
442    ///
443    /// Statistics are default initialized.
444    pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
445    where
446        F: Default,
447    {
448        let sampling_intervals = profile.into().into_sampling_intervals();
449        TimeMatrix::from_series_with(sampling_intervals.map_into(TimeSeries::new), || {
450            interpolation.clone()
451        })
452    }
453
454    /// Constructs a time matrix with the given statistic.
455    pub fn with_statistic(
456        profile: impl Into<SamplingProfile>,
457        interpolation: P::Output<F::Sample>,
458        statistic: F,
459    ) -> Self {
460        let sampling_intervals = profile.into().into_sampling_intervals();
461        TimeMatrix::from_series_with(
462            sampling_intervals
463                .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
464            || interpolation.clone(),
465        )
466    }
467
468    /// Folds the given sample and interpolations and gets the aggregation buffers.
469    ///
470    /// To fold a sample without serializing buffers, use [`Sampler::fold`].
471    ///
472    /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
473    pub fn fold_and_get_buffers(
474        &mut self,
475        sample: Timed<F::Sample>,
476    ) -> Result<SerializedBuffer, FoldError> {
477        self.fold(sample)?;
478        let series_buffers = self
479            .buffers
480            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
481            .map_err::<FoldError, _>(From::from)?;
482        self.serialize(series_buffers).map_err(From::from)
483    }
484
485    fn serialize(
486        &self,
487        series_buffers: Vec1<SerializedTimeSeries>,
488    ) -> io::Result<SerializedBuffer> {
489        use crate::experimental::clock::DurationExt;
490        use byteorder::{LittleEndian, WriteBytesExt};
491        use std::io::Write;
492
493        let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
494        let end_timestamp =
495            u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
496
497        let mut buffer = vec![];
498        buffer.write_u8(1)?; // Version number.
499        buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
500        buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
501        // time.
502        encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
503
504        for series in series_buffers {
505            const GRANULARITY_FIELD_LEN: usize = 2;
506            let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
507            let granularity =
508                u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
509
510            buffer.write_u16::<LittleEndian>(len)?;
511            buffer.write_u16::<LittleEndian>(granularity)?;
512            buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
513        }
514        Ok(SerializedBuffer {
515            data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
516            data: buffer,
517        })
518    }
519}
520
521impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
522where
523    PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
524    F: Default + SerialStatistic<P>,
525    R: Clone + Fn(F::Aggregation) -> A,
526    P: InterpolationKind,
527    A: Clone,
528{
529    /// Constructs a time matrix with the default statistic and given transform for
530    /// post-aggregation.
531    pub fn with_transform(
532        profile: impl Into<SamplingProfile>,
533        interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
534        transform: R,
535    ) -> Self
536    where
537        R: Clone,
538    {
539        let sampling_intervals = profile.into().into_sampling_intervals();
540        TimeMatrix::from_series_with(
541            sampling_intervals
542                .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
543            || interpolation.clone(),
544        )
545    }
546}
547
548impl<F, P> Default for TimeMatrix<F, P>
549where
550    F: Default + SerialStatistic<P>,
551    P: InterpolationKind,
552    P::Output<F::Sample>: Default,
553{
554    fn default() -> Self {
555        TimeMatrix::new(SamplingProfile::default(), P::Output::default())
556    }
557}
558
559impl<F, P> TimeMatrixFold<F::Sample> for TimeMatrix<F, P>
560where
561    F: SerialStatistic<P>,
562    P: InterpolationKind,
563{
564    fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
565        let (timestamp, sample) = sample.into();
566        let tick = self.last.tick(timestamp, true)?;
567        Ok(for buffer in self.buffers.iter_mut() {
568            buffer.fold(tick, sample.clone())?;
569        })
570    }
571}
572
573impl<F, P> TimeMatrixTick for TimeMatrix<F, P>
574where
575    F: SerialStatistic<P>,
576    P: InterpolationKind,
577{
578    fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
579        let tick = self.last.tick(timestamp.into(), false)?;
580        Ok(for buffer in self.buffers.iter_mut() {
581            buffer.interpolate(tick)?;
582        })
583    }
584
585    fn tick_and_get_buffers(
586        &mut self,
587        timestamp: Timestamp,
588    ) -> Result<SerializedBuffer, FoldError> {
589        self.tick(timestamp)?;
590        let series_buffers = self
591            .buffers
592            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
593            .map_err::<FoldError, _>(From::from)?;
594        self.serialize(series_buffers).map_err(From::from)
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use fuchsia_async as fasync;
601
602    use crate::experimental::clock::{Timed, Timestamp};
603    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
604    use crate::experimental::series::statistic::{
605        ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
606    };
607    use crate::experimental::series::{
608        SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
609    };
610
611    fn fold_and_interpolate_f32(matrix: &mut impl TimeMatrixFold<f32>) {
612        matrix.fold(Timed::now(0.0)).unwrap();
613        matrix.fold(Timed::now(1.0)).unwrap();
614        matrix.fold(Timed::now(2.0)).unwrap();
615        matrix.tick(Timestamp::now()).unwrap();
616    }
617
618    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
619    //                                    outputs of a `TimeMatrix`.
620    // This "test" is considered successful as long as it builds.
    /// Exercises each `TimeMatrix` constructor (`default`, `new`, `with_statistic`, and
    /// `with_transform`) with arithmetic mean statistics. Nothing is asserted about the outputs.
    #[test]
    fn static_test_define_time_matrix() {
        type Mean<T> = ArithmeticMean<T>;
        type MeanTransform<T, F> = Transform<Mean<T>, F>;

        // Matrices read the current time when constructed; presumably that requires an executor.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        // Arithmetic mean time matrices.
        let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
            SamplingProfile::balanced(),
            LastSample::or(0.0f32),
        );
        let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::granular(),
            ConstantSample::default(),
            Mean::<f32>::default(),
        );

        // Discrete arithmetic mean time matrices.
        let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
            SamplingProfile::highly_granular(),
            LastSample::or(0.0f32),
            |aggregation| aggregation.ceil() as i64,
        );
        fold_and_interpolate_f32(&mut matrix);
        // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
        // constructors. This is as raw as it gets.
        let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::default(),
            ConstantSample::default(),
            PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
                aggregation.ceil() as i64
            }),
        );
        fold_and_interpolate_f32(&mut matrix);
    }
658
659    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
660    //                                    outputs of a `TimeMatrix`.
661    // This "test" is considered successful as long as it builds.
    /// Default constructs a `TimeMatrix` for each listed pairing of statistic and interpolation.
    /// Each line only builds if that pairing satisfies the bounds of `TimeMatrix::default`.
    #[test]
    fn static_test_supported_statistic_and_interpolation_combinations() {
        // Matrices read the current time when constructed; presumably that requires an executor.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
        let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
        let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Max<u64>, LastSample>::default();
        let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
        let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Union<u64>, LastSample>::default();
    }
676
    /// Asserts the exact serialized bytes of an `ArithmeticMean<f32>` matrix, which is buffered
    /// uncompressed, at three points: when empty, after one sample has been aggregated, and after
    /// several sampling intervals have elapsed in a single tick.
    #[test]
    fn time_matrix_with_uncompressed_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is asserted as the created timestamp below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        // No sampling interval has elapsed, so both series are empty.
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                4, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of elements
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // The sample is constructed from the bits `42` so that its serialized little-endian bytes
        // are recognizable as `42, 0, 0, 0` below.
        time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                8, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of elements
                42, 0, 0, 0, // item 1
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                24, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                5, 0, // number of elements
                42, 0, 0, 0, // item 1
                0, 0, 0, 0, // item 2
                0, 0, 0, 0, // item 3
                0, 0, 0, 0, // item 4
                0, 0, 0, 0, // item 5
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );
    }
746
    /// Asserts the exact serialized bytes of a `Max<u64>` matrix with `ConstantSample`
    /// interpolation, which is buffered with Simple8b RLE, at three points: when empty, after one
    /// sample has been aggregated, and after several sampling intervals have elapsed in a single
    /// tick.
    #[test]
    fn time_matrix_with_simple8b_rle_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is asserted as the created timestamp below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        // No sampling interval has elapsed, so both series are empty.
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                7, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // A single aggregation is expected to be encoded as a run of length one.
        time_matrix.fold(Timed::now(15)).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0,    // head selector index
                1,    // number of values in last block
                0x0f, // RLE selector
                15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        // The same block is expected to be re-encoded with a 4-bit selector once it holds five
        // values.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0,    // head selector index
                5,    // number of values in last block
                0x03, // 4-bit selector
                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );
    }
826
827    #[test]
828    fn time_matrix_with_zigzag_simple8b_rle_buffer() {
829        let exec = fasync::TestExecutor::new_with_fake_time();
830        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
831        let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
832            SamplingProfile::highly_granular(),
833            ConstantSample::default(),
834        );
835        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
836        assert_eq!(
837            buffer.data,
838            vec![
839                1, // version number
840                3, 0, 0, 0, // created timestamp
841                3, 0, 0, 0, // last timestamp
842                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
843                7, 0, // series 1: length in bytes
844                10, 0, // series 1 granularity: 10s
845                0, 0, // number of selector elements and value blocks
846                0, 0, // head selector index
847                0, // number of values in last block
848                7, 0, // series 2: length in bytes
849                60, 0, // series 2 granularity: 60s
850                0, 0, // number of selector elements and value blocks
851                0, 0, // head selector index
852                0, // number of values in last block
853            ]
854        );
855
856        time_matrix.fold(Timed::now(-8)).unwrap();
857        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
858        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
859        assert_eq!(
860            buffer.data,
861            vec![
862                1, // version number
863                3, 0, 0, 0, // created timestamp
864                10, 0, 0, 0, // last timestamp
865                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
866                16, 0, // series 1: length in bytes
867                10, 0, // series 1 granularity: 10s
868                1, 0, // number of selector elements and value blocks
869                0, 0,    // head selector index
870                1,    // number of values in last block
871                0x0f, // RLE selector
872                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
873                7, 0, // series 2: length in bytes
874                60, 0, // series 2 granularity: 60s
875                0, 0, // number of selector elements and value blocks
876                0, 0, // head selector index
877                0, // number of values in last block
878            ]
879        );
880
881        // Advance several time steps to test ring buffer's `fill`
882        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
883        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
884        assert_eq!(
885            buffer.data,
886            vec![
887                1, // version number
888                3, 0, 0, 0, // created timestamp
889                50, 0, 0, 0, // last timestamp
890                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
891                16, 0, // series 1: length in bytes
892                10, 0, // series 1 granularity: 10s
893                1, 0, // number of selector elements and value blocks
894                0, 0,    // head selector index
895                5,    // number of values in last block
896                0x03, // 4-bit selector
897                0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
898                7, 0, // series 2: length in bytes
899                60, 0, // series 2 granularity: 60s
900                0, 0, // number of selector elements and value blocks
901                0, 0, // head selector index
902                0, // number of values in last block
903            ]
904        );
905    }
906
907    #[test]
908    fn time_matrix_with_delta_simple8b_rle_buffer() {
909        let exec = fasync::TestExecutor::new_with_fake_time();
910        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
911        let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
912            SamplingProfile::highly_granular(),
913            LastSample::or(0),
914        );
915        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
916        assert_eq!(
917            buffer.data,
918            vec![
919                1, // version number
920                3, 0, 0, 0, // created timestamp
921                3, 0, 0, 0, // last timestamp
922                2, 0, // type: delta simple8b RLE; subtype: unsigned
923                7, 0, // series 1: length in bytes
924                10, 0, // series 1 granularity: 10s
925                0, 0, // number of base value + selector elements or value blocks
926                0, 0, // head selector index
927                0, // number of values in last block
928                7, 0, // series 2: length in bytes
929                60, 0, // series 2 granularity: 60s
930                0, 0, // number of base value + selector elements or value blocks
931                0, 0, // head selector index
932                0, // number of values in last block
933            ]
934        );
935
936        time_matrix.fold(Timed::now(42)).unwrap();
937        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
938        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
939        assert_eq!(
940            buffer.data,
941            vec![
942                1, // version number
943                3, 0, 0, 0, // created timestamp
944                10, 0, 0, 0, // last timestamp
945                2, 0, // type: delta simple8b RLE; subtype: unsigned
946                15, 0, // series 1: length in bytes
947                10, 0, // series 1 granularity: 10s
948                1, 0, // number of base value + selector elements or value blocks
949                0, 0, // head selector index
950                0, // number of values in last block
951                42, 0, 0, 0, 0, 0, 0, 0, // base value
952                7, 0, // series 2: length in bytes
953                60, 0, // series 2 granularity: 60s
954                0, 0, // number of base value + selector elements or value blocks
955                0, 0, // head selector index
956                0, // number of values in last block
957            ]
958        );
959
960        time_matrix.fold(Timed::now(57)).unwrap();
961        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
962        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
963        assert_eq!(
964            buffer.data,
965            vec![
966                1, // version number
967                3, 0, 0, 0, // created timestamp
968                20, 0, 0, 0, // last timestamp
969                2, 0, // type: delta simple8b RLE; subtype: unsigned
970                24, 0, // series 1: length in bytes
971                10, 0, // series 1 granularity: 10s
972                2, 0, // number of base value + selector elements or value blocks
973                0, 0, // head selector index
974                1, // number of values in last block
975                42, 0, 0, 0, 0, 0, 0, 0,    // base value
976                0x0f, // RLE selector
977                15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
978                7, 0, // series 2: length in bytes
979                60, 0, // series 2 granularity: 60s
980                0, 0, // number of base value + selector elements or value blocks
981                0, 0, // head selector index
982                0, // number of values in last block
983            ]
984        );
985
986        // Advance several time steps to test ring buffer's `fill`
987        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
988        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
989        assert_eq!(
990            buffer.data,
991            vec![
992                1, // version number
993                3, 0, 0, 0, // created timestamp
994                50, 0, 0, 0, // last timestamp
995                2, 0, // type: delta simple8b RLE; subtype: unsigned
996                24, 0, // series 1: length in bytes
997                10, 0, // series 1 granularity: 10s
998                2, 0, // number of base value + selector elements or value blocks
999                0, 0, // head selector index
1000                4, // number of values in last block
1001                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1002                0x03, // 4-bit selector
1003                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
1004                7, 0, // series 2: length in bytes
1005                60, 0, // series 2 granularity: 60s
1006                0, 0, // number of base value + selector elements or value blocks
1007                0, 0, // head selector index
1008                0, // number of values in last block
1009            ]
1010        );
1011    }
1012
1013    #[test]
1014    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
1015        let exec = fasync::TestExecutor::new_with_fake_time();
1016        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1017        let mut time_matrix = TimeMatrix::<Max<i64>, LastSample>::new(
1018            SamplingProfile::highly_granular(),
1019            LastSample::or(0),
1020        );
1021        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1022        assert_eq!(
1023            buffer.data,
1024            vec![
1025                1, // version number
1026                3, 0, 0, 0, // created timestamp
1027                3, 0, 0, 0, // last timestamp
1028                2, 1, // type: delta simple8b RLE; subtype: signed
1029                7, 0, // series 1: length in bytes
1030                10, 0, // series 1 granularity: 10s
1031                0, 0, // number of base value + selector elements or value blocks
1032                0, 0, // head selector index
1033                0, // number of values in last block
1034                7, 0, // series 2: length in bytes
1035                60, 0, // series 2 granularity: 60s
1036                0, 0, // number of base value + selector elements or value blocks
1037                0, 0, // head selector index
1038                0, // number of values in last block
1039            ]
1040        );
1041
1042        time_matrix.fold(Timed::now(42)).unwrap();
1043        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1044        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1045        assert_eq!(
1046            buffer.data,
1047            vec![
1048                1, // version number
1049                3, 0, 0, 0, // created timestamp
1050                10, 0, 0, 0, // last timestamp
1051                2, 1, // type: delta simple8b RLE; subtype: signed
1052                15, 0, // series 1: length in bytes
1053                10, 0, // series 1 granularity: 10s
1054                1, 0, // number of base value + selector elements or value blocks
1055                0, 0, // head selector index
1056                0, // number of values in last block
1057                42, 0, 0, 0, 0, 0, 0, 0, // base value
1058                7, 0, // series 2: length in bytes
1059                60, 0, // series 2 granularity: 60s
1060                0, 0, // number of base value + selector elements or value blocks
1061                0, 0, // head selector index
1062                0, // number of values in last block
1063            ]
1064        );
1065
1066        time_matrix.fold(Timed::now(34)).unwrap();
1067        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1068        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1069        assert_eq!(
1070            buffer.data,
1071            vec![
1072                1, // version number
1073                3, 0, 0, 0, // created timestamp
1074                20, 0, 0, 0, // last timestamp
1075                2, 1, // type: delta simple8b RLE; subtype: signed
1076                24, 0, // series 1: length in bytes
1077                10, 0, // series 1 granularity: 10s
1078                2, 0, // number of base value + selector elements or value blocks
1079                0, 0, // head selector index
1080                1, // number of values in last block
1081                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1082                0x0f, // RLE selector
1083                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (delta) encoded as 15, appearing 1 time
1084                7, 0, // series 2: length in bytes
1085                60, 0, // series 2 granularity: 60s
1086                0, 0, // number of base value + selector elements or value blocks
1087                0, 0, // head selector index
1088                0, // number of values in last block
1089            ]
1090        );
1091
1092        // Advance several time steps to test ring buffer's `fill`
1093        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1094        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1095        assert_eq!(
1096            buffer.data,
1097            vec![
1098                1, // version number
1099                3, 0, 0, 0, // created timestamp
1100                50, 0, 0, 0, // last timestamp
1101                2, 1, // type: delta simple8b RLE; subtype: signed
1102                24, 0, // series 1: length in bytes
1103                10, 0, // series 1 granularity: 10s
1104                2, 0, // number of base value + selector elements or value blocks
1105                0, 0, // head selector index
1106                4, // number of values in last block
1107                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1108                0x03, // 4-bit selector
1109                0x0f, 0, 0, 0, 0, 0, 0, 0, // diff values -8 (encoded as 15), 0, 0, 0
1110                7, 0, // series 2: length in bytes
1111                60, 0, // series 2 granularity: 60s
1112                0, 0, // number of base value + selector elements or value blocks
1113                0, 0, // head selector index
1114                0, // number of values in last block
1115            ]
1116        );
1117    }
1118
1119    #[test]
1120    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
1121        let exec = fasync::TestExecutor::new_with_fake_time();
1122        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1123        let mut time_matrix = TimeMatrix::<Max<u64>, LastSample>::new(
1124            SamplingProfile::highly_granular(),
1125            LastSample::or(0),
1126        );
1127        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1128        assert_eq!(
1129            buffer.data,
1130            vec![
1131                1, // version number
1132                3, 0, 0, 0, // created timestamp
1133                3, 0, 0, 0, // last timestamp
1134                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1135                7, 0, // series 1: length in bytes
1136                10, 0, // series 1 granularity: 10s
1137                0, 0, // number of base value + selector elements or value blocks
1138                0, 0, // head selector index
1139                0, // number of values in last block
1140                7, 0, // series 2: length in bytes
1141                60, 0, // series 2 granularity: 60s
1142                0, 0, // number of base value + selector elements or value blocks
1143                0, 0, // head selector index
1144                0, // number of values in last block
1145            ]
1146        );
1147
1148        time_matrix.fold(Timed::now(1)).unwrap();
1149        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1150        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1151        assert_eq!(
1152            buffer.data,
1153            vec![
1154                1, // version number
1155                3, 0, 0, 0, // created timestamp
1156                10, 0, 0, 0, // last timestamp
1157                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1158                15, 0, // series 1: length in bytes
1159                10, 0, // series 1 granularity: 10s
1160                1, 0, // number of base value + selector elements or value blocks
1161                0, 0, // head selector index
1162                0, // number of values in last block
1163                1, 0, 0, 0, 0, 0, 0, 0, // base value
1164                7, 0, // series 2: length in bytes
1165                60, 0, // series 2 granularity: 60s
1166                0, 0, // number of base value + selector elements or value blocks
1167                0, 0, // head selector index
1168                0, // number of values in last block
1169            ]
1170        );
1171
1172        time_matrix.fold(Timed::now(u64::MAX)).unwrap();
1173        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1174        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1175        assert_eq!(
1176            buffer.data,
1177            vec![
1178                1, // version number
1179                3, 0, 0, 0, // created timestamp
1180                20, 0, 0, 0, // last timestamp
1181                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1182                24, 0, // series 1: length in bytes
1183                10, 0, // series 1 granularity: 10s
1184                2, 0, // number of base value + selector elements or value blocks
1185                0, 0, // head selector index
1186                1, // number of values in last block
1187                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1188                0x0f, // RLE selector
1189                3, 0, 0, 0, 0, 0, 1, 0, // value -2 (delta) encoded as 3, appearing 1 time
1190                7, 0, // series 2: length in bytes
1191                60, 0, // series 2 granularity: 60s
1192                0, 0, // number of base value + selector elements or value blocks
1193                0, 0, // head selector index
1194                0, // number of values in last block
1195            ]
1196        );
1197
1198        // Advance several time steps to test ring buffer's `fill`
1199        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1200        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1201        assert_eq!(
1202            buffer.data,
1203            vec![
1204                1, // version number
1205                3, 0, 0, 0, // created timestamp
1206                50, 0, 0, 0, // last timestamp
1207                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1208                24, 0, // series 1: length in bytes
1209                10, 0, // series 1 granularity: 10s
1210                2, 0, // number of base value + selector elements or value blocks
1211                0, 0, // head selector index
1212                4, // number of values in last block
1213                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1214                0x01, // 2-bit selector
1215                3, 0, 0, 0, 0, 0, 0, 0, // diff values -2 (encoded as 3), 0, 0, 0
1216                7, 0, // series 2: length in bytes
1217                60, 0, // series 2 granularity: 60s
1218                0, 0, // number of base value + selector elements or value blocks
1219                0, 0, // head selector index
1220                0, // number of values in last block
1221            ]
1222        );
1223    }
1224}