windowed_stats/experimental/series/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24    BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25    Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28    ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32    FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::Capacity;
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
38pub trait Interpolator {
39    /// Interpolates samples to the given timestamp.
40    ///
41    /// This function queries the aggregations of the series. Typically, the timestamp is the
42    /// current time.
43    fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;
44
45    /// Interpolates samples to the given timestamp and gets the serialized aggregation buffers.
46    ///
47    /// This function queries the aggregations of the series. Typically, the timestamp is the
48    /// current time.
49    fn interpolate_and_get_buffers(
50        &mut self,
51        timestamp: Timestamp,
52    ) -> Result<SerializedBuffer, FoldError>;
53}
54
55/// A buffered round-robin sampler over [timed samples][`Timed`] (e.g., a [`TimeMatrix`]).
56///
57/// Round-robin samplers aggregate samples into buffered time series and produce a serialized
58/// buffer of aggregations per series.
59///
60/// [`Timed`]: crate::experimental::clock::Timed
61/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
62pub trait MatrixSampler<T>: Interpolator {
63    fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
64}
65
66/// A type that describes the semantics of data folded by `Sampler`s.
67///
68/// Data semantics determine how statistics are interpreted and time series are aggregated and
69/// buffered.
70pub trait DataSemantic {
71    type Metadata: Metadata;
72
73    fn display() -> impl Display;
74}
75
76/// A continually increasing value.
77///
78/// Counters are analogous to an odometer in a vehicle.
79#[derive(Debug)]
80pub enum Counter {}
81
82impl BufferStrategy<u64, LastSample> for Counter {
83    type Buffer = DeltaSimple8bRle;
84}
85
86impl DataSemantic for Counter {
87    type Metadata = ();
88
89    fn display() -> impl Display {
90        "counter"
91    }
92}
93
94/// A fluctuating value.
95///
96/// Gauges are analogous to a speedometer in a vehicle.
97#[derive(Debug)]
98pub enum Gauge {}
99
100impl<P> BufferStrategy<f32, P> for Gauge
101where
102    P: InterpolationKind,
103{
104    type Buffer = Uncompressed<f32>;
105}
106
107impl BufferStrategy<i64, ConstantSample> for Gauge {
108    type Buffer = ZigzagSimple8bRle;
109}
110
111impl BufferStrategy<i64, LastSample> for Gauge {
112    type Buffer = DeltaZigzagSimple8bRle<i64>;
113}
114
115impl BufferStrategy<u64, ConstantSample> for Gauge {
116    type Buffer = Simple8bRle;
117}
118
119impl BufferStrategy<u64, LastSample> for Gauge {
120    type Buffer = DeltaZigzagSimple8bRle<u64>;
121}
122
123impl DataSemantic for Gauge {
124    type Metadata = ();
125
126    fn display() -> impl Display {
127        "gauge"
128    }
129}
130
131// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
132//                                    "bit set", but there are notably some instances of "bitset",
133//                                    which instead suggests `Bitset` and `bitset`.
134/// A set of Boolean values.
135///
136/// Bit sets are analogous to indicator lamps in a vehicle.
137#[derive(Debug)]
138pub enum BitSet {}
139
140impl<A, P> BufferStrategy<A, P> for BitSet
141where
142    Simple8bRle: RingBuffer<A>,
143    P: InterpolationKind,
144{
145    type Buffer = Simple8bRle;
146}
147
148impl DataSemantic for BitSet {
149    type Metadata = BitSetIndex;
150
151    fn display() -> impl Display {
152        "bitset"
153    }
154}
155
156/// A buffer of serialized data from a time series.
157#[derive(Clone, Debug)]
158struct SerializedTimeSeries {
159    interval: SamplingInterval,
160    data: Vec<u8>,
161}
162
163impl SerializedTimeSeries {
164    /// Gets the sampling interval for the aggregations in the buffer.
165    pub fn interval(&self) -> &SamplingInterval {
166        &self.interval
167    }
168
169    /// Gets the serialized data.
170    pub fn data(&self) -> &[u8] {
171        self.data.as_slice()
172    }
173}
174
175/// An unbuffered statistical time series specification.
176///
177/// This type samples and interpolates timed data and produces aggregations per its statistic and
178/// sampling interval. It is a specification insofar that it does **not** buffer the series of
179/// aggregations.
180#[derive(Clone, Debug)]
181struct TimeSeries<F>
182where
183    F: Statistic,
184{
185    interval: SamplingInterval,
186    statistic: F,
187}
188
189impl<F> TimeSeries<F>
190where
191    F: Statistic,
192{
193    pub fn new(interval: SamplingInterval) -> Self
194    where
195        F: Default,
196    {
197        TimeSeries { interval, statistic: F::default() }
198    }
199
200    pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
201        TimeSeries { interval, statistic }
202    }
203
204    /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
205    /// aggregations.
206    ///
207    /// The returned iterator performs the computation and so it must be consumed to change the
208    /// state of the statistic.
209    ///
210    /// [`Tick`]: crate::experimental::clock::Tick
211    #[must_use]
212    fn interpolate_and_get_aggregations<'i, P>(
213        &'i mut self,
214        interpolation: &'i mut P,
215        tick: Tick,
216    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
217    where
218        P: Interpolation<F::Sample>,
219    {
220        self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
221            move |expiration| {
222                expiration
223                    .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
224                    .transpose()
225            },
226        )
227    }
228
229    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
230    /// and gets the aggregations.
231    ///
232    /// The returned iterator performs the computation and so it must be consumed to change the
233    /// state of the statistic.
234    ///
235    /// [`Tick`]: crate::experimental::clock::Tick
236    #[must_use]
237    fn fold_and_get_aggregations<'i, P>(
238        &'i mut self,
239        interpolation: &'i mut P,
240        tick: Tick,
241        sample: F::Sample,
242    ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243    where
244        P: Interpolation<F::Sample>,
245    {
246        self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
247            expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
248        })
249    }
250
251    /// Gets the sampling interval of the series.
252    pub fn interval(&self) -> &SamplingInterval {
253        &self.interval
254    }
255}
256
257impl<F, R, A> TimeSeries<PostAggregation<F, R>>
258where
259    F: Default + Statistic,
260    R: Clone + Fn(F::Aggregation) -> A,
261    A: Clone,
262{
263    pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
264        TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
265    }
266}
267
268/// A buffered round-robin statistical time series.
269///
270/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
271/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
272/// the buffer.
273#[derive(Derivative)]
274#[derivative(
275    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
276    Debug(bound = "F: Debug,
277                   F::Buffer: Debug,
278                   P::Output<F::Sample>: Debug,")
279)]
280struct BufferedTimeSeries<F, P>
281where
282    F: SerialStatistic<P>,
283    P: InterpolationKind,
284{
285    buffer: F::Buffer,
286    interpolation: P::Output<F::Sample>,
287    series: TimeSeries<F>,
288}
289
290impl<F, P> BufferedTimeSeries<F, P>
291where
292    F: SerialStatistic<P>,
293    P: InterpolationKind,
294{
295    pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
296        let buffer = F::buffer(&series.interval);
297        BufferedTimeSeries { buffer, interpolation, series }
298    }
299
300    /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
301    /// aggregations.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if sampling fails.
306    ///
307    /// [`Tick`]: crate::experimental::clock::Tick
308    fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
309        for aggregation in
310            self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
311        {
312            let (count, aggregation) = aggregation?;
313            if count.get() == 1 {
314                self.buffer.push(aggregation);
315            } else {
316                self.buffer.fill(aggregation, count);
317            }
318        }
319        Ok(())
320    }
321
322    /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
323    /// and buffers the aggregations.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if sampling fails.
328    ///
329    /// [`Tick`]: crate::experimental::clock::Tick
330    fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
331        for aggregation in
332            self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
333        {
334            let (count, aggregation) = aggregation?;
335            if count.get() == 1 {
336                self.buffer.push(aggregation);
337            } else {
338                self.buffer.fill(aggregation, count);
339            }
340        }
341        Ok(())
342    }
343
344    pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
345        let mut data = vec![];
346        self.buffer.serialize(&mut data)?;
347        Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
348    }
349}
350
351/// A buffer of data from time matrix.
352#[derive(Clone, Debug, PartialEq)]
353pub struct SerializedBuffer {
354    pub data_semantic: String,
355    pub data: Vec<u8>,
356}
357
358/// One or more statistical round-robin time series.
359///
360/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
361/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
362/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
363/// time matrix must be the same, but the sampling intervals can and should differ.
364#[derive(Derivative)]
365#[derivative(
366    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
367    Debug(bound = "F: Debug,
368                   F::Buffer: Debug,
369                   P::Output<F::Sample>: Debug,")
370)]
371pub struct TimeMatrix<F, P>
372where
373    F: SerialStatistic<P>,
374    P: InterpolationKind,
375{
376    created: Timestamp,
377    last: ObservationTime,
378    buffers: Vec1<BufferedTimeSeries<F, P>>,
379}
380
381impl<F, P> TimeMatrix<F, P>
382where
383    F: SerialStatistic<P>,
384    P: InterpolationKind,
385{
386    fn from_series_with<Q>(series: impl Into<Vec1<TimeSeries<F>>>, mut interpolation: Q) -> Self
387    where
388        Q: FnMut() -> P::Output<F::Sample>,
389    {
390        let buffers =
391            series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
392        TimeMatrix { created: Timestamp::now(), last: ObservationTime::default(), buffers }
393    }
394
395    /// Constructs a time matrix with the given sampling profile and interpolation.
396    ///
397    /// Statistics are default initialized.
398    pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
399    where
400        F: Default,
401    {
402        let sampling_intervals = profile.into().into_sampling_intervals();
403        TimeMatrix::from_series_with(sampling_intervals.map_into(TimeSeries::new), || {
404            interpolation.clone()
405        })
406    }
407
408    /// Constructs a time matrix with the given statistic.
409    pub fn with_statistic(
410        profile: impl Into<SamplingProfile>,
411        interpolation: P::Output<F::Sample>,
412        statistic: F,
413    ) -> Self {
414        let sampling_intervals = profile.into().into_sampling_intervals();
415        TimeMatrix::from_series_with(
416            sampling_intervals
417                .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
418            || interpolation.clone(),
419        )
420    }
421
422    /// Folds the given sample and interpolations and gets the aggregation buffers.
423    ///
424    /// To fold a sample without serializing buffers, use [`Sampler::fold`].
425    ///
426    /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
427    pub fn fold_and_get_buffers(
428        &mut self,
429        sample: Timed<F::Sample>,
430    ) -> Result<SerializedBuffer, FoldError> {
431        self.fold(sample)?;
432        let series_buffers = self
433            .buffers
434            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
435            .map_err::<FoldError, _>(From::from)?;
436        self.serialize(series_buffers).map_err(From::from)
437    }
438
439    fn serialize(
440        &self,
441        series_buffers: Vec1<SerializedTimeSeries>,
442    ) -> io::Result<SerializedBuffer> {
443        use crate::experimental::clock::DurationExt;
444        use byteorder::{LittleEndian, WriteBytesExt};
445        use std::io::Write;
446
447        let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
448        let end_timestamp =
449            u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
450
451        let mut buffer = vec![];
452        buffer.write_u8(1)?; // Version number.
453        buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
454        buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
455        // time.
456        encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
457
458        for series in series_buffers {
459            const GRANULARITY_FIELD_LEN: usize = 2;
460            let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
461            let granularity =
462                u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
463
464            buffer.write_u16::<LittleEndian>(len)?;
465            buffer.write_u16::<LittleEndian>(granularity)?;
466            buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
467        }
468        Ok(SerializedBuffer {
469            data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
470            data: buffer,
471        })
472    }
473}
474
475impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
476where
477    PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
478    F: Default + SerialStatistic<P>,
479    R: Clone + Fn(F::Aggregation) -> A,
480    P: InterpolationKind,
481    A: Clone,
482{
483    /// Constructs a time matrix with the default statistic and given transform for
484    /// post-aggregation.
485    pub fn with_transform(
486        profile: impl Into<SamplingProfile>,
487        interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
488        transform: R,
489    ) -> Self
490    where
491        R: Clone,
492    {
493        let sampling_intervals = profile.into().into_sampling_intervals();
494        TimeMatrix::from_series_with(
495            sampling_intervals
496                .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
497            || interpolation.clone(),
498        )
499    }
500}
501
502impl<F, P> Default for TimeMatrix<F, P>
503where
504    F: Default + SerialStatistic<P>,
505    P: InterpolationKind,
506    P::Output<F::Sample>: Default,
507{
508    fn default() -> Self {
509        TimeMatrix::new(SamplingProfile::default(), P::Output::default())
510    }
511}
512
513impl<F, P> Interpolator for TimeMatrix<F, P>
514where
515    F: SerialStatistic<P>,
516    P: InterpolationKind,
517{
518    fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
519        let tick = self.last.tick(timestamp.into(), false)?;
520        Ok(for buffer in self.buffers.iter_mut() {
521            buffer.interpolate(tick)?;
522        })
523    }
524
525    fn interpolate_and_get_buffers(
526        &mut self,
527        timestamp: Timestamp,
528    ) -> Result<SerializedBuffer, FoldError> {
529        self.interpolate(timestamp)?;
530        let series_buffers = self
531            .buffers
532            .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
533            .map_err::<FoldError, _>(From::from)?;
534        self.serialize(series_buffers).map_err(From::from)
535    }
536}
537
538impl<F, P> MatrixSampler<F::Sample> for TimeMatrix<F, P>
539where
540    F: SerialStatistic<P>,
541    P: InterpolationKind,
542{
543    fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
544        let (timestamp, sample) = sample.into();
545        let tick = self.last.tick(timestamp, true)?;
546        Ok(for buffer in self.buffers.iter_mut() {
547            buffer.fold(tick, sample.clone())?;
548        })
549    }
550}
551
552#[cfg(test)]
553mod tests {
554    use fuchsia_async as fasync;
555
556    use crate::experimental::clock::{Timed, Timestamp};
557    use crate::experimental::series::interpolation::{ConstantSample, LastSample};
558    use crate::experimental::series::statistic::{
559        ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
560    };
561    use crate::experimental::series::{Interpolator, MatrixSampler, SamplingProfile, TimeMatrix};
562
563    fn fold_and_interpolate_f32(sampler: &mut impl MatrixSampler<f32>) {
564        sampler.fold(Timed::now(0.0)).unwrap();
565        sampler.fold(Timed::now(1.0)).unwrap();
566        sampler.fold(Timed::now(2.0)).unwrap();
567        let _buffers = sampler.interpolate(Timestamp::now()).unwrap();
568    }
569
570    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
571    //                                    outputs of a `TimeMatrix`.
572    // This "test" is considered successful as long as it builds.
573    #[test]
574    fn static_test_define_time_matrix() {
575        type Mean<T> = ArithmeticMean<T>;
576        type MeanTransform<T, F> = Transform<Mean<T>, F>;
577
578        let _exec = fasync::TestExecutor::new_with_fake_time();
579
580        // Arithmetic mean time matrices.
581        let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
582        let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
583            SamplingProfile::balanced(),
584            LastSample::or(0.0f32),
585        );
586        let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
587            SamplingProfile::granular(),
588            ConstantSample::default(),
589            Mean::<f32>::default(),
590        );
591
592        // Discrete arithmetic mean time matrices.
593        let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
594            SamplingProfile::highly_granular(),
595            LastSample::or(0.0f32),
596            |aggregation| aggregation.ceil() as i64,
597        );
598        fold_and_interpolate_f32(&mut matrix);
599        // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
600        // constructors. This is as raw as it gets.
601        let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
602            SamplingProfile::default(),
603            ConstantSample::default(),
604            PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
605                aggregation.ceil() as i64
606            }),
607        );
608        fold_and_interpolate_f32(&mut matrix);
609    }
610
611    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
612    //                                    outputs of a `TimeMatrix`.
613    // This "test" is considered successful as long as it builds.
614    #[test]
615    fn static_test_supported_statistic_and_interpolation_combinations() {
616        let _exec = fasync::TestExecutor::new_with_fake_time();
617
618        let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
619        let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
620        let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
621        let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
622        let _ = TimeMatrix::<Max<u64>, LastSample>::default();
623        let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
624        let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
625        let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
626        let _ = TimeMatrix::<Union<u64>, LastSample>::default();
627    }
628
629    #[test]
630    fn time_matrix_with_uncompressed_buffer() {
631        let exec = fasync::TestExecutor::new_with_fake_time();
632        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
633        let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
634            SamplingProfile::highly_granular(),
635            ConstantSample::default(),
636        );
637        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
638        assert_eq!(
639            buffer.data,
640            vec![
641                1, // version number
642                3, 0, 0, 0, // created timestamp
643                3, 0, 0, 0, // last timestamp
644                0, 0, // type: uncompressed; subtype: f32
645                4, 0, // series 1: length in bytes
646                10, 0, // series 1 granularity: 10s
647                0, 0, // number of elements
648                4, 0, // series 2: length in bytes
649                60, 0, // series 2 granularity: 60s
650                0, 0, // number of elements
651            ]
652        );
653
654        time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
655        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
656        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
657        assert_eq!(
658            buffer.data,
659            vec![
660                1, // version number
661                3, 0, 0, 0, // created timestamp
662                10, 0, 0, 0, // last timestamp
663                0, 0, // type: uncompressed; subtype: f32
664                8, 0, // series 1: length in bytes
665                10, 0, // series 1 granularity: 10s
666                1, 0, // number of elements
667                42, 0, 0, 0, // item 1
668                4, 0, // series 2: length in bytes
669                60, 0, // series 2 granularity: 60s
670                0, 0, // number of elements
671            ]
672        );
673
674        // Advance several time steps to test ring buffer's `fill`
675        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
676        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
677        assert_eq!(
678            buffer.data,
679            vec![
680                1, // version number
681                3, 0, 0, 0, // created timestamp
682                50, 0, 0, 0, // last timestamp
683                0, 0, // type: uncompressed; subtype: f32
684                24, 0, // series 1: length in bytes
685                10, 0, // series 1 granularity: 10s
686                5, 0, // number of elements
687                42, 0, 0, 0, // item 1
688                0, 0, 0, 0, // item 2
689                0, 0, 0, 0, // item 3
690                0, 0, 0, 0, // item 4
691                0, 0, 0, 0, // item 5
692                4, 0, // series 2: length in bytes
693                60, 0, // series 2 granularity: 60s
694                0, 0, // number of elements
695            ]
696        );
697    }
698
699    #[test]
700    fn time_matrix_with_simple8b_rle_buffer() {
701        let exec = fasync::TestExecutor::new_with_fake_time();
702        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
703        let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
704            SamplingProfile::highly_granular(),
705            ConstantSample::default(),
706        );
707        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
708        assert_eq!(
709            buffer.data,
710            vec![
711                1, // version number
712                3, 0, 0, 0, // created timestamp
713                3, 0, 0, 0, // last timestamp
714                1, 0, // type: simple8b RLE; subtype: unsigned
715                7, 0, // series 1: length in bytes
716                10, 0, // series 1 granularity: 10s
717                0, 0, // number of selector elements and value blocks
718                0, 0, // head selector index
719                0, // number of values in last block
720                7, 0, // series 2: length in bytes
721                60, 0, // series 2 granularity: 60s
722                0, 0, // number of selector elements and value blocks
723                0, 0, // head selector index
724                0, // number of values in last block
725            ]
726        );
727
728        time_matrix.fold(Timed::now(15)).unwrap();
729        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
730        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
731        assert_eq!(
732            buffer.data,
733            vec![
734                1, // version number
735                3, 0, 0, 0, // created timestamp
736                10, 0, 0, 0, // last timestamp
737                1, 0, // type: simple8b RLE; subtype: unsigned
738                16, 0, // series 1: length in bytes
739                10, 0, // series 1 granularity: 10s
740                1, 0, // number of selector elements and value blocks
741                0, 0,    // head selector index
742                1,    // number of values in last block
743                0x0f, // RLE selector
744                15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
745                7, 0, // series 2: length in bytes
746                60, 0, // series 2 granularity: 60s
747                0, 0, // number of selector elements and value blocks
748                0, 0, // head selector index
749                0, // number of values in last block
750            ]
751        );
752
753        // Advance several time steps to test ring buffer's `fill`
754        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
755        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
756        assert_eq!(
757            buffer.data,
758            vec![
759                1, // version number
760                3, 0, 0, 0, // created timestamp
761                50, 0, 0, 0, // last timestamp
762                1, 0, // type: simple8b RLE; subtype: unsigned
763                16, 0, // series 1: length in bytes
764                10, 0, // series 1 granularity: 10s
765                1, 0, // number of selector elements and value blocks
766                0, 0,    // head selector index
767                5,    // number of values in last block
768                0x03, // 4-bit selector
769                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
770                7, 0, // series 2: length in bytes
771                60, 0, // series 2 granularity: 60s
772                0, 0, // number of selector elements and value blocks
773                0, 0, // head selector index
774                0, // number of values in last block
775            ]
776        );
777    }
778
779    #[test]
780    fn time_matrix_with_zigzag_simple8b_rle_buffer() {
781        let exec = fasync::TestExecutor::new_with_fake_time();
782        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
783        let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
784            SamplingProfile::highly_granular(),
785            ConstantSample::default(),
786        );
787        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
788        assert_eq!(
789            buffer.data,
790            vec![
791                1, // version number
792                3, 0, 0, 0, // created timestamp
793                3, 0, 0, 0, // last timestamp
794                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
795                7, 0, // series 1: length in bytes
796                10, 0, // series 1 granularity: 10s
797                0, 0, // number of selector elements and value blocks
798                0, 0, // head selector index
799                0, // number of values in last block
800                7, 0, // series 2: length in bytes
801                60, 0, // series 2 granularity: 60s
802                0, 0, // number of selector elements and value blocks
803                0, 0, // head selector index
804                0, // number of values in last block
805            ]
806        );
807
808        time_matrix.fold(Timed::now(-8)).unwrap();
809        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
810        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
811        assert_eq!(
812            buffer.data,
813            vec![
814                1, // version number
815                3, 0, 0, 0, // created timestamp
816                10, 0, 0, 0, // last timestamp
817                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
818                16, 0, // series 1: length in bytes
819                10, 0, // series 1 granularity: 10s
820                1, 0, // number of selector elements and value blocks
821                0, 0,    // head selector index
822                1,    // number of values in last block
823                0x0f, // RLE selector
824                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
825                7, 0, // series 2: length in bytes
826                60, 0, // series 2 granularity: 60s
827                0, 0, // number of selector elements and value blocks
828                0, 0, // head selector index
829                0, // number of values in last block
830            ]
831        );
832
833        // Advance several time steps to test ring buffer's `fill`
834        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
835        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
836        assert_eq!(
837            buffer.data,
838            vec![
839                1, // version number
840                3, 0, 0, 0, // created timestamp
841                50, 0, 0, 0, // last timestamp
842                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
843                16, 0, // series 1: length in bytes
844                10, 0, // series 1 granularity: 10s
845                1, 0, // number of selector elements and value blocks
846                0, 0,    // head selector index
847                5,    // number of values in last block
848                0x03, // 4-bit selector
849                0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
850                7, 0, // series 2: length in bytes
851                60, 0, // series 2 granularity: 60s
852                0, 0, // number of selector elements and value blocks
853                0, 0, // head selector index
854                0, // number of values in last block
855            ]
856        );
857    }
858
859    #[test]
860    fn time_matrix_with_delta_simple8b_rle_buffer() {
861        let exec = fasync::TestExecutor::new_with_fake_time();
862        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
863        let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
864            SamplingProfile::highly_granular(),
865            LastSample::or(0),
866        );
867        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
868        assert_eq!(
869            buffer.data,
870            vec![
871                1, // version number
872                3, 0, 0, 0, // created timestamp
873                3, 0, 0, 0, // last timestamp
874                2, 0, // type: delta simple8b RLE; subtype: unsigned
875                7, 0, // series 1: length in bytes
876                10, 0, // series 1 granularity: 10s
877                0, 0, // number of base value + selector elements or value blocks
878                0, 0, // head selector index
879                0, // number of values in last block
880                7, 0, // series 2: length in bytes
881                60, 0, // series 2 granularity: 60s
882                0, 0, // number of base value + selector elements or value blocks
883                0, 0, // head selector index
884                0, // number of values in last block
885            ]
886        );
887
888        time_matrix.fold(Timed::now(42)).unwrap();
889        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
890        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
891        assert_eq!(
892            buffer.data,
893            vec![
894                1, // version number
895                3, 0, 0, 0, // created timestamp
896                10, 0, 0, 0, // last timestamp
897                2, 0, // type: delta simple8b RLE; subtype: unsigned
898                15, 0, // series 1: length in bytes
899                10, 0, // series 1 granularity: 10s
900                1, 0, // number of base value + selector elements or value blocks
901                0, 0, // head selector index
902                0, // number of values in last block
903                42, 0, 0, 0, 0, 0, 0, 0, // base value
904                7, 0, // series 2: length in bytes
905                60, 0, // series 2 granularity: 60s
906                0, 0, // number of base value + selector elements or value blocks
907                0, 0, // head selector index
908                0, // number of values in last block
909            ]
910        );
911
912        time_matrix.fold(Timed::now(57)).unwrap();
913        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
914        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
915        assert_eq!(
916            buffer.data,
917            vec![
918                1, // version number
919                3, 0, 0, 0, // created timestamp
920                20, 0, 0, 0, // last timestamp
921                2, 0, // type: delta simple8b RLE; subtype: unsigned
922                24, 0, // series 1: length in bytes
923                10, 0, // series 1 granularity: 10s
924                2, 0, // number of base value + selector elements or value blocks
925                0, 0, // head selector index
926                1, // number of values in last block
927                42, 0, 0, 0, 0, 0, 0, 0,    // base value
928                0x0f, // RLE selector
929                15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
930                7, 0, // series 2: length in bytes
931                60, 0, // series 2 granularity: 60s
932                0, 0, // number of base value + selector elements or value blocks
933                0, 0, // head selector index
934                0, // number of values in last block
935            ]
936        );
937
938        // Advance several time steps to test ring buffer's `fill`
939        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
940        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
941        assert_eq!(
942            buffer.data,
943            vec![
944                1, // version number
945                3, 0, 0, 0, // created timestamp
946                50, 0, 0, 0, // last timestamp
947                2, 0, // type: delta simple8b RLE; subtype: unsigned
948                24, 0, // series 1: length in bytes
949                10, 0, // series 1 granularity: 10s
950                2, 0, // number of base value + selector elements or value blocks
951                0, 0, // head selector index
952                4, // number of values in last block
953                42, 0, 0, 0, 0, 0, 0, 0,    // base value
954                0x03, // 4-bit selector
955                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
956                7, 0, // series 2: length in bytes
957                60, 0, // series 2 granularity: 60s
958                0, 0, // number of base value + selector elements or value blocks
959                0, 0, // head selector index
960                0, // number of values in last block
961            ]
962        );
963    }
964
965    #[test]
966    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
967        let exec = fasync::TestExecutor::new_with_fake_time();
968        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
969        let mut time_matrix = TimeMatrix::<Max<i64>, LastSample>::new(
970            SamplingProfile::highly_granular(),
971            LastSample::or(0),
972        );
973        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
974        assert_eq!(
975            buffer.data,
976            vec![
977                1, // version number
978                3, 0, 0, 0, // created timestamp
979                3, 0, 0, 0, // last timestamp
980                2, 1, // type: delta simple8b RLE; subtype: signed
981                7, 0, // series 1: length in bytes
982                10, 0, // series 1 granularity: 10s
983                0, 0, // number of base value + selector elements or value blocks
984                0, 0, // head selector index
985                0, // number of values in last block
986                7, 0, // series 2: length in bytes
987                60, 0, // series 2 granularity: 60s
988                0, 0, // number of base value + selector elements or value blocks
989                0, 0, // head selector index
990                0, // number of values in last block
991            ]
992        );
993
994        time_matrix.fold(Timed::now(42)).unwrap();
995        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
996        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
997        assert_eq!(
998            buffer.data,
999            vec![
1000                1, // version number
1001                3, 0, 0, 0, // created timestamp
1002                10, 0, 0, 0, // last timestamp
1003                2, 1, // type: delta simple8b RLE; subtype: signed
1004                15, 0, // series 1: length in bytes
1005                10, 0, // series 1 granularity: 10s
1006                1, 0, // number of base value + selector elements or value blocks
1007                0, 0, // head selector index
1008                0, // number of values in last block
1009                42, 0, 0, 0, 0, 0, 0, 0, // base value
1010                7, 0, // series 2: length in bytes
1011                60, 0, // series 2 granularity: 60s
1012                0, 0, // number of base value + selector elements or value blocks
1013                0, 0, // head selector index
1014                0, // number of values in last block
1015            ]
1016        );
1017
1018        time_matrix.fold(Timed::now(34)).unwrap();
1019        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1020        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1021        assert_eq!(
1022            buffer.data,
1023            vec![
1024                1, // version number
1025                3, 0, 0, 0, // created timestamp
1026                20, 0, 0, 0, // last timestamp
1027                2, 1, // type: delta simple8b RLE; subtype: signed
1028                24, 0, // series 1: length in bytes
1029                10, 0, // series 1 granularity: 10s
1030                2, 0, // number of base value + selector elements or value blocks
1031                0, 0, // head selector index
1032                1, // number of values in last block
1033                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1034                0x0f, // RLE selector
1035                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (delta) encoded as 15, appearing 1 time
1036                7, 0, // series 2: length in bytes
1037                60, 0, // series 2 granularity: 60s
1038                0, 0, // number of base value + selector elements or value blocks
1039                0, 0, // head selector index
1040                0, // number of values in last block
1041            ]
1042        );
1043
1044        // Advance several time steps to test ring buffer's `fill`
1045        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1046        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1047        assert_eq!(
1048            buffer.data,
1049            vec![
1050                1, // version number
1051                3, 0, 0, 0, // created timestamp
1052                50, 0, 0, 0, // last timestamp
1053                2, 1, // type: delta simple8b RLE; subtype: signed
1054                24, 0, // series 1: length in bytes
1055                10, 0, // series 1 granularity: 10s
1056                2, 0, // number of base value + selector elements or value blocks
1057                0, 0, // head selector index
1058                4, // number of values in last block
1059                42, 0, 0, 0, 0, 0, 0, 0,    // base value
1060                0x03, // 4-bit selector
1061                0x0f, 0, 0, 0, 0, 0, 0, 0, // diff values -8 (encoded as 15), 0, 0, 0
1062                7, 0, // series 2: length in bytes
1063                60, 0, // series 2 granularity: 60s
1064                0, 0, // number of base value + selector elements or value blocks
1065                0, 0, // head selector index
1066                0, // number of values in last block
1067            ]
1068        );
1069    }
1070
1071    #[test]
1072    fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
1073        let exec = fasync::TestExecutor::new_with_fake_time();
1074        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1075        let mut time_matrix = TimeMatrix::<Max<u64>, LastSample>::new(
1076            SamplingProfile::highly_granular(),
1077            LastSample::or(0),
1078        );
1079        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1080        assert_eq!(
1081            buffer.data,
1082            vec![
1083                1, // version number
1084                3, 0, 0, 0, // created timestamp
1085                3, 0, 0, 0, // last timestamp
1086                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1087                7, 0, // series 1: length in bytes
1088                10, 0, // series 1 granularity: 10s
1089                0, 0, // number of base value + selector elements or value blocks
1090                0, 0, // head selector index
1091                0, // number of values in last block
1092                7, 0, // series 2: length in bytes
1093                60, 0, // series 2 granularity: 60s
1094                0, 0, // number of base value + selector elements or value blocks
1095                0, 0, // head selector index
1096                0, // number of values in last block
1097            ]
1098        );
1099
1100        time_matrix.fold(Timed::now(1)).unwrap();
1101        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1102        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1103        assert_eq!(
1104            buffer.data,
1105            vec![
1106                1, // version number
1107                3, 0, 0, 0, // created timestamp
1108                10, 0, 0, 0, // last timestamp
1109                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1110                15, 0, // series 1: length in bytes
1111                10, 0, // series 1 granularity: 10s
1112                1, 0, // number of base value + selector elements or value blocks
1113                0, 0, // head selector index
1114                0, // number of values in last block
1115                1, 0, 0, 0, 0, 0, 0, 0, // base value
1116                7, 0, // series 2: length in bytes
1117                60, 0, // series 2 granularity: 60s
1118                0, 0, // number of base value + selector elements or value blocks
1119                0, 0, // head selector index
1120                0, // number of values in last block
1121            ]
1122        );
1123
1124        time_matrix.fold(Timed::now(u64::MAX)).unwrap();
1125        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1126        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1127        assert_eq!(
1128            buffer.data,
1129            vec![
1130                1, // version number
1131                3, 0, 0, 0, // created timestamp
1132                20, 0, 0, 0, // last timestamp
1133                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1134                24, 0, // series 1: length in bytes
1135                10, 0, // series 1 granularity: 10s
1136                2, 0, // number of base value + selector elements or value blocks
1137                0, 0, // head selector index
1138                1, // number of values in last block
1139                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1140                0x0f, // RLE selector
1141                3, 0, 0, 0, 0, 0, 1, 0, // value -2 (delta) encoded as 3, appearing 1 time
1142                7, 0, // series 2: length in bytes
1143                60, 0, // series 2 granularity: 60s
1144                0, 0, // number of base value + selector elements or value blocks
1145                0, 0, // head selector index
1146                0, // number of values in last block
1147            ]
1148        );
1149
1150        // Advance several time steps to test ring buffer's `fill`
1151        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1152        let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1153        assert_eq!(
1154            buffer.data,
1155            vec![
1156                1, // version number
1157                3, 0, 0, 0, // created timestamp
1158                50, 0, 0, 0, // last timestamp
1159                2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1160                24, 0, // series 1: length in bytes
1161                10, 0, // series 1 granularity: 10s
1162                2, 0, // number of base value + selector elements or value blocks
1163                0, 0, // head selector index
1164                4, // number of values in last block
1165                1, 0, 0, 0, 0, 0, 0, 0,    // base value
1166                0x01, // 2-bit selector
1167                3, 0, 0, 0, 0, 0, 0, 0, // diff values -2 (encoded as 3), 0, 0, 0
1168                7, 0, // series 2: length in bytes
1169                60, 0, // series 2 granularity: 60s
1170                0, 0, // number of base value + selector elements or value blocks
1171                0, 0, // head selector index
1172                0, // number of values in last block
1173            ]
1174        );
1175    }
1176}