windowed_stats/experimental/series/mod.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24 BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25 Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28 ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32 FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::Capacity;
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
/// A type whose time series can be interpolated up to a timestamp.
///
/// Interpolation takes only a timestamp; no new sample is provided.
pub trait Interpolator {
    /// Interpolates samples to the given timestamp.
    ///
    /// This function queries the aggregations of the series. Typically, the timestamp is the
    /// current time.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if interpolation fails.
    fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;

    /// Interpolates samples to the given timestamp and gets the serialized aggregation buffers.
    ///
    /// This function queries the aggregations of the series. Typically, the timestamp is the
    /// current time.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if interpolation or serialization fails.
    fn interpolate_and_get_buffers(
        &mut self,
        timestamp: Timestamp,
    ) -> Result<SerializedBuffer, FoldError>;
}
54
/// A buffered round-robin sampler over [timed samples][`Timed`] (e.g., a [`TimeMatrix`]).
///
/// Round-robin samplers aggregate samples into buffered time series and produce a serialized
/// buffer of aggregations per series.
///
/// Every sampler is also an [`Interpolator`], so it can be advanced to a timestamp without a
/// sample.
///
/// [`Timed`]: crate::experimental::clock::Timed
/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
pub trait MatrixSampler<T>: Interpolator {
    /// Folds the given timed sample into the sampler.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if the sample cannot be folded.
    fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
}
65
/// A type that describes the semantics of data folded by `MatrixSampler`s.
///
/// Data semantics determine how statistics are interpreted and time series are aggregated and
/// buffered.
pub trait DataSemantic {
    /// The metadata type associated with data of this semantic.
    type Metadata: Metadata;

    /// Gets a displayable name for the data semantic.
    fn display() -> impl Display;
}
75
/// A continually increasing value.
///
/// Counters are analogous to an odometer in a vehicle.
///
/// This enum has no variants and so has no values; it is only used as a type parameter.
#[derive(Debug)]
pub enum Counter {}
81
// Counters over `u64` with `LastSample` interpolation are buffered in a `DeltaSimple8bRle`.
impl BufferStrategy<u64, LastSample> for Counter {
    type Buffer = DeltaSimple8bRle;
}
85
impl DataSemantic for Counter {
    // Counters carry no metadata.
    type Metadata = ();

    /// Gets the name of the counter semantic: `"counter"`.
    fn display() -> impl Display {
        "counter"
    }
}
93
/// A fluctuating value.
///
/// Gauges are analogous to a speedometer in a vehicle.
///
/// This enum has no variants and so has no values; it is only used as a type parameter.
#[derive(Debug)]
pub enum Gauge {}
99
// Gauges over `f32` are buffered without compression, regardless of the interpolation kind.
impl<P> BufferStrategy<f32, P> for Gauge
where
    P: InterpolationKind,
{
    type Buffer = Uncompressed<f32>;
}
106
// Gauges over `i64` with `ConstantSample` interpolation are buffered in a `ZigzagSimple8bRle`.
impl BufferStrategy<i64, ConstantSample> for Gauge {
    type Buffer = ZigzagSimple8bRle;
}
110
// Gauges over `i64` with `LastSample` interpolation are buffered in a
// `DeltaZigzagSimple8bRle<i64>`.
impl BufferStrategy<i64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<i64>;
}
114
// Gauges over `u64` with `ConstantSample` interpolation are buffered in a `Simple8bRle`.
impl BufferStrategy<u64, ConstantSample> for Gauge {
    type Buffer = Simple8bRle;
}
118
// Gauges over `u64` with `LastSample` interpolation are buffered in a
// `DeltaZigzagSimple8bRle<u64>`. Unlike counters, the buffer is a zigzag variant here.
impl BufferStrategy<u64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<u64>;
}
122
impl DataSemantic for Gauge {
    // Gauges carry no metadata.
    type Metadata = ();

    /// Gets the name of the gauge semantic: `"gauge"`.
    fn display() -> impl Display {
        "gauge"
    }
}
130
// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
//                                    "bit set", but there are notably some instances of "bitset",
//                                    which instead suggests `Bitset` and `bitset`.
/// A set of Boolean values.
///
/// Bit sets are analogous to indicator lamps in a vehicle.
///
/// This enum has no variants and so has no values; it is only used as a type parameter.
#[derive(Debug)]
pub enum BitSet {}
139
// Bit sets are buffered in a `Simple8bRle` for any interpolation kind and for any `A` for which
// `Simple8bRle` is a ring buffer.
impl<A, P> BufferStrategy<A, P> for BitSet
where
    Simple8bRle: RingBuffer<A>,
    P: InterpolationKind,
{
    type Buffer = Simple8bRle;
}
147
impl DataSemantic for BitSet {
    // Unlike counters and gauges, bit sets carry metadata: a `BitSetIndex`.
    type Metadata = BitSetIndex;

    /// Gets the name of the bit set semantic: `"bitset"`.
    fn display() -> impl Display {
        "bitset"
    }
}
155
/// A buffer of serialized data from a time series.
///
/// Pairs the serialized bytes of a series buffer with the sampling interval of that series.
#[derive(Clone, Debug)]
struct SerializedTimeSeries {
    /// The sampling interval of the series from which `data` was serialized.
    interval: SamplingInterval,
    /// The serialized bytes of the series buffer.
    data: Vec<u8>,
}
162
163impl SerializedTimeSeries {
164 /// Gets the sampling interval for the aggregations in the buffer.
165 pub fn interval(&self) -> &SamplingInterval {
166 &self.interval
167 }
168
169 /// Gets the serialized data.
170 pub fn data(&self) -> &[u8] {
171 self.data.as_slice()
172 }
173}
174
/// An unbuffered statistical time series specification.
///
/// This type samples and interpolates timed data and produces aggregations per its statistic and
/// sampling interval. It is a specification insofar that it does **not** buffer the series of
/// aggregations.
#[derive(Clone, Debug)]
struct TimeSeries<F>
where
    F: Statistic,
{
    /// The sampling interval over which samples are aggregated.
    interval: SamplingInterval,
    /// The statistic that folds samples and produces aggregations.
    statistic: F,
}
188
189impl<F> TimeSeries<F>
190where
191 F: Statistic,
192{
193 pub fn new(interval: SamplingInterval) -> Self
194 where
195 F: Default,
196 {
197 TimeSeries { interval, statistic: F::default() }
198 }
199
200 pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
201 TimeSeries { interval, statistic }
202 }
203
204 /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
205 /// aggregations.
206 ///
207 /// The returned iterator performs the computation and so it must be consumed to change the
208 /// state of the statistic.
209 ///
210 /// [`Tick`]: crate::experimental::clock::Tick
211 #[must_use]
212 fn interpolate_and_get_aggregations<'i, P>(
213 &'i mut self,
214 interpolation: &'i mut P,
215 tick: Tick,
216 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
217 where
218 P: Interpolation<F::Sample>,
219 {
220 self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
221 move |expiration| {
222 expiration
223 .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
224 .transpose()
225 },
226 )
227 }
228
229 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
230 /// and gets the aggregations.
231 ///
232 /// The returned iterator performs the computation and so it must be consumed to change the
233 /// state of the statistic.
234 ///
235 /// [`Tick`]: crate::experimental::clock::Tick
236 #[must_use]
237 fn fold_and_get_aggregations<'i, P>(
238 &'i mut self,
239 interpolation: &'i mut P,
240 tick: Tick,
241 sample: F::Sample,
242 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243 where
244 P: Interpolation<F::Sample>,
245 {
246 self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
247 expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
248 })
249 }
250
251 /// Gets the sampling interval of the series.
252 pub fn interval(&self) -> &SamplingInterval {
253 &self.interval
254 }
255}
256
257impl<F, R, A> TimeSeries<PostAggregation<F, R>>
258where
259 F: Default + Statistic,
260 R: Clone + Fn(F::Aggregation) -> A,
261 A: Clone,
262{
263 pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
264 TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
265 }
266}
267
/// A buffered round-robin statistical time series.
///
/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
/// the buffer.
// `Clone` and `Debug` are derived with explicit bounds on `F`, its buffer, and the interpolation
// state rather than on the type parameters alone.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
 F::Buffer: Debug,
 P::Output<F::Sample>: Debug,")
)]
struct BufferedTimeSeries<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The buffer into which aggregations are pushed.
    buffer: F::Buffer,
    /// The interpolation state used when folding and interpolating.
    interpolation: P::Output<F::Sample>,
    /// The unbuffered series that produces the aggregations.
    series: TimeSeries<F>,
}
289
290impl<F, P> BufferedTimeSeries<F, P>
291where
292 F: SerialStatistic<P>,
293 P: InterpolationKind,
294{
295 pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
296 let buffer = F::buffer(&series.interval);
297 BufferedTimeSeries { buffer, interpolation, series }
298 }
299
300 /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
301 /// aggregations.
302 ///
303 /// # Errors
304 ///
305 /// Returns an error if sampling fails.
306 ///
307 /// [`Tick`]: crate::experimental::clock::Tick
308 fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
309 for aggregation in
310 self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
311 {
312 let (count, aggregation) = aggregation?;
313 if count.get() == 1 {
314 self.buffer.push(aggregation);
315 } else {
316 self.buffer.fill(aggregation, count);
317 }
318 }
319 Ok(())
320 }
321
322 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
323 /// and buffers the aggregations.
324 ///
325 /// # Errors
326 ///
327 /// Returns an error if sampling fails.
328 ///
329 /// [`Tick`]: crate::experimental::clock::Tick
330 fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
331 for aggregation in
332 self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
333 {
334 let (count, aggregation) = aggregation?;
335 if count.get() == 1 {
336 self.buffer.push(aggregation);
337 } else {
338 self.buffer.fill(aggregation, count);
339 }
340 }
341 Ok(())
342 }
343
344 pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
345 let mut data = vec![];
346 self.buffer.serialize(&mut data)?;
347 Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
348 }
349}
350
/// A buffer of data from a time matrix.
#[derive(Clone, Debug, PartialEq)]
pub struct SerializedBuffer {
    /// The displayed name of the data semantic of the matrix (e.g., `"gauge"`).
    pub data_semantic: String,
    /// The serialized bytes of the matrix.
    pub data: Vec<u8>,
}
357
/// One or more statistical round-robin time series.
///
/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
/// time matrix must be the same, but the sampling intervals can and should differ.
// `Clone` and `Debug` are derived with explicit bounds on `F`, its buffer, and the interpolation
// state rather than on the type parameters alone.
#[derive(Derivative)]
#[derivative(
    Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
    Debug(bound = "F: Debug,
 F::Buffer: Debug,
 P::Output<F::Sample>: Debug,")
)]
pub struct TimeMatrix<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    /// The time at which the matrix was constructed.
    created: Timestamp,
    /// The observation time state, ticked by every fold and interpolation.
    last: ObservationTime,
    /// The buffered series of the matrix, one per sampling interval.
    buffers: Vec1<BufferedTimeSeries<F, P>>,
}
380
381impl<F, P> TimeMatrix<F, P>
382where
383 F: SerialStatistic<P>,
384 P: InterpolationKind,
385{
386 fn from_series_with<Q>(series: impl Into<Vec1<TimeSeries<F>>>, mut interpolation: Q) -> Self
387 where
388 Q: FnMut() -> P::Output<F::Sample>,
389 {
390 let buffers =
391 series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
392 TimeMatrix { created: Timestamp::now(), last: ObservationTime::default(), buffers }
393 }
394
395 /// Constructs a time matrix with the given sampling profile and interpolation.
396 ///
397 /// Statistics are default initialized.
398 pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
399 where
400 F: Default,
401 {
402 let sampling_intervals = profile.into().into_sampling_intervals();
403 TimeMatrix::from_series_with(sampling_intervals.map_into(TimeSeries::new), || {
404 interpolation.clone()
405 })
406 }
407
408 /// Constructs a time matrix with the given statistic.
409 pub fn with_statistic(
410 profile: impl Into<SamplingProfile>,
411 interpolation: P::Output<F::Sample>,
412 statistic: F,
413 ) -> Self {
414 let sampling_intervals = profile.into().into_sampling_intervals();
415 TimeMatrix::from_series_with(
416 sampling_intervals
417 .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
418 || interpolation.clone(),
419 )
420 }
421
422 /// Folds the given sample and interpolations and gets the aggregation buffers.
423 ///
424 /// To fold a sample without serializing buffers, use [`Sampler::fold`].
425 ///
426 /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
427 pub fn fold_and_get_buffers(
428 &mut self,
429 sample: Timed<F::Sample>,
430 ) -> Result<SerializedBuffer, FoldError> {
431 self.fold(sample)?;
432 let series_buffers = self
433 .buffers
434 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
435 .map_err::<FoldError, _>(From::from)?;
436 self.serialize(series_buffers).map_err(From::from)
437 }
438
439 fn serialize(
440 &self,
441 series_buffers: Vec1<SerializedTimeSeries>,
442 ) -> io::Result<SerializedBuffer> {
443 use crate::experimental::clock::DurationExt;
444 use byteorder::{LittleEndian, WriteBytesExt};
445 use std::io::Write;
446
447 let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
448 let end_timestamp =
449 u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
450
451 let mut buffer = vec![];
452 buffer.write_u8(1)?; // Version number.
453 buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
454 buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
455 // time.
456 encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
457
458 for series in series_buffers {
459 const GRANULARITY_FIELD_LEN: usize = 2;
460 let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
461 let granularity =
462 u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
463
464 buffer.write_u16::<LittleEndian>(len)?;
465 buffer.write_u16::<LittleEndian>(granularity)?;
466 buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
467 }
468 Ok(SerializedBuffer {
469 data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
470 data: buffer,
471 })
472 }
473}
474
475impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
476where
477 PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
478 F: Default + SerialStatistic<P>,
479 R: Clone + Fn(F::Aggregation) -> A,
480 P: InterpolationKind,
481 A: Clone,
482{
483 /// Constructs a time matrix with the default statistic and given transform for
484 /// post-aggregation.
485 pub fn with_transform(
486 profile: impl Into<SamplingProfile>,
487 interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
488 transform: R,
489 ) -> Self
490 where
491 R: Clone,
492 {
493 let sampling_intervals = profile.into().into_sampling_intervals();
494 TimeMatrix::from_series_with(
495 sampling_intervals
496 .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
497 || interpolation.clone(),
498 )
499 }
500}
501
502impl<F, P> Default for TimeMatrix<F, P>
503where
504 F: Default + SerialStatistic<P>,
505 P: InterpolationKind,
506 P::Output<F::Sample>: Default,
507{
508 fn default() -> Self {
509 TimeMatrix::new(SamplingProfile::default(), P::Output::default())
510 }
511}
512
513impl<F, P> Interpolator for TimeMatrix<F, P>
514where
515 F: SerialStatistic<P>,
516 P: InterpolationKind,
517{
518 fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
519 let tick = self.last.tick(timestamp.into(), false)?;
520 Ok(for buffer in self.buffers.iter_mut() {
521 buffer.interpolate(tick)?;
522 })
523 }
524
525 fn interpolate_and_get_buffers(
526 &mut self,
527 timestamp: Timestamp,
528 ) -> Result<SerializedBuffer, FoldError> {
529 self.interpolate(timestamp)?;
530 let series_buffers = self
531 .buffers
532 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
533 .map_err::<FoldError, _>(From::from)?;
534 self.serialize(series_buffers).map_err(From::from)
535 }
536}
537
538impl<F, P> MatrixSampler<F::Sample> for TimeMatrix<F, P>
539where
540 F: SerialStatistic<P>,
541 P: InterpolationKind,
542{
543 fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
544 let (timestamp, sample) = sample.into();
545 let tick = self.last.tick(timestamp, true)?;
546 Ok(for buffer in self.buffers.iter_mut() {
547 buffer.fold(tick, sample.clone())?;
548 })
549 }
550}
551
552#[cfg(test)]
553mod tests {
554 use fuchsia_async as fasync;
555
556 use crate::experimental::clock::{Timed, Timestamp};
557 use crate::experimental::series::interpolation::{ConstantSample, LastSample};
558 use crate::experimental::series::statistic::{
559 ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
560 };
561 use crate::experimental::series::{Interpolator, MatrixSampler, SamplingProfile, TimeMatrix};
562
563 fn fold_and_interpolate_f32(sampler: &mut impl MatrixSampler<f32>) {
564 sampler.fold(Timed::now(0.0)).unwrap();
565 sampler.fold(Timed::now(1.0)).unwrap();
566 sampler.fold(Timed::now(2.0)).unwrap();
567 let _buffers = sampler.interpolate(Timestamp::now()).unwrap();
568 }
569
570 // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
571 // outputs of a `TimeMatrix`.
572 // This "test" is considered successful as long as it builds.
573 #[test]
574 fn static_test_define_time_matrix() {
575 type Mean<T> = ArithmeticMean<T>;
576 type MeanTransform<T, F> = Transform<Mean<T>, F>;
577
578 let _exec = fasync::TestExecutor::new_with_fake_time();
579
580 // Arithmetic mean time matrices.
581 let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
582 let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
583 SamplingProfile::balanced(),
584 LastSample::or(0.0f32),
585 );
586 let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
587 SamplingProfile::granular(),
588 ConstantSample::default(),
589 Mean::<f32>::default(),
590 );
591
592 // Discrete arithmetic mean time matrices.
593 let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
594 SamplingProfile::highly_granular(),
595 LastSample::or(0.0f32),
596 |aggregation| aggregation.ceil() as i64,
597 );
598 fold_and_interpolate_f32(&mut matrix);
599 // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
600 // constructors. This is as raw as it gets.
601 let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
602 SamplingProfile::default(),
603 ConstantSample::default(),
604 PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
605 aggregation.ceil() as i64
606 }),
607 );
608 fold_and_interpolate_f32(&mut matrix);
609 }
610
611 // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
612 // outputs of a `TimeMatrix`.
613 // This "test" is considered successful as long as it builds.
614 #[test]
615 fn static_test_supported_statistic_and_interpolation_combinations() {
616 let _exec = fasync::TestExecutor::new_with_fake_time();
617
618 let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
619 let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
620 let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
621 let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
622 let _ = TimeMatrix::<Max<u64>, LastSample>::default();
623 let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
624 let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
625 let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
626 let _ = TimeMatrix::<Union<u64>, LastSample>::default();
627 }
628
629 #[test]
630 fn time_matrix_with_uncompressed_buffer() {
631 let exec = fasync::TestExecutor::new_with_fake_time();
632 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
633 let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
634 SamplingProfile::highly_granular(),
635 ConstantSample::default(),
636 );
637 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
638 assert_eq!(
639 buffer.data,
640 vec![
641 1, // version number
642 3, 0, 0, 0, // created timestamp
643 3, 0, 0, 0, // last timestamp
644 0, 0, // type: uncompressed; subtype: f32
645 4, 0, // series 1: length in bytes
646 10, 0, // series 1 granularity: 10s
647 0, 0, // number of elements
648 4, 0, // series 2: length in bytes
649 60, 0, // series 2 granularity: 60s
650 0, 0, // number of elements
651 ]
652 );
653
654 time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
655 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
656 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
657 assert_eq!(
658 buffer.data,
659 vec![
660 1, // version number
661 3, 0, 0, 0, // created timestamp
662 10, 0, 0, 0, // last timestamp
663 0, 0, // type: uncompressed; subtype: f32
664 8, 0, // series 1: length in bytes
665 10, 0, // series 1 granularity: 10s
666 1, 0, // number of elements
667 42, 0, 0, 0, // item 1
668 4, 0, // series 2: length in bytes
669 60, 0, // series 2 granularity: 60s
670 0, 0, // number of elements
671 ]
672 );
673
674 // Advance several time steps to test ring buffer's `fill`
675 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
676 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
677 assert_eq!(
678 buffer.data,
679 vec![
680 1, // version number
681 3, 0, 0, 0, // created timestamp
682 50, 0, 0, 0, // last timestamp
683 0, 0, // type: uncompressed; subtype: f32
684 24, 0, // series 1: length in bytes
685 10, 0, // series 1 granularity: 10s
686 5, 0, // number of elements
687 42, 0, 0, 0, // item 1
688 0, 0, 0, 0, // item 2
689 0, 0, 0, 0, // item 3
690 0, 0, 0, 0, // item 4
691 0, 0, 0, 0, // item 5
692 4, 0, // series 2: length in bytes
693 60, 0, // series 2 granularity: 60s
694 0, 0, // number of elements
695 ]
696 );
697 }
698
699 #[test]
700 fn time_matrix_with_simple8b_rle_buffer() {
701 let exec = fasync::TestExecutor::new_with_fake_time();
702 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
703 let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
704 SamplingProfile::highly_granular(),
705 ConstantSample::default(),
706 );
707 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
708 assert_eq!(
709 buffer.data,
710 vec![
711 1, // version number
712 3, 0, 0, 0, // created timestamp
713 3, 0, 0, 0, // last timestamp
714 1, 0, // type: simple8b RLE; subtype: unsigned
715 7, 0, // series 1: length in bytes
716 10, 0, // series 1 granularity: 10s
717 0, 0, // number of selector elements and value blocks
718 0, 0, // head selector index
719 0, // number of values in last block
720 7, 0, // series 2: length in bytes
721 60, 0, // series 2 granularity: 60s
722 0, 0, // number of selector elements and value blocks
723 0, 0, // head selector index
724 0, // number of values in last block
725 ]
726 );
727
728 time_matrix.fold(Timed::now(15)).unwrap();
729 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
730 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
731 assert_eq!(
732 buffer.data,
733 vec![
734 1, // version number
735 3, 0, 0, 0, // created timestamp
736 10, 0, 0, 0, // last timestamp
737 1, 0, // type: simple8b RLE; subtype: unsigned
738 16, 0, // series 1: length in bytes
739 10, 0, // series 1 granularity: 10s
740 1, 0, // number of selector elements and value blocks
741 0, 0, // head selector index
742 1, // number of values in last block
743 0x0f, // RLE selector
744 15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
745 7, 0, // series 2: length in bytes
746 60, 0, // series 2 granularity: 60s
747 0, 0, // number of selector elements and value blocks
748 0, 0, // head selector index
749 0, // number of values in last block
750 ]
751 );
752
753 // Advance several time steps to test ring buffer's `fill`
754 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
755 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
756 assert_eq!(
757 buffer.data,
758 vec![
759 1, // version number
760 3, 0, 0, 0, // created timestamp
761 50, 0, 0, 0, // last timestamp
762 1, 0, // type: simple8b RLE; subtype: unsigned
763 16, 0, // series 1: length in bytes
764 10, 0, // series 1 granularity: 10s
765 1, 0, // number of selector elements and value blocks
766 0, 0, // head selector index
767 5, // number of values in last block
768 0x03, // 4-bit selector
769 0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
770 7, 0, // series 2: length in bytes
771 60, 0, // series 2 granularity: 60s
772 0, 0, // number of selector elements and value blocks
773 0, 0, // head selector index
774 0, // number of values in last block
775 ]
776 );
777 }
778
779 #[test]
780 fn time_matrix_with_zigzag_simple8b_rle_buffer() {
781 let exec = fasync::TestExecutor::new_with_fake_time();
782 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
783 let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
784 SamplingProfile::highly_granular(),
785 ConstantSample::default(),
786 );
787 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
788 assert_eq!(
789 buffer.data,
790 vec![
791 1, // version number
792 3, 0, 0, 0, // created timestamp
793 3, 0, 0, 0, // last timestamp
794 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
795 7, 0, // series 1: length in bytes
796 10, 0, // series 1 granularity: 10s
797 0, 0, // number of selector elements and value blocks
798 0, 0, // head selector index
799 0, // number of values in last block
800 7, 0, // series 2: length in bytes
801 60, 0, // series 2 granularity: 60s
802 0, 0, // number of selector elements and value blocks
803 0, 0, // head selector index
804 0, // number of values in last block
805 ]
806 );
807
808 time_matrix.fold(Timed::now(-8)).unwrap();
809 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
810 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
811 assert_eq!(
812 buffer.data,
813 vec![
814 1, // version number
815 3, 0, 0, 0, // created timestamp
816 10, 0, 0, 0, // last timestamp
817 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
818 16, 0, // series 1: length in bytes
819 10, 0, // series 1 granularity: 10s
820 1, 0, // number of selector elements and value blocks
821 0, 0, // head selector index
822 1, // number of values in last block
823 0x0f, // RLE selector
824 15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
825 7, 0, // series 2: length in bytes
826 60, 0, // series 2 granularity: 60s
827 0, 0, // number of selector elements and value blocks
828 0, 0, // head selector index
829 0, // number of values in last block
830 ]
831 );
832
833 // Advance several time steps to test ring buffer's `fill`
834 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
835 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
836 assert_eq!(
837 buffer.data,
838 vec![
839 1, // version number
840 3, 0, 0, 0, // created timestamp
841 50, 0, 0, 0, // last timestamp
842 1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
843 16, 0, // series 1: length in bytes
844 10, 0, // series 1 granularity: 10s
845 1, 0, // number of selector elements and value blocks
846 0, 0, // head selector index
847 5, // number of values in last block
848 0x03, // 4-bit selector
849 0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
850 7, 0, // series 2: length in bytes
851 60, 0, // series 2 granularity: 60s
852 0, 0, // number of selector elements and value blocks
853 0, 0, // head selector index
854 0, // number of values in last block
855 ]
856 );
857 }
858
859 #[test]
860 fn time_matrix_with_delta_simple8b_rle_buffer() {
861 let exec = fasync::TestExecutor::new_with_fake_time();
862 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
863 let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
864 SamplingProfile::highly_granular(),
865 LastSample::or(0),
866 );
867 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
868 assert_eq!(
869 buffer.data,
870 vec![
871 1, // version number
872 3, 0, 0, 0, // created timestamp
873 3, 0, 0, 0, // last timestamp
874 2, 0, // type: delta simple8b RLE; subtype: unsigned
875 7, 0, // series 1: length in bytes
876 10, 0, // series 1 granularity: 10s
877 0, 0, // number of base value + selector elements or value blocks
878 0, 0, // head selector index
879 0, // number of values in last block
880 7, 0, // series 2: length in bytes
881 60, 0, // series 2 granularity: 60s
882 0, 0, // number of base value + selector elements or value blocks
883 0, 0, // head selector index
884 0, // number of values in last block
885 ]
886 );
887
888 time_matrix.fold(Timed::now(42)).unwrap();
889 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
890 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
891 assert_eq!(
892 buffer.data,
893 vec![
894 1, // version number
895 3, 0, 0, 0, // created timestamp
896 10, 0, 0, 0, // last timestamp
897 2, 0, // type: delta simple8b RLE; subtype: unsigned
898 15, 0, // series 1: length in bytes
899 10, 0, // series 1 granularity: 10s
900 1, 0, // number of base value + selector elements or value blocks
901 0, 0, // head selector index
902 0, // number of values in last block
903 42, 0, 0, 0, 0, 0, 0, 0, // base value
904 7, 0, // series 2: length in bytes
905 60, 0, // series 2 granularity: 60s
906 0, 0, // number of base value + selector elements or value blocks
907 0, 0, // head selector index
908 0, // number of values in last block
909 ]
910 );
911
912 time_matrix.fold(Timed::now(57)).unwrap();
913 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
914 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
915 assert_eq!(
916 buffer.data,
917 vec![
918 1, // version number
919 3, 0, 0, 0, // created timestamp
920 20, 0, 0, 0, // last timestamp
921 2, 0, // type: delta simple8b RLE; subtype: unsigned
922 24, 0, // series 1: length in bytes
923 10, 0, // series 1 granularity: 10s
924 2, 0, // number of base value + selector elements or value blocks
925 0, 0, // head selector index
926 1, // number of values in last block
927 42, 0, 0, 0, 0, 0, 0, 0, // base value
928 0x0f, // RLE selector
929 15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
930 7, 0, // series 2: length in bytes
931 60, 0, // series 2 granularity: 60s
932 0, 0, // number of base value + selector elements or value blocks
933 0, 0, // head selector index
934 0, // number of values in last block
935 ]
936 );
937
938 // Advance several time steps to test ring buffer's `fill`
939 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
940 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
941 assert_eq!(
942 buffer.data,
943 vec![
944 1, // version number
945 3, 0, 0, 0, // created timestamp
946 50, 0, 0, 0, // last timestamp
947 2, 0, // type: delta simple8b RLE; subtype: unsigned
948 24, 0, // series 1: length in bytes
949 10, 0, // series 1 granularity: 10s
950 2, 0, // number of base value + selector elements or value blocks
951 0, 0, // head selector index
952 4, // number of values in last block
953 42, 0, 0, 0, 0, 0, 0, 0, // base value
954 0x03, // 4-bit selector
955 0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
956 7, 0, // series 2: length in bytes
957 60, 0, // series 2 granularity: 60s
958 0, 0, // number of base value + selector elements or value blocks
959 0, 0, // head selector index
960 0, // number of values in last block
961 ]
962 );
963 }
964
965 #[test]
966 fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
967 let exec = fasync::TestExecutor::new_with_fake_time();
968 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
969 let mut time_matrix = TimeMatrix::<Max<i64>, LastSample>::new(
970 SamplingProfile::highly_granular(),
971 LastSample::or(0),
972 );
973 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
974 assert_eq!(
975 buffer.data,
976 vec![
977 1, // version number
978 3, 0, 0, 0, // created timestamp
979 3, 0, 0, 0, // last timestamp
980 2, 1, // type: delta simple8b RLE; subtype: signed
981 7, 0, // series 1: length in bytes
982 10, 0, // series 1 granularity: 10s
983 0, 0, // number of base value + selector elements or value blocks
984 0, 0, // head selector index
985 0, // number of values in last block
986 7, 0, // series 2: length in bytes
987 60, 0, // series 2 granularity: 60s
988 0, 0, // number of base value + selector elements or value blocks
989 0, 0, // head selector index
990 0, // number of values in last block
991 ]
992 );
993
994 time_matrix.fold(Timed::now(42)).unwrap();
995 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
996 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
997 assert_eq!(
998 buffer.data,
999 vec![
1000 1, // version number
1001 3, 0, 0, 0, // created timestamp
1002 10, 0, 0, 0, // last timestamp
1003 2, 1, // type: delta simple8b RLE; subtype: signed
1004 15, 0, // series 1: length in bytes
1005 10, 0, // series 1 granularity: 10s
1006 1, 0, // number of base value + selector elements or value blocks
1007 0, 0, // head selector index
1008 0, // number of values in last block
1009 42, 0, 0, 0, 0, 0, 0, 0, // base value
1010 7, 0, // series 2: length in bytes
1011 60, 0, // series 2 granularity: 60s
1012 0, 0, // number of base value + selector elements or value blocks
1013 0, 0, // head selector index
1014 0, // number of values in last block
1015 ]
1016 );
1017
1018 time_matrix.fold(Timed::now(34)).unwrap();
1019 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1020 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1021 assert_eq!(
1022 buffer.data,
1023 vec![
1024 1, // version number
1025 3, 0, 0, 0, // created timestamp
1026 20, 0, 0, 0, // last timestamp
1027 2, 1, // type: delta simple8b RLE; subtype: signed
1028 24, 0, // series 1: length in bytes
1029 10, 0, // series 1 granularity: 10s
1030 2, 0, // number of base value + selector elements or value blocks
1031 0, 0, // head selector index
1032 1, // number of values in last block
1033 42, 0, 0, 0, 0, 0, 0, 0, // base value
1034 0x0f, // RLE selector
1035 15, 0, 0, 0, 0, 0, 1, 0, // value -8 (delta) encoded as 15, appearing 1 time
1036 7, 0, // series 2: length in bytes
1037 60, 0, // series 2 granularity: 60s
1038 0, 0, // number of base value + selector elements or value blocks
1039 0, 0, // head selector index
1040 0, // number of values in last block
1041 ]
1042 );
1043
1044 // Advance several time steps to test ring buffer's `fill`
1045 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1046 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1047 assert_eq!(
1048 buffer.data,
1049 vec![
1050 1, // version number
1051 3, 0, 0, 0, // created timestamp
1052 50, 0, 0, 0, // last timestamp
1053 2, 1, // type: delta simple8b RLE; subtype: signed
1054 24, 0, // series 1: length in bytes
1055 10, 0, // series 1 granularity: 10s
1056 2, 0, // number of base value + selector elements or value blocks
1057 0, 0, // head selector index
1058 4, // number of values in last block
1059 42, 0, 0, 0, 0, 0, 0, 0, // base value
1060 0x03, // 4-bit selector
1061 0x0f, 0, 0, 0, 0, 0, 0, 0, // diff values -8 (encoded as 15), 0, 0, 0
1062 7, 0, // series 2: length in bytes
1063 60, 0, // series 2 granularity: 60s
1064 0, 0, // number of base value + selector elements or value blocks
1065 0, 0, // head selector index
1066 0, // number of values in last block
1067 ]
1068 );
1069 }
1070
1071 #[test]
1072 fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
1073 let exec = fasync::TestExecutor::new_with_fake_time();
1074 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
1075 let mut time_matrix = TimeMatrix::<Max<u64>, LastSample>::new(
1076 SamplingProfile::highly_granular(),
1077 LastSample::or(0),
1078 );
1079 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1080 assert_eq!(
1081 buffer.data,
1082 vec![
1083 1, // version number
1084 3, 0, 0, 0, // created timestamp
1085 3, 0, 0, 0, // last timestamp
1086 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1087 7, 0, // series 1: length in bytes
1088 10, 0, // series 1 granularity: 10s
1089 0, 0, // number of base value + selector elements or value blocks
1090 0, 0, // head selector index
1091 0, // number of values in last block
1092 7, 0, // series 2: length in bytes
1093 60, 0, // series 2 granularity: 60s
1094 0, 0, // number of base value + selector elements or value blocks
1095 0, 0, // head selector index
1096 0, // number of values in last block
1097 ]
1098 );
1099
1100 time_matrix.fold(Timed::now(1)).unwrap();
1101 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
1102 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1103 assert_eq!(
1104 buffer.data,
1105 vec![
1106 1, // version number
1107 3, 0, 0, 0, // created timestamp
1108 10, 0, 0, 0, // last timestamp
1109 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1110 15, 0, // series 1: length in bytes
1111 10, 0, // series 1 granularity: 10s
1112 1, 0, // number of base value + selector elements or value blocks
1113 0, 0, // head selector index
1114 0, // number of values in last block
1115 1, 0, 0, 0, 0, 0, 0, 0, // base value
1116 7, 0, // series 2: length in bytes
1117 60, 0, // series 2 granularity: 60s
1118 0, 0, // number of base value + selector elements or value blocks
1119 0, 0, // head selector index
1120 0, // number of values in last block
1121 ]
1122 );
1123
1124 time_matrix.fold(Timed::now(u64::MAX)).unwrap();
1125 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
1126 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1127 assert_eq!(
1128 buffer.data,
1129 vec![
1130 1, // version number
1131 3, 0, 0, 0, // created timestamp
1132 20, 0, 0, 0, // last timestamp
1133 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1134 24, 0, // series 1: length in bytes
1135 10, 0, // series 1 granularity: 10s
1136 2, 0, // number of base value + selector elements or value blocks
1137 0, 0, // head selector index
1138 1, // number of values in last block
1139 1, 0, 0, 0, 0, 0, 0, 0, // base value
1140 0x0f, // RLE selector
1141 3, 0, 0, 0, 0, 0, 1, 0, // value -2 (delta) encoded as 3, appearing 1 time
1142 7, 0, // series 2: length in bytes
1143 60, 0, // series 2 granularity: 60s
1144 0, 0, // number of base value + selector elements or value blocks
1145 0, 0, // head selector index
1146 0, // number of values in last block
1147 ]
1148 );
1149
1150 // Advance several time steps to test ring buffer's `fill`
1151 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
1152 let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
1153 assert_eq!(
1154 buffer.data,
1155 vec![
1156 1, // version number
1157 3, 0, 0, 0, // created timestamp
1158 50, 0, 0, 0, // last timestamp
1159 2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
1160 24, 0, // series 1: length in bytes
1161 10, 0, // series 1 granularity: 10s
1162 2, 0, // number of base value + selector elements or value blocks
1163 0, 0, // head selector index
1164 4, // number of values in last block
1165 1, 0, 0, 0, 0, 0, 0, 0, // base value
1166 0x01, // 2-bit selector
1167 3, 0, 0, 0, 0, 0, 0, 0, // diff values -2 (encoded as 3), 0, 0, 0
1168 7, 0, // series 2: length in bytes
1169 60, 0, // series 2 granularity: 60s
1170 0, 0, // number of base value + selector elements or value blocks
1171 0, 0, // head selector index
1172 0, // number of values in last block
1173 ]
1174 );
1175 }
1176}