windowed_stats/experimental/series/mod.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Round-robin multi-resolution time series.
6
7mod interval;
8
9pub(crate) mod buffer;
10
11pub mod interpolation;
12pub mod metadata;
13pub mod statistic;
14
15use derivative::Derivative;
16use std::fmt::{Debug, Display};
17use std::io;
18use std::marker::PhantomData;
19use std::num::NonZeroUsize;
20
21use crate::experimental::Vec1;
22use crate::experimental::clock::{ObservationTime, Tick, Timed, Timestamp, TimestampExt};
23use crate::experimental::series::buffer::{
24 BufferStrategy, DeltaSimple8bRle, DeltaZigzagSimple8bRle, RingBuffer, Simple8bRle,
25 Uncompressed, ZigzagSimple8bRle, encoding,
26};
27use crate::experimental::series::interpolation::{
28 ConstantSample, Interpolation, InterpolationKind, LastSample,
29};
30use crate::experimental::series::metadata::{BitSetIndex, Metadata};
31use crate::experimental::series::statistic::{
32 FoldError, PostAggregation, SerialStatistic, Statistic,
33};
34
35pub use crate::experimental::series::buffer::Capacity;
36pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
37
/// A [`TimeMatrix`] type that can be advanced forward in time.
///
/// This trait provides tick operations for [`TimeMatrix`] types. Ticking a [`TimeMatrix`] causes
/// sample interpolation within and aggregation propagation across [`SamplingInterval`]s.
///
/// Importantly, this trait is `dyn` compatible and type erased; it can be used to tick a
/// [`TimeMatrix`] regardless of its input type parameters (sample type, interpolation type, etc.).
///
/// See also the [`TimeMatrixFold`] subtrait.
pub trait TimeMatrixTick {
    /// Advances the time matrix to the given timestamp without folding a sample.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if the time matrix cannot be ticked.
    fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError>;

    /// Advances the time matrix to the given timestamp and gets its serialized buffers.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if the time matrix cannot be ticked or its buffers cannot be
    /// produced.
    fn tick_and_get_buffers(&mut self, timestamp: Timestamp)
        -> Result<SerializedBuffer, FoldError>;
}
53
/// A [`TimeMatrix`] type that can sample data.
///
/// This trait provides fold operations for `TimeMatrix` types. Folding samples updates
/// aggregations and advances a [`TimeMatrix`] forward in time.
///
/// See also the [`TimeMatrixTick`] supertrait. This trait supports both ticking and sampling, but
/// is not completely type erased: the sample input type parameter `T` is needed.
pub trait TimeMatrixFold<T>: TimeMatrixTick {
    /// Folds the given timed sample into the time matrix.
    ///
    /// # Errors
    ///
    /// Returns a [`FoldError`] if the sample cannot be folded.
    fn fold(&mut self, sample: Timed<T>) -> Result<(), FoldError>;
}
64
/// A type that describes the semantics of data folded into a [`TimeMatrix`].
///
/// Data semantics determine how statistics are interpreted and time series are aggregated and
/// buffered.
pub trait DataSemantic {
    /// The metadata type associated with data of this semantic.
    type Metadata: Metadata;

    /// Gets the displayable name of the semantic.
    ///
    /// [`TimeMatrix`] formats this value into the `data_semantic` field of the
    /// [`SerializedBuffer`]s that it produces.
    fn display() -> impl Display;
}
74
/// A continually increasing value.
///
/// Counters are analogous to an odometer in a vehicle.
///
/// This enum has no variants and so cannot be instantiated; it only names a [`DataSemantic`] at
/// the type level.
#[derive(Debug)]
pub enum Counter {}
80
/// Counters over `u64` with [`LastSample`] interpolation are buffered with [`DeltaSimple8bRle`].
impl BufferStrategy<u64, LastSample> for Counter {
    type Buffer = DeltaSimple8bRle;
}
84
impl DataSemantic for Counter {
    // Counters carry no additional metadata.
    type Metadata = ();

    /// Gets the name `"counter"`.
    fn display() -> impl Display {
        "counter"
    }
}
92
/// A fluctuating value.
///
/// Gauges are analogous to a speedometer in a vehicle.
///
/// This enum has no variants and so cannot be instantiated; it only names a [`DataSemantic`] at
/// the type level.
#[derive(Debug)]
pub enum Gauge {}
98
// Buffer selection for gauges, keyed on the buffered type and the interpolation kind:
//
// | type  | interpolation    | buffer                        |
// |-------|------------------|-------------------------------|
// | `f32` | any              | `Uncompressed<f32>`           |
// | `i64` | `ConstantSample` | `ZigzagSimple8bRle`           |
// | `i64` | `LastSample`     | `DeltaZigzagSimple8bRle<i64>` |
// | `u64` | `ConstantSample` | `Simple8bRle`                 |
// | `u64` | `LastSample`     | `DeltaZigzagSimple8bRle<u64>` |

/// Floating-point gauges are buffered uncompressed regardless of the interpolation kind.
impl<P> BufferStrategy<f32, P> for Gauge
where
    P: InterpolationKind,
{
    type Buffer = Uncompressed<f32>;
}

/// Signed gauges with [`ConstantSample`] interpolation use [`ZigzagSimple8bRle`].
impl BufferStrategy<i64, ConstantSample> for Gauge {
    type Buffer = ZigzagSimple8bRle;
}

/// Signed gauges with [`LastSample`] interpolation use [`DeltaZigzagSimple8bRle`].
impl BufferStrategy<i64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<i64>;
}

/// Unsigned gauges with [`ConstantSample`] interpolation use [`Simple8bRle`].
impl BufferStrategy<u64, ConstantSample> for Gauge {
    type Buffer = Simple8bRle;
}

/// Unsigned gauges with [`LastSample`] interpolation use [`DeltaZigzagSimple8bRle`].
impl BufferStrategy<u64, LastSample> for Gauge {
    type Buffer = DeltaZigzagSimple8bRle<u64>;
}
121
impl DataSemantic for Gauge {
    // Gauges carry no additional metadata.
    type Metadata = ();

    /// Gets the name `"gauge"`.
    fn display() -> impl Display {
        "gauge"
    }
}
129
/// A semantic like `Gauge` that avoids [`DeltaZigzagSimple8bRle`] until we fix
/// some other issues.
///
/// Like the other data semantics in this module, this enum has no variants and derives `Debug`
/// so that it can be named in `Debug` bounds and output.
///
/// TODO(https://fxbug.dev/436253782): Delete this type when the viewer can
/// decode `DeltaZigzagSimple8bRle`.
///
/// OR
///
/// TODO(https://fxbug.dev/457443158): Delete this type when
/// `ConstantAggregation` is introduced and netstack's time series is changed to
/// `TimeMatrix<Diff<u64>, ConstantAggregation>`.
///
/// Whichever happens first.
#[derive(Debug)]
pub enum GaugeForceSimple8bRle {}
144
/// Any type that converts into `u64` is buffered with [`Simple8bRle`] when interpolated with
/// [`LastSample`].
impl<T: Into<u64>> BufferStrategy<T, LastSample> for GaugeForceSimple8bRle {
    type Buffer = Simple8bRle;
}
148
impl DataSemantic for GaugeForceSimple8bRle {
    // This semantic carries no additional metadata.
    type Metadata = ();

    /// Gets the name `"gauge"`, the same name that `Gauge` displays.
    fn display() -> impl Display {
        "gauge"
    }
}
156
157// TODO(https://fxbug.dev/375255178): Spell "bit set" consistently. `BitSet` suggests `bit_set` and
158// "bit set", but there are notably some instances of "bitset",
159// which instead suggests `Bitset` and `bitset`.
/// A set of Boolean values.
///
/// Bit sets are analogous to indicator lamps in a vehicle.
///
/// This enum has no variants and so cannot be instantiated; it only names a [`DataSemantic`] at
/// the type level.
#[derive(Debug)]
pub enum BitSet {}
165
/// Bit sets are buffered with [`Simple8bRle`] for any interpolation kind, provided that
/// [`Simple8bRle`] is a [`RingBuffer`] of the type `A`.
impl<A, P> BufferStrategy<A, P> for BitSet
where
    Simple8bRle: RingBuffer<A>,
    P: InterpolationKind,
{
    type Buffer = Simple8bRle;
}
173
impl DataSemantic for BitSet {
    // Unlike the other semantics in this module, bit sets use `BitSetIndex` metadata.
    type Metadata = BitSetIndex;

    /// Gets the name `"bitset"`.
    fn display() -> impl Display {
        "bitset"
    }
}
181
/// A buffer of serialized data from a time series.
#[derive(Clone, Debug)]
struct SerializedTimeSeries {
    /// The sampling interval of the series from which `data` was serialized.
    interval: SamplingInterval,
    /// The serialized bytes of the series' aggregation buffer.
    data: Vec<u8>,
}
188
189impl SerializedTimeSeries {
190 /// Gets the sampling interval for the aggregations in the buffer.
191 pub fn interval(&self) -> &SamplingInterval {
192 &self.interval
193 }
194
195 /// Gets the serialized data.
196 pub fn data(&self) -> &[u8] {
197 self.data.as_slice()
198 }
199}
200
/// An unbuffered statistical time series specification.
///
/// This type samples and interpolates timed data and produces aggregations per its statistic and
/// sampling interval. It is a specification insofar that it does **not** buffer the series of
/// aggregations.
#[derive(Clone, Debug)]
struct TimeSeries<F>
where
    F: Statistic,
{
    /// The sampling interval that determines when aggregations are produced.
    interval: SamplingInterval,
    /// The statistic into which samples and interpolations are folded.
    statistic: F,
}
214
215impl<F> TimeSeries<F>
216where
217 F: Statistic,
218{
219 pub fn new(interval: SamplingInterval) -> Self
220 where
221 F: Default,
222 {
223 TimeSeries { interval, statistic: F::default() }
224 }
225
226 pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
227 TimeSeries { interval, statistic }
228 }
229
230 /// Folds interpolations for intervals intersected by the given [`Tick`] and gets the
231 /// aggregations.
232 ///
233 /// The returned iterator performs the computation and so it must be consumed to change the
234 /// state of the statistic.
235 ///
236 /// [`Tick`]: crate::experimental::clock::Tick
237 #[must_use]
238 fn interpolate_and_get_aggregations<'i, P>(
239 &'i mut self,
240 interpolation: &'i mut P,
241 tick: Tick,
242 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
243 where
244 P: Interpolation<F::Sample>,
245 {
246 self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
247 move |expiration| {
248 expiration
249 .interpolate_and_get_aggregation(&mut self.statistic, interpolation)
250 .transpose()
251 },
252 )
253 }
254
255 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
256 /// and gets the aggregations.
257 ///
258 /// The returned iterator performs the computation and so it must be consumed to change the
259 /// state of the statistic.
260 ///
261 /// [`Tick`]: crate::experimental::clock::Tick
262 #[must_use]
263 fn fold_and_get_aggregations<'i, P>(
264 &'i mut self,
265 interpolation: &'i mut P,
266 tick: Tick,
267 sample: F::Sample,
268 ) -> impl 'i + Iterator<Item = Result<(NonZeroUsize, F::Aggregation), FoldError>>
269 where
270 P: Interpolation<F::Sample>,
271 {
272 self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
273 expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
274 })
275 }
276
277 /// Gets the sampling interval of the series.
278 pub fn interval(&self) -> &SamplingInterval {
279 &self.interval
280 }
281}
282
283impl<F, R, A> TimeSeries<PostAggregation<F, R>>
284where
285 F: Default + Statistic,
286 R: Clone + Fn(F::Aggregation) -> A,
287 A: Clone,
288{
289 pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
290 TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
291 }
292}
293
294/// A buffered round-robin statistical time series.
295///
296/// This type composes a [`TimeSeries`] with a round-robin buffer of aggregations and interpolation
297/// state. Aggregations produced by the time series when sampling or interpolating are pushed into
298/// the buffer.
299#[derive(Derivative)]
300#[derivative(
301 Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
302 Debug(bound = "F: Debug,
303 F::Buffer: Debug,
304 P::Output<F::Sample>: Debug,")
305)]
struct BufferedTimeSeries<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    // The buffer into which aggregations produced by `series` are pushed.
    buffer: F::Buffer,
    // The interpolation state passed to `series` whenever it folds or interpolates.
    interpolation: P::Output<F::Sample>,
    // The unbuffered series that produces the aggregations.
    series: TimeSeries<F>,
}
315
316impl<F, P> BufferedTimeSeries<F, P>
317where
318 F: SerialStatistic<P>,
319 P: InterpolationKind,
320{
321 pub fn new(interpolation: P::Output<F::Sample>, series: TimeSeries<F>) -> Self {
322 let buffer = F::buffer(&series.interval);
323 BufferedTimeSeries { buffer, interpolation, series }
324 }
325
326 /// Folds interpolations for intervals intersected by the given [`Tick`] and buffers the
327 /// aggregations.
328 ///
329 /// # Errors
330 ///
331 /// Returns an error if sampling fails.
332 ///
333 /// [`Tick`]: crate::experimental::clock::Tick
334 fn interpolate(&mut self, tick: Tick) -> Result<(), FoldError> {
335 for aggregation in
336 self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
337 {
338 let (count, aggregation) = aggregation?;
339 if count.get() == 1 {
340 self.buffer.push(aggregation);
341 } else {
342 self.buffer.fill(aggregation, count);
343 }
344 }
345 Ok(())
346 }
347
348 /// Folds the given sample and interpolations for intervals intersected by the given [`Tick`]
349 /// and buffers the aggregations.
350 ///
351 /// # Errors
352 ///
353 /// Returns an error if sampling fails.
354 ///
355 /// [`Tick`]: crate::experimental::clock::Tick
356 fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), FoldError> {
357 for aggregation in
358 self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
359 {
360 let (count, aggregation) = aggregation?;
361 if count.get() == 1 {
362 self.buffer.push(aggregation);
363 } else {
364 self.buffer.fill(aggregation, count);
365 }
366 }
367 Ok(())
368 }
369
370 pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
371 let mut data = vec![];
372 self.buffer.serialize(&mut data)?;
373 Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
374 }
375}
376
/// A buffer of serialized data from a time matrix.
#[derive(Clone, Debug, PartialEq)]
pub struct SerializedBuffer {
    /// The displayed name of the data semantic of the time matrix (see
    /// [`DataSemantic::display`]).
    pub data_semantic: String,
    /// The serialized bytes of the time matrix.
    pub data: Vec<u8>,
}
383
384impl SerializedBuffer {
385 /// Records the current state of this `TimeMatrix` into `node`.
386 pub fn write_to_inspect(self, node: &fuchsia_inspect::Node) {
387 let Self { data_semantic, data } = self;
388 node.record_string("type", data_semantic);
389 node.record_bytes("data", data);
390 }
391
392 /// Records an attempt at retrieving a serialized buffer to inspect.
393 pub fn write_to_inspect_or_error<E: Debug>(
394 result: Result<Self, E>,
395 node: &fuchsia_inspect::Node,
396 ) {
397 match result {
398 Ok(b) => b.write_to_inspect(node),
399 Err(e) => node.record_string("type", format!("error: {:?}", e)),
400 }
401 }
402}
403
404/// One or more statistical round-robin time series.
405///
406/// A time matrix is a round-robin multi-resolution time series that samples and interpolates timed
407/// data, computes statistical aggregations for elapsed [sampling intervals][`SamplingInterval`],
408/// and buffers those aggregations. The sample data, statistic, and interpolation of series in a
409/// time matrix must be the same, but the sampling intervals can and should differ.
410#[derive(Derivative)]
411#[derivative(
412 Clone(bound = "F: Clone, F::Buffer: Clone, P::Output<F::Sample>: Clone,"),
413 Debug(bound = "F: Debug,
414 F::Buffer: Debug,
415 P::Output<F::Sample>: Debug,")
416)]
pub struct TimeMatrix<F, P>
where
    F: SerialStatistic<P>,
    P: InterpolationKind,
{
    // The time at which the matrix was constructed. This is written into serialized buffers.
    created: Timestamp,
    // The observation clock that is ticked whenever the matrix is folded or ticked. Its last
    // update timestamp is written into serialized buffers.
    last: ObservationTime,
    // The buffered series of the matrix, one per sampling interval of its sampling profile.
    buffers: Vec1<BufferedTimeSeries<F, P>>,
}
426
427impl<F, P> TimeMatrix<F, P>
428where
429 F: SerialStatistic<P>,
430 P: InterpolationKind,
431{
432 fn from_series_with<Q>(series: impl Into<Vec1<TimeSeries<F>>>, mut interpolation: Q) -> Self
433 where
434 Q: FnMut() -> P::Output<F::Sample>,
435 {
436 let buffers =
437 series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
438 TimeMatrix { created: Timestamp::now(), last: ObservationTime::default(), buffers }
439 }
440
441 /// Constructs a time matrix with the given sampling profile and interpolation.
442 ///
443 /// Statistics are default initialized.
444 pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::Output<F::Sample>) -> Self
445 where
446 F: Default,
447 {
448 let sampling_intervals = profile.into().into_sampling_intervals();
449 TimeMatrix::from_series_with(sampling_intervals.map_into(TimeSeries::new), || {
450 interpolation.clone()
451 })
452 }
453
454 /// Constructs a time matrix with the given statistic.
455 pub fn with_statistic(
456 profile: impl Into<SamplingProfile>,
457 interpolation: P::Output<F::Sample>,
458 statistic: F,
459 ) -> Self {
460 let sampling_intervals = profile.into().into_sampling_intervals();
461 TimeMatrix::from_series_with(
462 sampling_intervals
463 .map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
464 || interpolation.clone(),
465 )
466 }
467
468 /// Folds the given sample and interpolations and gets the aggregation buffers.
469 ///
470 /// To fold a sample without serializing buffers, use [`Sampler::fold`].
471 ///
472 /// [`Sampler::fold`]: crate::experimental::series::Sampler::fold
473 pub fn fold_and_get_buffers(
474 &mut self,
475 sample: Timed<F::Sample>,
476 ) -> Result<SerializedBuffer, FoldError> {
477 self.fold(sample)?;
478 let series_buffers = self
479 .buffers
480 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
481 .map_err::<FoldError, _>(From::from)?;
482 self.serialize(series_buffers).map_err(From::from)
483 }
484
485 fn serialize(
486 &self,
487 series_buffers: Vec1<SerializedTimeSeries>,
488 ) -> io::Result<SerializedBuffer> {
489 use crate::experimental::clock::DurationExt;
490 use byteorder::{LittleEndian, WriteBytesExt};
491 use std::io::Write;
492
493 let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
494 let end_timestamp =
495 u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
496
497 let mut buffer = vec![];
498 buffer.write_u8(1)?; // Version number.
499 buffer.write_u32::<LittleEndian>(created_timestamp)?; // Matrix creation time.
500 buffer.write_u32::<LittleEndian>(end_timestamp)?; // Last observed or interpolated sample
501 // time.
502 encoding::serialize_buffer_type_descriptors::<F, P>(&mut buffer)?; // Buffer descriptors.
503
504 for series in series_buffers {
505 const GRANULARITY_FIELD_LEN: usize = 2;
506 let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
507 let granularity =
508 u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
509
510 buffer.write_u16::<LittleEndian>(len)?;
511 buffer.write_u16::<LittleEndian>(granularity)?;
512 buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
513 }
514 Ok(SerializedBuffer {
515 data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
516 data: buffer,
517 })
518 }
519}
520
521impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
522where
523 PostAggregation<F, R>: SerialStatistic<P, Aggregation = A>,
524 F: Default + SerialStatistic<P>,
525 R: Clone + Fn(F::Aggregation) -> A,
526 P: InterpolationKind,
527 A: Clone,
528{
529 /// Constructs a time matrix with the default statistic and given transform for
530 /// post-aggregation.
531 pub fn with_transform(
532 profile: impl Into<SamplingProfile>,
533 interpolation: P::Output<<PostAggregation<F, R> as Statistic>::Sample>,
534 transform: R,
535 ) -> Self
536 where
537 R: Clone,
538 {
539 let sampling_intervals = profile.into().into_sampling_intervals();
540 TimeMatrix::from_series_with(
541 sampling_intervals
542 .map_into(|window| TimeSeries::with_transform(window, transform.clone())),
543 || interpolation.clone(),
544 )
545 }
546}
547
548impl<F, P> Default for TimeMatrix<F, P>
549where
550 F: Default + SerialStatistic<P>,
551 P: InterpolationKind,
552 P::Output<F::Sample>: Default,
553{
554 fn default() -> Self {
555 TimeMatrix::new(SamplingProfile::default(), P::Output::default())
556 }
557}
558
559impl<F, P> TimeMatrixFold<F::Sample> for TimeMatrix<F, P>
560where
561 F: SerialStatistic<P>,
562 P: InterpolationKind,
563{
564 fn fold(&mut self, sample: Timed<F::Sample>) -> Result<(), FoldError> {
565 let (timestamp, sample) = sample.into();
566 let tick = self.last.tick(timestamp, true)?;
567 Ok(for buffer in self.buffers.iter_mut() {
568 buffer.fold(tick, sample.clone())?;
569 })
570 }
571}
572
573impl<F, P> TimeMatrixTick for TimeMatrix<F, P>
574where
575 F: SerialStatistic<P>,
576 P: InterpolationKind,
577{
578 fn tick(&mut self, timestamp: Timestamp) -> Result<(), FoldError> {
579 let tick = self.last.tick(timestamp.into(), false)?;
580 Ok(for buffer in self.buffers.iter_mut() {
581 buffer.interpolate(tick)?;
582 })
583 }
584
585 fn tick_and_get_buffers(
586 &mut self,
587 timestamp: Timestamp,
588 ) -> Result<SerializedBuffer, FoldError> {
589 self.tick(timestamp)?;
590 let series_buffers = self
591 .buffers
592 .try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
593 .map_err::<FoldError, _>(From::from)?;
594 self.serialize(series_buffers).map_err(From::from)
595 }
596}
597
598#[cfg(test)]
599mod tests {
600 use fuchsia_async as fasync;
601
602 use crate::experimental::clock::{Timed, Timestamp};
603 use crate::experimental::series::interpolation::{ConstantSample, LastSample};
604 use crate::experimental::series::statistic::{
605 ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
606 };
607 use crate::experimental::series::{
608 SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
609 };
610
611 fn fold_and_interpolate_f32(matrix: &mut impl TimeMatrixFold<f32>) {
612 matrix.fold(Timed::now(0.0)).unwrap();
613 matrix.fold(Timed::now(1.0)).unwrap();
614 matrix.fold(Timed::now(2.0)).unwrap();
615 matrix.tick(Timestamp::now()).unwrap();
616 }
617
    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
    //                                    outputs of a `TimeMatrix`.
    // This "test" is considered successful as long as it builds.
    //
    // It spells out `TimeMatrix` types through each constructor (`default`, `new`,
    // `with_statistic`, and `with_transform`) and folds into the post-aggregated matrices.
    #[test]
    fn static_test_define_time_matrix() {
        type Mean<T> = ArithmeticMean<T>;
        type MeanTransform<T, F> = Transform<Mean<T>, F>;

        // Constructing a `TimeMatrix` reads the current time, so an executor with fake time is
        // installed first.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        // Arithmetic mean time matrices.
        let _ = TimeMatrix::<Mean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
            SamplingProfile::balanced(),
            LastSample::or(0.0f32),
        );
        let _ = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::granular(),
            ConstantSample::default(),
            Mean::<f32>::default(),
        );

        // Discrete arithmetic mean time matrices.
        let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
            SamplingProfile::highly_granular(),
            LastSample::or(0.0f32),
            |aggregation| aggregation.ceil() as i64,
        );
        fold_and_interpolate_f32(&mut matrix);
        // This time matrix is constructed verbosely with no ad-hoc type definitions nor ergonomic
        // constructors. This is as raw as it gets.
        let mut matrix = TimeMatrix::<_, ConstantSample>::with_statistic(
            SamplingProfile::default(),
            ConstantSample::default(),
            PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
                aggregation.ceil() as i64
            }),
        );
        fold_and_interpolate_f32(&mut matrix);
    }
658
    // TODO(https://fxbug.dev/356218503): Replace this with meaningful unit tests that assert the
    //                                    outputs of a `TimeMatrix`.
    // This "test" is considered successful as long as it builds.
    //
    // Each line names one statistic and interpolation combination; the combination is supported
    // if the corresponding `TimeMatrix` type has a `Default` implementation that compiles.
    #[test]
    fn static_test_supported_statistic_and_interpolation_combinations() {
        // Constructing a `TimeMatrix` reads the current time, so an executor with fake time is
        // installed first.
        let _exec = fasync::TestExecutor::new_with_fake_time();

        let _ = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::default();
        let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
        let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
        let _ = TimeMatrix::<Max<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Max<u64>, LastSample>::default();
        let _ = TimeMatrix::<Sum<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
        let _ = TimeMatrix::<Union<u64>, ConstantSample>::default();
        let _ = TimeMatrix::<Union<u64>, LastSample>::default();
    }
676
    /// Checks the exact serialized bytes of a `TimeMatrix<ArithmeticMean<f32>, ConstantSample>`
    /// (uncompressed `f32` buffers) at three points in fake time: right after construction, after
    /// one sample has been folded and the first 10s interval has elapsed, and after several more
    /// 10s intervals have elapsed without samples.
    #[test]
    fn time_matrix_with_uncompressed_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is the expected "created timestamp" below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                4, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of elements
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // The sample has the bit pattern 42 so that it serializes to the recognizable bytes
        // `42, 0, 0, 0`.
        time_matrix.fold(Timed::now(f32::from_bits(42u32))).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                8, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of elements
                42, 0, 0, 0, // item 1
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                0, 0, // type: uncompressed; subtype: f32
                24, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                5, 0, // number of elements
                42, 0, 0, 0, // item 1
                0, 0, 0, 0, // item 2
                0, 0, 0, 0, // item 3
                0, 0, 0, 0, // item 4
                0, 0, 0, 0, // item 5
                4, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of elements
            ]
        );
    }
746
    /// Checks the exact serialized bytes of a `TimeMatrix<Max<u64>, ConstantSample>` (Simple8b RLE
    /// buffers with unsigned values) at three points in fake time: right after construction, after
    /// one sample has been folded and the first 10s interval has elapsed, and after several more
    /// 10s intervals have elapsed without samples.
    #[test]
    fn time_matrix_with_simple8b_rle_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is the expected "created timestamp" below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<Max<u64>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                7, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        time_matrix.fold(Timed::now(15)).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                1, // number of values in last block
                0x0f, // RLE selector
                15, 0, 0, 0, 0, 0, 1, 0, // value 15 appears 1 time
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                1, 0, // type: simple8b RLE; subtype: unsigned
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                5, // number of values in last block
                0x03, // 4-bit selector
                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0, 0
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );
    }
826
    /// Checks the exact serialized bytes of a `TimeMatrix<Max<i64>, ConstantSample>` (Simple8b RLE
    /// buffers with zigzag encoded signed values) at three points in fake time: right after
    /// construction, after one negative sample has been folded and the first 10s interval has
    /// elapsed, and after several more 10s intervals have elapsed without samples.
    #[test]
    fn time_matrix_with_zigzag_simple8b_rle_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is the expected "created timestamp" below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<Max<i64>, ConstantSample>::new(
            SamplingProfile::highly_granular(),
            ConstantSample::default(),
        );
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
                7, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        time_matrix.fold(Timed::now(-8)).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                1, // number of values in last block
                0x0f, // RLE selector
                15, 0, 0, 0, 0, 0, 1, 0, // value -8 (encoded as 15) appears 1 time
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                1, 1, // type: simple8b RLE; subtype: signed (zigzag encoded)
                16, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                5, // number of values in last block
                0x03, // 4-bit selector
                0x0f, 0, 0, 0, 0, 0, 0, 0, // values -8 (encoded as 15), 0, 0, 0, 0
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of selector elements and value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );
    }
906
    /// Checks the exact serialized bytes of a `TimeMatrix<LatchMax<u64>, LastSample>` (delta
    /// Simple8b RLE buffers with unsigned values) at four points in fake time: right after
    /// construction, after a first sample (stored as the base value), after a second sample
    /// (stored as a delta from the first), and after several more 10s intervals have elapsed
    /// without samples.
    #[test]
    fn time_matrix_with_delta_simple8b_rle_buffer() {
        let exec = fasync::TestExecutor::new_with_fake_time();
        // The matrix is created at 3s, which is the expected "created timestamp" below.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(3_000_000_000));
        let mut time_matrix = TimeMatrix::<LatchMax<u64>, LastSample>::new(
            SamplingProfile::highly_granular(),
            LastSample::or(0),
        );
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                3, 0, 0, 0, // last timestamp
                2, 0, // type: delta simple8b RLE; subtype: unsigned
                7, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                0, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        time_matrix.fold(Timed::now(42)).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(10_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                10, 0, 0, 0, // last timestamp
                2, 0, // type: delta simple8b RLE; subtype: unsigned
                15, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                1, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
                42, 0, 0, 0, 0, 0, 0, 0, // base value
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // The second sample is 15 greater than the first, so a delta of 15 is expected below.
        time_matrix.fold(Timed::now(57)).unwrap();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(20_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                20, 0, 0, 0, // last timestamp
                2, 0, // type: delta simple8b RLE; subtype: unsigned
                24, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                2, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                1, // number of values in last block
                42, 0, 0, 0, 0, 0, 0, 0, // base value
                0x0f, // RLE selector
                15, 0, 0, 0, 0, 0, 1, 0, // value 15 (delta) appears 1 time
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );

        // Advance several time steps to test ring buffer's `fill`
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(50_000_000_000));
        let buffer = time_matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
        assert_eq!(
            buffer.data,
            vec![
                1, // version number
                3, 0, 0, 0, // created timestamp
                50, 0, 0, 0, // last timestamp
                2, 0, // type: delta simple8b RLE; subtype: unsigned
                24, 0, // series 1: length in bytes
                10, 0, // series 1 granularity: 10s
                2, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                4, // number of values in last block
                42, 0, 0, 0, 0, 0, 0, 0, // base value
                0x03, // 4-bit selector
                0x0f, 0, 0, 0, 0, 0, 0, 0, // values 15, 0, 0, 0
                7, 0, // series 2: length in bytes
                60, 0, // series 2 granularity: 60s
                0, 0, // number of base value + selector elements or value blocks
                0, 0, // head selector index
                0, // number of values in last block
            ]
        );
    }
1012
#[test]
fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_i64() {
    // Golden-bytes test for a `Max<i64>` matrix: the serialized buffers are expected to carry
    // type 2 (delta simple8b RLE) with subtype 1 (signed), and negative deltas are expected to
    // show up zigzag encoded (-8 is stored as 15).
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let executor = fasync::TestExecutor::new_with_fake_time();
    // Moves the fake monotonic clock to an absolute time given in whole seconds.
    let set_time_secs = |seconds: i64| {
        executor.set_fake_time(fasync::MonotonicInstant::from_nanos(seconds * NANOS_PER_SECOND))
    };

    set_time_secs(3);
    let mut matrix = TimeMatrix::<Max<i64>, LastSample>::new(
        SamplingProfile::highly_granular(),
        LastSample::or(0),
    );

    // Step 1: nothing folded yet, so both series are empty.
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_empty = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        3, 0, 0, 0, // most recent timestamp
        2, 1, // type: delta simple8b RLE; subtype: signed
        // -- series 1 --
        7, 0, // byte length of the series
        10, 0, // granularity: 10s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_empty);

    // Step 2: the first sample becomes the base value of the 10s series.
    matrix.fold(Timed::now(42)).unwrap();
    set_time_secs(10);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_base_only = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        10, 0, 0, 0, // most recent timestamp
        2, 1, // type: delta simple8b RLE; subtype: signed
        // -- series 1 --
        15, 0, // byte length of the series
        10, 0, // granularity: 10s
        1, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
        42, 0, 0, 0, 0, 0, 0, 0, // base value
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_base_only);

    // Step 3: the second sample is stored as a single RLE block holding the zigzagged delta.
    matrix.fold(Timed::now(34)).unwrap();
    set_time_secs(20);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_one_delta = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        20, 0, 0, 0, // most recent timestamp
        2, 1, // type: delta simple8b RLE; subtype: signed
        // -- series 1 --
        24, 0, // byte length of the series
        10, 0, // granularity: 10s
        2, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        1, // value count in the last block
        42, 0, 0, 0, 0, 0, 0, 0, // base value
        0x0f, // selector: RLE
        15, 0, 0, 0, 0, 0, 1, 0, // delta -8 (zigzag encoded as 15), run length 1
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_one_delta);

    // Step 4: jump ahead by several intervals at once to exercise the ring buffer's `fill`.
    set_time_secs(50);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_after_fill = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        50, 0, 0, 0, // most recent timestamp
        2, 1, // type: delta simple8b RLE; subtype: signed
        // -- series 1 --
        24, 0, // byte length of the series
        10, 0, // granularity: 10s
        2, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        4, // value count in the last block
        42, 0, 0, 0, 0, 0, 0, 0, // base value
        0x03, // selector: 4-bit values
        0x0f, 0, 0, 0, 0, 0, 0, 0, // deltas -8 (zigzag encoded as 15), 0, 0, 0
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_after_fill);
}
1118
#[test]
fn time_matrix_with_delta_zigzag_simple8b_rle_buffer_u64() {
    // Golden-bytes test for a `Max<u64>` matrix: the serialized buffers are expected to carry
    // type 2 (delta simple8b RLE) with subtype 2 (unsigned with signed diff). Going from 1 to
    // `u64::MAX` is expected to be stored as the delta -2, zigzag encoded as 3.
    const NANOS_PER_SECOND: i64 = 1_000_000_000;

    let executor = fasync::TestExecutor::new_with_fake_time();
    // Moves the fake monotonic clock to an absolute time given in whole seconds.
    let set_time_secs = |seconds: i64| {
        executor.set_fake_time(fasync::MonotonicInstant::from_nanos(seconds * NANOS_PER_SECOND))
    };

    set_time_secs(3);
    let mut matrix = TimeMatrix::<Max<u64>, LastSample>::new(
        SamplingProfile::highly_granular(),
        LastSample::or(0),
    );

    // Step 1: nothing folded yet, so both series are empty.
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_empty = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        3, 0, 0, 0, // most recent timestamp
        2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
        // -- series 1 --
        7, 0, // byte length of the series
        10, 0, // granularity: 10s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_empty);

    // Step 2: the first sample becomes the base value of the 10s series.
    matrix.fold(Timed::now(1)).unwrap();
    set_time_secs(10);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_base_only = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        10, 0, 0, 0, // most recent timestamp
        2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
        // -- series 1 --
        15, 0, // byte length of the series
        10, 0, // granularity: 10s
        1, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
        1, 0, 0, 0, 0, 0, 0, 0, // base value
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_base_only);

    // Step 3: the second sample is stored as a single RLE block holding the zigzagged delta.
    matrix.fold(Timed::now(u64::MAX)).unwrap();
    set_time_secs(20);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_one_delta = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        20, 0, 0, 0, // most recent timestamp
        2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
        // -- series 1 --
        24, 0, // byte length of the series
        10, 0, // granularity: 10s
        2, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        1, // value count in the last block
        1, 0, 0, 0, 0, 0, 0, 0, // base value
        0x0f, // selector: RLE
        3, 0, 0, 0, 0, 0, 1, 0, // delta -2 (zigzag encoded as 3), run length 1
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_one_delta);

    // Step 4: jump ahead by several intervals at once to exercise the ring buffer's `fill`.
    set_time_secs(50);
    let serialized = matrix.tick_and_get_buffers(Timestamp::now()).unwrap();
    let expected_after_fill = vec![
        1, // format version
        3, 0, 0, 0, // creation timestamp
        50, 0, 0, 0, // most recent timestamp
        2, 2, // type: delta simple8b RLE; subtype: unsigned with signed diff
        // -- series 1 --
        24, 0, // byte length of the series
        10, 0, // granularity: 10s
        2, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        4, // value count in the last block
        1, 0, 0, 0, 0, 0, 0, 0, // base value
        0x01, // selector: 2-bit values
        3, 0, 0, 0, 0, 0, 0, 0, // deltas -2 (zigzag encoded as 3), 0, 0, 0
        // -- series 2 --
        7, 0, // byte length of the series
        60, 0, // granularity: 60s
        0, 0, // count of base value + selector elements or value blocks
        0, 0, // index of the head selector
        0, // value count in the last block
    ];
    assert_eq!(serialized.data, expected_after_fill);
}
1224}