windowed_stats/experimental/series/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24    BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25    Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28    ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32    FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::{Capacity, decode};
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
/// A [`TimeMatrix`] type that can be advanced forward in time.
///
/// This trait provides tick operations for [`TimeMatrix`] types. Ticking a [`TimeMatrix`] causes
/// sample interpolation within and aggregation propagation across [`SamplingInterval`]s.
///
/// Importantly, this trait is `dyn` compatible and type erased; it can be used to tick a
/// [`TimeMatrix`] regardless of its input type parameters (sample type, interpolation type, etc.).
///
/// See also the [`TimeMatrixFold`] subtrait.
pub trait TimeMatrixTick {
    /// Advances the time matrix to the given timestamp without folding a sample.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation cannot advance to `timestamp` or fails to
    /// interpolate.
    fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;

    /// Advances the time matrix to the given timestamp and gets the serialized aggregation
    /// buffers.
    ///
    /// # Errors
    ///
    /// Returns an error if ticking or serialization fails.
    fn tick_and_get_buffers(&mut self, timestamp: Timestamp)
    -> Result<SerializedBuffer, FoldError>;
}
53
/// A [`TimeMatrix`] type that can sample data.
///
/// This trait provides fold operations for `TimeMatrix` types. Folding samples updates
/// aggregations and advances a [`TimeMatrix`] forward in time.
///
/// See also the [`TimeMatrixTick`] supertrait. This trait supports both ticking and sampling, but
/// is not completely type erased: the sample input type parameter `T` is needed.
pub trait TimeMatrixFold<T>: TimeMatrixTick {
    /// Folds the given timed sample into the time matrix.
    ///
    /// # Errors
    ///
    /// Returns an error if the sample cannot be folded.
    fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
}
64
/// A type that describes the semantics of data folded into time matrices.
///
/// Data semantics determine how statistics are interpreted and time series are aggregated and
/// buffered.
pub trait DataSemantic {
    /// The metadata type associated with data of this semantic.
    type Metadata: Metadata;

    /// Gets a displayable name for the semantic.
    ///
    /// [`TimeMatrix`] formats this name into [`SerializedBuffer::data_semantic`] when it
    /// serializes its buffers.
    fn display() -> impl Display;
}
74
/// A continually increasing value.
///
/// Counters are analogous to an odometer in a vehicle.
///
/// This type has no variants: it cannot be instantiated and only appears in type positions.
#[derive(Debug)]
pub enum Counter {}

// `u64` counter data interpolated with `LastSample` is buffered in a `DeltaSimple8bRle`. This is
// the only buffer strategy declared for counters here.
impl BufferStrategy<u64, LastSample> for Counter {
    type Buffer = DeltaSimple8bRle;
}

impl DataSemantic for Counter {
    // Counters carry no additional metadata.
    type Metadata = ();

    /// Gets the name of the semantic, `"counter"`.
    fn display() -> impl Display {
        "counter"
    }
}
92
/// A fluctuating value.
///
/// Gauges are analogous to a speedometer in a vehicle.
///
/// This type has no variants: it cannot be instantiated and only appears in type positions.
#[derive(Debug)]
pub enum Gauge {}

// `f32` gauge data is buffered uncompressed for every interpolation kind.
impl<P> BufferStrategy<f32, P> for Gauge
where
    P: InterpolationKind,
{
    type Buffer = Uncompressed<f32>;
}

// `i64` gauge data interpolated with `ConstantSample` uses the zigzag Simple8b RLE buffer.
impl BufferStrategy<i64, ConstantSample> for Gauge {
    type Buffer = ZigzagSimple8bRle;
}

// `i64` gauge data interpolated with `LastSample` uses the delta zigzag Simple8b RLE buffer.
impl BufferStrategy<i64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<i64>;
}

// `u64` gauge data interpolated with `ConstantSample` uses the plain Simple8b RLE buffer.
impl BufferStrategy<u64, ConstantSample> for Gauge {
    type Buffer = Simple8bRle;
}

// `u64` gauge data interpolated with `LastSample` uses the delta zigzag Simple8b RLE buffer.
impl BufferStrategy<u64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<u64>;
}

impl DataSemantic for Gauge {
    // Gauges carry no additional metadata.
    type Metadata = ();

    /// Gets the name of the semantic, `"gauge"`.
    fn display() -> impl Display {
        "gauge"
    }
}
129
/// A semantic like `Gauge` that avoids [`DeltaZigzagSimple8bRle`] until we fix
/// some other issues.
///
/// Like the other data semantics in this module, this type has no variants and
/// only appears in type positions.
///
/// TODO(https://fxbug.dev/436253782): Delete this type when the viewer can
/// decode `DeltaZigzagSimple8bRle`.
///
/// OR
///
/// TODO(https://fxbug.dev/457443158): Delete this type when
/// `ConstantAggregation` is introduced and netstack's time series is changed to
/// `TimeMatrix<Diff<u64>, ConstantAggregation>`.
///
/// Whichever happens first.
#[derive(Debug)]
pub enum GaugeForceSimple8bRle {}
144
// Any data type that converts into `u64` and is interpolated with `LastSample` is buffered in a
// plain `Simple8bRle`. No other buffer strategy is declared for this semantic here.
impl<T: Into<u64>> BufferStrategy<T, LastSample> for GaugeForceSimple8bRle {
    type Buffer = Simple8bRle;
}

impl DataSemantic for GaugeForceSimple8bRle {
    // This semantic carries no additional metadata.
    type Metadata = ();

    /// Gets the name of the semantic, `"gauge"`.
    ///
    /// NOTE(review): this is the same name that `Gauge` reports, presumably so that consumers
    /// cannot tell the two semantics apart. Confirm that this is intended.
    fn display() -> impl Display {
        "gauge"
    }
}
156
157// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
158//                                    "bit set", but there are notably some instances of "bitset",
159//                                    which instead suggests `Bitset` and `bitset`.
/// A set of Boolean values.
///
/// Bit sets are analogous to indicator lamps in a vehicle.
///
/// This type has no variants: it cannot be instantiated and only appears in type positions.
#[derive(Debug)]
pub enum BitSet {}

// Bit set data of any type that `Simple8bRle` can buffer is stored in a `Simple8bRle`, for every
// interpolation kind.
impl<A, P> BufferStrategy<A, P> for BitSet
where
    Simple8bRle: RingBuffer<A>,
    P: InterpolationKind,
{
    type Buffer = Simple8bRle;
}

impl DataSemantic for BitSet {
    // Unlike the other semantics in this module, bit sets have metadata: a `BitSetIndex`.
    type Metadata = BitSetIndex;

    /// Gets the name of the semantic, `"bitset"`.
    fn display() -> impl Display {
        "bitset"
    }
}
181
182/// A buffer of serialized data from a time series.
183#[derive(Clone, Debug)]
184struct SerializedTimeSeries {
185    interval: SamplingInterval,
186    data: Vec<u8>,
187}
188
189impl SerializedTimeSeries {
190    /// Gets the sampling interval for the aggregations in the buffer.
191    pub fn interval(&self) -> &SamplingInterval {
192        &self.interval
193    }
194
195    /// Gets the serialized data.
196    pub fn data(&self) -> &[u8] {
197        self.data.as_slice()
198    }
199}
200
/// An unbuffered statistical time series specification.
///
/// This type samples and interpolates timed data and produces aggregations per its statistic and
/// sampling interval. It is a specification insofar that it does **not** buffer the series of
/// aggregations.
#[derive(Clone, Debug)]
struct TimeSeries<F>
where
    F: Statistic,
{
    /// The sampling interval that determines when aggregations are produced.
    interval: SamplingInterval,
    /// The statistic into which samples and interpolations are folded.
    statistic: F,
}
214
215impl<F> TimeSeries<F>
216where
217    F: Statistic,
218{
219    pub fn new(interval: SamplingInterval) -> Self
220    where
221        F: Default,
222    {
223        TimeSeries { interval, statistic: F::default() }
224    }
225
226    pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
227        TimeSeries { interval, statistic }
228    }
229
230    /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
231    /// aggregations.
232    ///
233    /// The returned iterator performs the computation and so it must be consumed to change the
234    /// state of the statistic.
235    ///
236    /// [`Tick`]: crate::experimental::clock::Tick
237    #[must_use]
238    fn interpolate_and_get_aggregations<'i, P>(
239        &'i mut self,
240        interpolation: &'i mut P,
241        tick: Tick,
242    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243    where
244        P: Interpolation<F::Sample>,
245    {
246        self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
247            move |expiration| {
248                expiration
249                    .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
250                    .transpose()
251            },
252        )
253    }
254
255    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
256    /// and gets the aggregations.
257    ///
258    /// The returned iterator performs the computation and so it must be consumed to change the
259    /// state of the statistic.
260    ///
261    /// [`Tick`]: crate::experimental::clock::Tick
262    #[must_use]
263    fn fold_and_get_aggregations<'i, P>(
264        &'i mut self,
265        interpolation: &'i mut P,
266        tick: Tick,
267        sample: F::Sample,
268    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
269    where
270        P: Interpolation<F::Sample>,
271    {
272        self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
273            expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
274        })
275    }
276
277    /// Gets the sampling interval of the series.
278    pub fn interval(&self) -> &SamplingInterval {
279        &self.interval
280    }
281}
282
283impl<F, R, A> TimeSeries<PostAggregation<F, R>>
284where
285    F: Default + Statistic,
286    R: Clone + Fn(F::Aggregation) -> A,
287    A: Clone,
288{
289    pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
290        TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
291    }
292}
293
/// A buffered round-robin statistical time series.
///
/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
/// the buffer.
// `Clone` and `Debug` are derived with explicit bounds on the types of the fields (`F::Buffer`
// and `P::Output<F::Sample>`) rather than on the type parameters alone.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
                   F::Buffer: Debug,
                   P::Output<F::Sample>: Debug,")
)]
struct BufferedTimeSeries<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The buffer into which aggregations are pushed.
    buffer: F::Buffer,
    /// The interpolation state for the series.
    interpolation: P::Output<F::Sample>,
    /// The unbuffered series that produces aggregations.
    series: TimeSeries<F>,
}
315
316impl<F, P> BufferedTimeSeries<F, P>
317where
318    F: SerialStatistic<P>,
319    P: InterpolationKind,
320{
321    pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
322        let buffer = F::buffer(&series.interval);
323        BufferedTimeSeries { buffer, interpolation, series }
324    }
325
326    /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
327    /// aggregations.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if sampling fails.
332    ///
333    /// [`Tick`]: crate::experimental::clock::Tick
334    fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
335        for aggregation in
336            self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
337        {
338            let (count, aggregation) = aggregation?;
339            if count.get() == 1 {
340                self.buffer.push(aggregation);
341            } else {
342                self.buffer.fill(aggregation, count);
343            }
344        }
345        Ok(())
346    }
347
348    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
349    /// and buffers the aggregations.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if sampling fails.
354    ///
355    /// [`Tick`]: crate::experimental::clock::Tick
356    fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
357        for aggregation in
358            self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
359        {
360            let (count, aggregation) = aggregation?;
361            if count.get() == 1 {
362                self.buffer.push(aggregation);
363            } else {
364                self.buffer.fill(aggregation, count);
365            }
366        }
367        Ok(())
368    }
369
370    pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
371        let mut data = vec![];
372        self.buffer.serialize(&mut data)?;
373        Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
374    }
375}
376
/// A buffer of serialized data from a time matrix.
#[derive(Clone, Debug, PartialEq)]
pub struct SerializedBuffer {
    /// The displayed name of the data semantic of the time matrix (e.g., `"gauge"`).
    pub data_semantic: String,
    /// The serialized bytes of the time matrix.
    pub data: Vec<u8>,
}
383
384impl SerializedBuffer {
385    /// Records the current state of this `TimeMatrix` into `node`.
386    pub fn write_to_inspect(self, node: &fuchsia_inspect::Node) {
387        let Self { data_semantic, data } = self;
388        node.record_string("type", data_semantic);
389        node.record_bytes("data", data);
390    }
391
392    /// Records an attempt at retrieving a serialized buffer to inspect.
393    pub fn write_to_inspect_or_error<E: Debug>(
394        result: Result<Self, E>,
395        node: &fuchsia_inspect::Node,
396    ) {
397        match result {
398            Ok(b) => b.write_to_inspect(node),
399            Err(e) => node.record_string("type", format!("error: {:?}", e)),
400        }
401    }
402}
403
/// One or more statistical round-robin time series.
///
/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
/// time matrix must be the same, but the sampling intervals can and should differ.
// `Clone` and `Debug` are derived with explicit bounds on the types used by the buffered series
// (`F::Buffer` and `P::Output<F::Sample>`) rather than on the type parameters alone.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
                   F::Buffer: Debug,
                   P::Output<F::Sample>: Debug,")
)]
pub struct TimeMatrix<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The timestamp at which the matrix was created.
    created: Timestamp,
    /// The observation time, which starts at `created` and is advanced by ticks and folds.
    last: ObservationTime,
    /// The buffered time series of the matrix (at least one).
    buffers: Vec1<BufferedTimeSeries<F, P>>,
}
426
427impl<F, P> TimeMatrix<F, P>
428where
429    F: SerialStatistic<P>,
430    P: InterpolationKind,
431{
432    fn from_series_with<Q>(
433        created: Timestamp,
434        series: impl Into<Vec1<TimeSeries<F>>>,
435        mut interpolation: Q,
436    ) -> Self
437    where
438        Q: FnMut() -> P::Output<F::Sample>,
439    {
440        let buffers =
441            series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
442        TimeMatrix { created, last: ObservationTime::at(created), buffers }
443    }
444
445    /// Constructs a time matrix with the given sampling profile and interpolation.
446    ///
447    /// Statistics are default initialized.
448    pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
449    where
450        F: Default,
451    {
452        Self::new_at(Timestamp::now(), profile, interpolation)
453    }
454
455    pub(crate) fn new_at(
456        timestamp: Timestamp,
457        profile: impl Into<SamplingProfile>,
458        interpolation: P::Output<F::Sample>,
459    ) -> Self
460    where
461        F: Default,
462    {
463        let sampling_intervals = profile.into().into_sampling_intervals();
464        TimeMatrix::from_series_with(
465            timestamp,
466            sampling_intervals.map_into(TimeSeries::new),
467            || interpolation.clone(),
468        )
469    }
470
471    /// Constructs a time matrix with the given statistic.
472    pub fn with_statistic(
473        profile: impl Into<SamplingProfile>,
474        interpolation: P::Output<F::Sample>,
475        statistic: F,
476    ) -> Self {
477        let sampling_intervals = profile.into().into_sampling_intervals();
478        TimeMatrix::from_series_with(
479            Timestamp::now(),
480            sampling_intervals
481                .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
482            || interpolation.clone(),
483        )
484    }
485
486    /// Folds the given sample and interpolations and gets the aggregation buffers.
487    ///
488    /// To fold a sample without serializing buffers, use [`Sampler::fold`].
489    ///
490    /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
491    pub fn fold_and_get_buffers(
492        &mut self,
493        sample: Timed<F::Sample>,
494    ) -> Result<SerializedBuffer, FoldError> {
495        self.fold(sample)?;
496        let series_buffers = self
497            .buffers
498            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
499            .map_err::<FoldError, _>(From::from)?;
500        self.serialize(series_buffers).map_err(From::from)
501    }
502
503    fn serialize(
504        &self,
505        series_buffers: Vec1<SerializedTimeSeries>,
506    ) -> io::Result<SerializedBuffer> {
507        use crate::experimental::clock::DurationExt;
508        use byteorder::{LittleEndian, WriteBytesExt};
509        use std::io::Write;
510
511        let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
512        let end_timestamp =
513            u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
514
515        let mut buffer = vec![];
516        buffer.write_u8(1)?; // Version number.
517        buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
518        buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
519        // time.
520        encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
521
522        for series in series_buffers {
523            const GRANULARITY_FIELD_LEN: usize = 2;
524            let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
525            let granularity =
526                u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
527
528            buffer.write_u16::<LittleEndian>(len)?;
529            buffer.write_u16::<LittleEndian>(granularity)?;
530            buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
531        }
532        Ok(SerializedBuffer {
533            data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
534            data: buffer,
535        })
536    }
537}
538
539impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
540where
541    PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
542    F: Default + SerialStatistic<P>,
543    R: Clone + Fn(F::Aggregation) -> A,
544    P: InterpolationKind,
545    A: Clone,
546{
547    /// Constructs a time matrix with the default statistic and given transform for
548    /// post-aggregation.
549    pub fn with_transform(
550        profile: impl Into<SamplingProfile>,
551        interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
552        transform: R,
553    ) -> Self
554    where
555        R: Clone,
556    {
557        let sampling_intervals = profile.into().into_sampling_intervals();
558        TimeMatrix::from_series_with(
559            Timestamp::now(),
560            sampling_intervals
561                .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
562            || interpolation.clone(),
563        )
564    }
565}
566
567impl<F, P> Default for TimeMatrix<F, P>
568where
569    F: Default + SerialStatistic<P>,
570    P: InterpolationKind,
571    P::Output<F::Sample>: Default,
572{
573    fn default() -> Self {
574        TimeMatrix::new(SamplingProfile::default(), P::Output::default())
575    }
576}
577
578impl<F, P> TimeMatrixFold<F::Sample> for TimeMatrix<F, P>
579where
580    F: SerialStatistic<P>,
581    P: InterpolationKind,
582{
583    fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
584        let (timestamp, sample) = sample.into();
585        let tick = self.last.tick(timestamp, true)?;
586        Ok(for buffer in self.buffers.iter_mut() {
587            buffer.fold(tick, sample.clone())?;
588        })
589    }
590}
591
592impl<F, P> TimeMatrixTick for TimeMatrix<F, P>
593where
594    F: SerialStatistic<P>,
595    P: InterpolationKind,
596{
597    fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
598        let tick = self.last.tick(timestamp.into(), false)?;
599        Ok(for buffer in self.buffers.iter_mut() {
600            buffer.interpolate(tick)?;
601        })
602    }
603
604    fn tick_and_get_buffers(
605        &mut self,
606        timestamp: Timestamp,
607    ) -> Result<SerializedBuffer, FoldError> {
608        self.tick(timestamp)?;
609        let series_buffers = self
610            .buffers
611            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
612            .map_err::<FoldError, _>(From::from)?;
613        self.serialize(series_buffers).map_err(From::from)
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use fuchsia_async as fasync;
620
621    use crate::experimental::clock::{Timed, Timestamp};
622    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
623    use crate::experimental::series::statistic::{
624        ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
625    };
626    use crate::experimental::series::{
627        SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
628    };
629
630    fn fold_and_interpolate_f32(matrix: &mut impl TimeMatrixFold<f32>) {
631        matrix.fold(Timed::now(0.0)).unwrap();
632        matrix.fold(Timed::now(1.0)).unwrap();
633        matrix.fold(Timed::now(2.0)).unwrap();
634        matrix.tick(Timestamp::now()).unwrap();
635    }
636
    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
    //                                    outputs of a `TimeMatrix`.
    // This "test" is considered successful as long as it builds.
    //
    // It exercises each `TimeMatrix` constructor (`default`, `new`, `with_statistic`, and
    // `with_transform`) and folds samples into the matrices with post-aggregation transforms.
    #[test]
    fn static_test_define_time_matrix() {
        type Mean<T> = ArithmeticMean<T>;
        type MeanTransform<T, F> = Transform<Mean<T>, F>;

        // Constructors read the current time, so an executor (here with fake time) is installed.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        // Arithmetic mean time matrices.
        let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
            SamplingProfile::balanced(),
            LastSample::or(0.0f32),
        );
        let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::granular(),
            ConstantSample::default(),
            Mean::<f32>::default(),
        );

        // Discrete arithmetic mean time matrices.
        let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
            SamplingProfile::highly_granular(),
            LastSample::or(0.0f32),
            |aggregation| aggregation.ceil() as i64,
        );
        fold_and_interpolate_f32(&mut matrix);
        // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
        // constructors. This is as raw as it gets.
        let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::default(),
            ConstantSample::default(),
            PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
                aggregation.ceil() as i64
            }),
        );
        fold_and_interpolate_f32(&mut matrix);
    }
677
    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
    //                                    outputs of a `TimeMatrix`.
    // This "test" is considered successful as long as it builds.
    //
    // Each line names a statistic and interpolation pair that must satisfy the bounds of
    // `TimeMatrix` and its `Default` implementation.
    #[test]
    fn static_test_supported_statistic_and_interpolation_combinations() {
        // Constructors read the current time, so an executor (here with fake time) is installed.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
        let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
        let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Max<u64>, LastSample>::default();
        let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
        let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Union<u64>, LastSample>::default();
    }
695
    // Asserts the exact serialized bytes of a time matrix with uncompressed `f32` buffers: when
    // empty, after one sample has been aggregated, and after several intervals elapse at once.
    #[test]
    fn time_matrix_with_uncompressed_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is serialized as the created timestamp.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        // No time has passed, so both series are empty.
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                4, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of elements
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // The sample's bit pattern is 42 so that the serialized aggregation is easy to read.
        time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
        // Ticking at 10s produces the first aggregation in the 10s series only.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                8, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of elements
                42, 0, 0, 0, // item 1
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                24, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                5, 0, // number of elements
                42, 0, 0, 0, // item 1
                0, 0, 0, 0, // item 2
                0, 0, 0, 0, // item 3
                0, 0, 0, 0, // item 4
                0, 0, 0, 0, // item 5
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );
    }
765
    // Asserts the exact serialized bytes of a time matrix with Simple8b RLE buffers of `u64`
    // aggregations: when empty, after one sample has been aggregated, and after several intervals
    // elapse at once.
    #[test]
    fn time_matrix_with_simple8b_rle_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is serialized as the created timestamp.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        // No time has passed, so both series are empty.
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                7, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        time_matrix.fold(Timed::now(15)).unwrap();
        // Ticking at 10s produces the first aggregation in the 10s series only.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0,    // head selector index
                1,    // number of values in last block
                0x0f, // RLE selector
                15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0,    // head selector index
                5,    // number of values in last block
                0x03, // 4-bit selector
                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );
    }
845
846    #[test]
847    fn time_matrix_with_zigzag_simple8b_rle_buffer() {
848        let exec = fasync::TestExecutor::new_with_fake_time();
849        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
850        let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
851            SamplingProfile::highly_granular(),
852            ConstantSample::default(),
853        );
854        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
855        assert_eq!(
856            buffer.data,
857            vec![
858                1, // version number
859                3, 0, 0, 0, // created timestamp
860                3, 0, 0, 0, // last timestamp
861                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
862                7, 0, // series 1: length in bytes
863                10, 0, // series 1 granularity: 10s
864                0, 0, // number of selector elements and value blocks
865                0, 0, // head selector index
866                0, // number of values in last block
867                7, 0, // series 2: length in bytes
868                60, 0, // series 2 granularity: 60s
869                0, 0, // number of selector elements and value blocks
870                0, 0, // head selector index
871                0, // number of values in last block
872            ]
873        );
874
875        time_matrix.fold(Timed::now(-8)).unwrap();
876        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
877        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
878        assert_eq!(
879            buffer.data,
880            vec![
881                1, // version number
882                3, 0, 0, 0, // created timestamp
883                10, 0, 0, 0, // last timestamp
884                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
885                16, 0, // series 1: length in bytes
886                10, 0, // series 1 granularity: 10s
887                1, 0, // number of selector elements and value blocks
888                0, 0,    // head selector index
889                1,    // number of values in last block
890                0x0f, // RLE selector
891                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
892                7, 0, // series 2: length in bytes
893                60, 0, // series 2 granularity: 60s
894                0, 0, // number of selector elements and value blocks
895                0, 0, // head selector index
896                0, // number of values in last block
897            ]
898        );
899
900        // Advance several time steps to test ring buffer's `fill`
901        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
902        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
903        assert_eq!(
904            buffer.data,
905            vec![
906                1, // version number
907                3, 0, 0, 0, // created timestamp
908                50, 0, 0, 0, // last timestamp
909                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
910                16, 0, // series 1: length in bytes
911                10, 0, // series 1 granularity: 10s
912                1, 0, // number of selector elements and value blocks
913                0, 0,    // head selector index
914                5,    // number of values in last block
915                0x03, // 4-bit selector
916                0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
917                7, 0, // series 2: length in bytes
918                60, 0, // series 2 granularity: 60s
919                0, 0, // number of selector elements and value blocks
920                0, 0, // head selector index
921                0, // number of values in last block
922            ]
923        );
924    }
925
926    #[test]
927    fn time_matrix_with_delta_simple8b_rle_buffer() {
928        let exec = fasync::TestExecutor::new_with_fake_time();
929        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
930        let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
931            SamplingProfile::highly_granular(),
932            LastSample::or(0),
933        );
934        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
935        assert_eq!(
936            buffer.data,
937            vec![
938                1, // version number
939                3, 0, 0, 0, // created timestamp
940                3, 0, 0, 0, // last timestamp
941                2, 0, // type: delta simple8b RLE; subtype: unsigned
942                7, 0, // series 1: length in bytes
943                10, 0, // series 1 granularity: 10s
944                0, 0, // number of base value + selector elements or value blocks
945                0, 0, // head selector index
946                0, // number of values in last block
947                7, 0, // series 2: length in bytes
948                60, 0, // series 2 granularity: 60s
949                0, 0, // number of base value + selector elements or value blocks
950                0, 0, // head selector index
951                0, // number of values in last block
952            ]
953        );
954
955        time_matrix.fold(Timed::now(42)).unwrap();
956        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
957        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
958        assert_eq!(
959            buffer.data,
960            vec![
961                1, // version number
962                3, 0, 0, 0, // created timestamp
963                10, 0, 0, 0, // last timestamp
964                2, 0, // type: delta simple8b RLE; subtype: unsigned
965                15, 0, // series 1: length in bytes
966                10, 0, // series 1 granularity: 10s
967                1, 0, // number of base value + selector elements or value blocks
968                0, 0, // head selector index
969                0, // number of values in last block
970                42, 0, 0, 0, 0, 0, 0, 0, // base value
971                7, 0, // series 2: length in bytes
972                60, 0, // series 2 granularity: 60s
973                0, 0, // number of base value + selector elements or value blocks
974                0, 0, // head selector index
975                0, // number of values in last block
976            ]
977        );
978
979        time_matrix.fold(Timed::now(57)).unwrap();
980        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
981        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
982        assert_eq!(
983            buffer.data,
984            vec![
985                1, // version number
986                3, 0, 0, 0, // created timestamp
987                20, 0, 0, 0, // last timestamp
988                2, 0, // type: delta simple8b RLE; subtype: unsigned
989                24, 0, // series 1: length in bytes
990                10, 0, // series 1 granularity: 10s
991                2, 0, // number of base value + selector elements or value blocks
992                0, 0, // head selector index
993                1, // number of values in last block
994                42, 0, 0, 0, 0, 0, 0, 0,    // base value
995                0x0f, // RLE selector
996                15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
997                7, 0, // series 2: length in bytes
998                60, 0, // series 2 granularity: 60s
999                0, 0, // number of base value + selector elements or value blocks
1000                0, 0, // head selector index
1001                0, // number of values in last block
1002            ]
1003        );
1004
1005        // Advance several time steps to test ring buffer's `fill`
1006        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1007        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1008        assert_eq!(
1009            buffer.data,
1010            vec![
1011                1, // version number
1012                3, 0, 0, 0, // created timestamp
1013                50, 0, 0, 0, // last timestamp
1014                2, 0, // type: delta simple8b RLE; subtype: unsigned
1015                24, 0, // series 1: length in bytes
1016                10, 0, // series 1 granularity: 10s
1017                2, 0, // number of base value + selector elements or value blocks
1018                0, 0, // head selector index
1019                4, // number of values in last block
1020                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1021                0x03, // 4-bit selector
1022                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
1023                7, 0, // series 2: length in bytes
1024                60, 0, // series 2 granularity: 60s
1025                0, 0, // number of base value + selector elements or value blocks
1026                0, 0, // head selector index
1027                0, // number of values in last block
1028            ]
1029        );
1030    }
1031
1032    #[test]
1033    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
1034        let exec = fasync::TestExecutor::new_with_fake_time();
1035        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1036        let mut time_matrix = TimeMatrix::<Max<i64>, LastSample>::new(
1037            SamplingProfile::highly_granular(),
1038            LastSample::or(0),
1039        );
1040        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1041        assert_eq!(
1042            buffer.data,
1043            vec![
1044                1, // version number
1045                3, 0, 0, 0, // created timestamp
1046                3, 0, 0, 0, // last timestamp
1047                2, 1, // type: delta simple8b RLE; subtype: signed
1048                7, 0, // series 1: length in bytes
1049                10, 0, // series 1 granularity: 10s
1050                0, 0, // number of base value + selector elements or value blocks
1051                0, 0, // head selector index
1052                0, // number of values in last block
1053                7, 0, // series 2: length in bytes
1054                60, 0, // series 2 granularity: 60s
1055                0, 0, // number of base value + selector elements or value blocks
1056                0, 0, // head selector index
1057                0, // number of values in last block
1058            ]
1059        );
1060
1061        time_matrix.fold(Timed::now(42)).unwrap();
1062        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1063        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1064        assert_eq!(
1065            buffer.data,
1066            vec![
1067                1, // version number
1068                3, 0, 0, 0, // created timestamp
1069                10, 0, 0, 0, // last timestamp
1070                2, 1, // type: delta simple8b RLE; subtype: signed
1071                15, 0, // series 1: length in bytes
1072                10, 0, // series 1 granularity: 10s
1073                1, 0, // number of base value + selector elements or value blocks
1074                0, 0, // head selector index
1075                0, // number of values in last block
1076                42, 0, 0, 0, 0, 0, 0, 0, // base value
1077                7, 0, // series 2: length in bytes
1078                60, 0, // series 2 granularity: 60s
1079                0, 0, // number of base value + selector elements or value blocks
1080                0, 0, // head selector index
1081                0, // number of values in last block
1082            ]
1083        );
1084
1085        time_matrix.fold(Timed::now(34)).unwrap();
1086        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1087        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1088        assert_eq!(
1089            buffer.data,
1090            vec![
1091                1, // version number
1092                3, 0, 0, 0, // created timestamp
1093                20, 0, 0, 0, // last timestamp
1094                2, 1, // type: delta simple8b RLE; subtype: signed
1095                24, 0, // series 1: length in bytes
1096                10, 0, // series 1 granularity: 10s
1097                2, 0, // number of base value + selector elements or value blocks
1098                0, 0, // head selector index
1099                1, // number of values in last block
1100                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1101                0x0f, // RLE selector
1102                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (delta) encoded as 15, appearing 1 time
1103                7, 0, // series 2: length in bytes
1104                60, 0, // series 2 granularity: 60s
1105                0, 0, // number of base value + selector elements or value blocks
1106                0, 0, // head selector index
1107                0, // number of values in last block
1108            ]
1109        );
1110
1111        // Advance several time steps to test ring buffer's `fill`
1112        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1113        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1114        assert_eq!(
1115            buffer.data,
1116            vec![
1117                1, // version number
1118                3, 0, 0, 0, // created timestamp
1119                50, 0, 0, 0, // last timestamp
1120                2, 1, // type: delta simple8b RLE; subtype: signed
1121                24, 0, // series 1: length in bytes
1122                10, 0, // series 1 granularity: 10s
1123                2, 0, // number of base value + selector elements or value blocks
1124                0, 0, // head selector index
1125                4, // number of values in last block
1126                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1127                0x03, // 4-bit selector
1128                0x0f, 0, 0, 0, 0, 0, 0, 0, // diff values -8 (encoded as 15), 0, 0, 0
1129                7, 0, // series 2: length in bytes
1130                60, 0, // series 2 granularity: 60s
1131                0, 0, // number of base value + selector elements or value blocks
1132                0, 0, // head selector index
1133                0, // number of values in last block
1134            ]
1135        );
1136    }
1137
1138    #[test]
1139    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
1140        let exec = fasync::TestExecutor::new_with_fake_time();
1141        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1142        let mut time_matrix = TimeMatrix::<Max<u64>, LastSample>::new(
1143            SamplingProfile::highly_granular(),
1144            LastSample::or(0),
1145        );
1146        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1147        assert_eq!(
1148            buffer.data,
1149            vec![
1150                1, // version number
1151                3, 0, 0, 0, // created timestamp
1152                3, 0, 0, 0, // last timestamp
1153                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1154                7, 0, // series 1: length in bytes
1155                10, 0, // series 1 granularity: 10s
1156                0, 0, // number of base value + selector elements or value blocks
1157                0, 0, // head selector index
1158                0, // number of values in last block
1159                7, 0, // series 2: length in bytes
1160                60, 0, // series 2 granularity: 60s
1161                0, 0, // number of base value + selector elements or value blocks
1162                0, 0, // head selector index
1163                0, // number of values in last block
1164            ]
1165        );
1166
1167        time_matrix.fold(Timed::now(1)).unwrap();
1168        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1169        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1170        assert_eq!(
1171            buffer.data,
1172            vec![
1173                1, // version number
1174                3, 0, 0, 0, // created timestamp
1175                10, 0, 0, 0, // last timestamp
1176                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1177                15, 0, // series 1: length in bytes
1178                10, 0, // series 1 granularity: 10s
1179                1, 0, // number of base value + selector elements or value blocks
1180                0, 0, // head selector index
1181                0, // number of values in last block
1182                1, 0, 0, 0, 0, 0, 0, 0, // base value
1183                7, 0, // series 2: length in bytes
1184                60, 0, // series 2 granularity: 60s
1185                0, 0, // number of base value + selector elements or value blocks
1186                0, 0, // head selector index
1187                0, // number of values in last block
1188            ]
1189        );
1190
1191        time_matrix.fold(Timed::now(u64::MAX)).unwrap();
1192        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1193        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1194        assert_eq!(
1195            buffer.data,
1196            vec![
1197                1, // version number
1198                3, 0, 0, 0, // created timestamp
1199                20, 0, 0, 0, // last timestamp
1200                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1201                24, 0, // series 1: length in bytes
1202                10, 0, // series 1 granularity: 10s
1203                2, 0, // number of base value + selector elements or value blocks
1204                0, 0, // head selector index
1205                1, // number of values in last block
1206                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1207                0x0f, // RLE selector
1208                3, 0, 0, 0, 0, 0, 1, 0, // value -2 (delta) encoded as 3, appearing 1 time
1209                7, 0, // series 2: length in bytes
1210                60, 0, // series 2 granularity: 60s
1211                0, 0, // number of base value + selector elements or value blocks
1212                0, 0, // head selector index
1213                0, // number of values in last block
1214            ]
1215        );
1216
1217        // Advance several time steps to test ring buffer's `fill`
1218        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1219        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
1220        assert_eq!(
1221            buffer.data,
1222            vec![
1223                1, // version number
1224                3, 0, 0, 0, // created timestamp
1225                50, 0, 0, 0, // last timestamp
1226                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1227                24, 0, // series 1: length in bytes
1228                10, 0, // series 1 granularity: 10s
1229                2, 0, // number of base value + selector elements or value blocks
1230                0, 0, // head selector index
1231                4, // number of values in last block
1232                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1233                0x01, // 2-bit selector
1234                3, 0, 0, 0, 0, 0, 0, 0, // diff values -2 (encoded as 3), 0, 0, 0
1235                7, 0, // series 2: length in bytes
1236                60, 0, // series 2 granularity: 60s
1237                0, 0, // number of base value + selector elements or value blocks
1238                0, 0, // head selector index
1239                0, // number of values in last block
1240            ]
1241        );
1242    }
1243}