windowed_stats/experimental/series/
statistic.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Statistics and sample aggregation.
6
7use num::{Num, NumCast, Zero};
8use std::convert::Infallible;
9use std::fmt::Debug;
10use std::num::NonZeroUsize;
11use std::ops::BitOr;
12use std::{cmp, io};
13use thiserror::Error;
14
15use crate::experimental::clock::MonotonicityError;
16use crate::experimental::series::buffer::{BufferStrategy, RingBuffer};
17use crate::experimental::series::interpolation::InterpolationKind;
18use crate::experimental::series::interval::SamplingInterval;
19use crate::experimental::series::{BitSet, Counter, DataSemantic, Gauge, GaugeForceSimple8bRle};
20use crate::experimental::vec1::Vec1;
21
pub mod recipe {
    //! Type definitions and respellings of common or interesting statistic types.

    use crate::experimental::series::statistic::{ArithmeticMean, Transform};

    /// An [`ArithmeticMean`] over samples of type `T` composed with a function pointer that maps
    /// the mean to an aggregation of type `A` (see [`Transform`]).
    pub type ArithmeticMeanTransform<T, A> = Transform<ArithmeticMean<T>, A>;
}
29
/// Sample folding error.
///
/// Describes errors that can occur when folding a sample into a [`Statistic`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum FoldError {
    /// An [`io::Error`], forwarded transparently.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// A [`MonotonicityError`] from the `clock` module, forwarded transparently.
    #[error(transparent)]
    Monotonicity(#[from] MonotonicityError),
    /// An arithmetic overflow occurred while computing a statistic.
    #[error("overflow in statistic computation")]
    Overflow,
    // TODO(https://fxbug.dev/432323121): Provide the data that could not be sent into the channel
    //                                    here. This requires an input type parameter on
    //                                    `FoldError`, a `dyn Any` trait object, etc.
    /// A timed sample was dropped because a channel is closed or a buffer is exhausted.
    #[error("timed sample dropped: channel is closed or buffer is exhausted")]
    Buffer,
    /// One or more buffered samples could not be folded; carries the individual errors.
    #[error("failed to fold one or more buffered samples")]
    Flush(Vec1<FoldError>),
}
50
51impl From<Infallible> for FoldError {
52    fn from(_: Infallible) -> Self {
53        unreachable!()
54    }
55}
56
57/// A statistical function type that folds samples into an aggregation.
58pub trait Statistic: Clone {
59    /// The type of data semantic associated with samples.
60    type Semantic: DataSemantic;
61    /// The type of samples.
62    type Sample: Clone;
63    /// The type of the statistical aggregation.
64    type Aggregation: Clone;
65
66    /// Folds a sample into the aggregation of the statistic.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if folding the sample causes an overflow.
71    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError>;
72
73    /// Folds a sample into the aggregation of a statistic `n` (one or more) times.
74    ///
75    /// For some `Statistic`s, this function is significantly more efficient than multiple calls to
76    /// [`fold`]. For example, [`Max`] need not fold more than once for any given sample regardless
77    /// of `n`. Prefer this function when the same sample must be folded more than once.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if folding the sample causes an overflow.
82    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
83        for _ in 0..n.get() {
84            self.fold(sample.clone())?;
85        }
86        Ok(())
87    }
88
89    /// Resets the state (and aggregation) of the statistic.
90    ///
91    /// The state of a statistic after a reset is arbitrary, but most types reset to a reasonable
92    /// initial state via `Default`. Some types do nothing, such as [`LatchMax`], which operates
93    /// across [`SamplingInterval`]s.
94    ///
95    /// Statistics can be configured to reset to any given state via [`Reset`].
96    ///
97    /// [`LatchMax`] crate::experimental::series::statistic::LatchMax
98    /// [`Reset`] crate::experimental::series::statistic::Reset
99    fn reset(&mut self);
100
101    /// Gets the statistical aggregation.
102    ///
103    /// Returns `None` if no aggregation is ready.
104    fn aggregation(&self) -> Option<Self::Aggregation>;
105}
106
107/// Extension methods for `Statistic`s.
108pub trait StatisticExt: Statistic {
109    /// Gets the statistical aggregation and resets the statistic.
110    fn get_aggregation_and_reset(&mut self) -> Option<Self::Aggregation> {
111        let aggregation = self.aggregation();
112        self.reset();
113        aggregation
114    }
115}
116
117impl<F> StatisticExt for F where F: Statistic {}
118
/// A [`Statistic`] for which an in-memory ring buffer encoding is defined.
///
/// A serial statistic can be used with a [`TimeMatrix`]. A series of aggregations are stored in an
/// instance of the defined buffer, which is typically optimized for the data semantic and data
/// type of the statistic.
///
/// The type parameter `P` is the [`InterpolationKind`] the buffer is defined for.
///
/// [`TimeMatrix`]: crate::experimental::series::TimeMatrix
pub trait SerialStatistic<P>: Statistic
where
    P: InterpolationKind,
{
    /// The ring buffer type that stores aggregations of this statistic.
    type Buffer: Clone + RingBuffer<Self::Aggregation>;

    /// Returns a buffer for aggregations sampled over the given interval.
    fn buffer(interval: &SamplingInterval) -> Self::Buffer;
}
134
135// This blanket implementation essentially forwards the `BufferStrategy` implementation of the
136// associated `DataSemantic`. `DataSemantic` types like `Gauge` provide the core implementation and
137// the `SerialStatistic` trait relates the necessary types and provides a more brief way to express
138// bounds.
139impl<F, P> SerialStatistic<P> for F
140where
141    F: Statistic,
142    // The data semantic must implement `BufferStrategy` for the `Statistic`'s aggregation type and
143    // the given `Interpolation` type `P`.
144    F::Semantic: BufferStrategy<F::Aggregation, P>,
145    P: InterpolationKind,
146{
147    type Buffer = <F::Semantic as BufferStrategy<F::Aggregation, P>>::Buffer;
148
149    fn buffer(interval: &SamplingInterval) -> Self::Buffer {
150        <F::Semantic>::buffer(interval)
151    }
152}
153
/// The associated data semantic type of a `Statistic`.
///
/// Shorthand for `<F as Statistic>::Semantic`.
pub type Semantic<F> = <F as Statistic>::Semantic;

/// The associated metadata type of a `Statistic`.
///
/// This is the `Metadata` type of the statistic's [`Semantic`].
pub type Metadata<F> = <Semantic<F> as DataSemantic>::Metadata;

/// The associated sample type of a `Statistic`.
///
/// Shorthand for `<F as Statistic>::Sample`.
pub type Sample<F> = <F as Statistic>::Sample;

/// The associated aggregation type of a `Statistic`.
///
/// Shorthand for `<F as Statistic>::Aggregation`.
pub type Aggregation<F> = <F as Statistic>::Aggregation;
165
166/// Arithmetic mean statistic.
167///
168/// The arithmetic mean sums samples within their domain and computes the mean as a real number
169/// represented using floating-point. For floating-point samples, `NaN`s are discarded and the sum
170/// saturates to infinity without error.
171///
172/// This statistic is sensitive to overflow in the count of samples.
173#[derive(Clone, Debug)]
174pub struct ArithmeticMean<T> {
175    /// The sum of samples.
176    sum: T,
177    /// The count of samples.
178    n: u64,
179}
180
181impl<T> ArithmeticMean<T> {
182    pub fn with_sum(sum: T) -> Self {
183        ArithmeticMean { sum, n: 0 }
184    }
185
186    fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), FoldError>
187    where
188        Self: Statistic<Sample = T>,
189        T: Clone + Num + NumCast,
190    {
191        Ok(match num::cast::<_, T>(n.get()) {
192            Some(m) => {
193                self.fold(sample * m)?;
194                self.increment((n.get() as u64) - 1)?;
195            }
196            _ => {
197                for _ in 0..n.get() {
198                    self.fold(sample.clone())?;
199                }
200            }
201        })
202    }
203
204    fn increment(&mut self, m: u64) -> Result<(), FoldError> {
205        // TODO(https://fxbug.dev/351848566): On overflow, either saturate `self.n` or leave both
206        //                                    `self.sum` and `self.n` unchanged (here and in
207        //                                    `Sampler::fold` and `Fill::fill` implementations; see
208        //                                    below).
209        self.n.checked_add(m).inspect(|sum| self.n = *sum).map(|_| ()).ok_or(FoldError::Overflow)
210    }
211
212    fn reset(&mut self)
213    where
214        T: Zero,
215    {
216        *self = Default::default()
217    }
218}
219
220impl<T> Default for ArithmeticMean<T>
221where
222    T: Zero,
223{
224    fn default() -> Self {
225        ArithmeticMean::with_sum(T::zero())
226    }
227}
228
229impl Statistic for ArithmeticMean<f32> {
230    type Semantic = Gauge;
231    type Sample = f32;
232    type Aggregation = f32;
233
234    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
235        // Discard `NaN` terms and avoid some floating-point exceptions.
236        self.sum = match sample {
237            _ if sample.is_nan() => self.sum,
238            sample if sample.is_infinite() => sample,
239            // Because neither term is `NaN` and `sample` is finite, this saturates at negative or
240            // positive infinity.
241            sample => self.sum + sample,
242        };
243        self.increment(1)
244    }
245
246    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
247        ArithmeticMean::fill(self, sample, n)
248    }
249
250    fn reset(&mut self) {
251        ArithmeticMean::reset(self)
252    }
253
254    fn aggregation(&self) -> Option<Self::Aggregation> {
255        // This is lossy and lossiness correlates to the magnitude of `n`. See details of
256        // `u64 as f32` casts.
257        (self.n > 0).then(|| self.sum / (self.n as f32))
258    }
259}
260
261impl Statistic for ArithmeticMean<i64> {
262    type Semantic = Gauge;
263    type Sample = i64;
264    type Aggregation = f32;
265
266    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
267        // TODO(https://fxbug.dev/351848566): On overflow, either saturate `self.n` or leave both
268        //                                    `self.sum` and `self.n` unchanged (here and in
269        //                                    `Sampler::fold` and `Fill::fill` implementations; see
270        //                                    below).
271        self.sum = self.sum.checked_add(sample).ok_or(FoldError::Overflow)?;
272        self.increment(1)
273    }
274
275    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
276        ArithmeticMean::fill(self, sample, n)
277    }
278
279    fn reset(&mut self) {
280        ArithmeticMean::reset(self)
281    }
282
283    fn aggregation(&self) -> Option<Self::Aggregation> {
284        // This is lossy and lossiness correlates to the magnitude of `n`. See details of
285        // `i64 as f32` casts.
286        (self.n > 0).then(|| self.sum as f32 / (self.n as f32))
287    }
288}
289
290impl Statistic for ArithmeticMean<u64> {
291    type Semantic = Gauge;
292    type Sample = u64;
293    type Aggregation = f32;
294
295    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
296        // TODO(https://fxbug.dev/351848566): On overflow, either saturate `self.n` or leave both
297        //                                    `self.sum` and `self.n` unchanged (here and in
298        //                                    `Sampler::fold` and `Fill::fill` implementations; see
299        //                                    below).
300        self.sum = self.sum.checked_add(sample).ok_or(FoldError::Overflow)?;
301        self.increment(1)
302    }
303
304    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
305        ArithmeticMean::fill(self, sample, n)
306    }
307
308    fn reset(&mut self) {
309        ArithmeticMean::reset(self)
310    }
311
312    fn aggregation(&self) -> Option<Self::Aggregation> {
313        // This is lossy and lossiness correlates to the magnitude of `n`. See details of
314        // `u64 as f32` casts.
315        (self.n > 0).then(|| self.sum as f32 / (self.n as f32))
316    }
317}
318
319/// Sum statistic.
320///
321/// The sum directly computes the aggregation in the domain of samples.
322///
323/// This statistic is sensitive to overflow in the sum of samples.
324#[derive(Clone, Debug)]
325pub struct Sum<T> {
326    /// The sum of samples.
327    sum: T,
328}
329
330impl<T> Sum<T> {
331    pub fn with_sum(sum: T) -> Self {
332        Sum { sum }
333    }
334
335    fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), FoldError>
336    where
337        Self: Statistic<Sample = T>,
338        T: Clone + Num + NumCast,
339    {
340        if let Some(n) = num::cast::<_, T>(n.get()) {
341            self.fold(sample * n)
342        } else {
343            Ok(for _ in 0..n.get() {
344                self.fold(sample.clone())?;
345            })
346        }
347    }
348
349    fn reset(&mut self)
350    where
351        T: Zero,
352    {
353        *self = Default::default();
354    }
355}
356
357impl<T> Default for Sum<T>
358where
359    T: Zero,
360{
361    fn default() -> Self {
362        Sum::with_sum(T::zero())
363    }
364}
365
366impl Statistic for Sum<u64> {
367    type Semantic = Gauge;
368    type Sample = u64;
369    type Aggregation = u64;
370
371    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
372        // TODO(https://fxbug.dev/351848566): Saturate `self.sum` on overflow.
373        self.sum
374            .checked_add(sample)
375            .inspect(|sum| self.sum = *sum)
376            .map(|_| ())
377            .ok_or(FoldError::Overflow)
378    }
379
380    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
381        Sum::fill(self, sample, n)
382    }
383
384    fn reset(&mut self) {
385        Sum::reset(self)
386    }
387
388    fn aggregation(&self) -> Option<Self::Aggregation> {
389        let sum = self.sum;
390        Some(sum)
391    }
392}
393
394impl Statistic for Sum<i64> {
395    type Semantic = Gauge;
396    type Sample = i64;
397    type Aggregation = i64;
398
399    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
400        // TODO(https://fxbug.dev/351848566): Saturate `self.sum` on overflow.
401        self.sum
402            .checked_add(sample)
403            .inspect(|sum| self.sum = *sum)
404            .map(|_| ())
405            .ok_or(FoldError::Overflow)
406    }
407
408    fn fill(&mut self, sample: Self::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
409        Sum::fill(self, sample, n)
410    }
411
412    fn reset(&mut self) {
413        Sum::reset(self)
414    }
415
416    fn aggregation(&self) -> Option<Self::Aggregation> {
417        let sum = self.sum;
418        Some(sum)
419    }
420}
421
/// Minimum statistic.
#[derive(Clone, Debug)]
pub struct Min<T> {
    /// The smallest sample folded so far, or `None` if there is none.
    min: Option<T>,
}

impl<T> Min<T> {
    /// Constructs a minimum that is seeded with the given value.
    pub fn with_min(min: T) -> Self {
        Self { min: Some(min) }
    }
}

impl<T> Default for Min<T> {
    /// Constructs a minimum with no value.
    fn default() -> Self {
        Self { min: None }
    }
}
440
441impl<T> Statistic for Min<T>
442where
443    T: Ord + Copy + Zero + Num + NumCast,
444{
445    type Semantic = Gauge;
446    type Sample = T;
447    type Aggregation = T;
448
449    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
450        self.min = Some(match self.min {
451            Some(min) => cmp::min(min, sample),
452            _ => sample,
453        });
454        Ok(())
455    }
456
457    fn fill(&mut self, sample: Self::Sample, _n: NonZeroUsize) -> Result<(), FoldError> {
458        self.fold(sample)
459    }
460
461    fn reset(&mut self) {
462        *self = Default::default();
463    }
464
465    fn aggregation(&self) -> Option<Self::Aggregation> {
466        self.min
467    }
468}
469
/// Maximum statistic.
#[derive(Clone, Debug)]
pub struct Max<T> {
    /// The largest sample folded so far, or `None` if there is none.
    max: Option<T>,
}

impl<T> Max<T> {
    /// Constructs a maximum that is seeded with the given value.
    pub fn with_max(max: T) -> Self {
        Self { max: Some(max) }
    }
}

impl<T> Default for Max<T> {
    /// Constructs a maximum with no value.
    fn default() -> Self {
        Self { max: None }
    }
}
488
489impl<T> Statistic for Max<T>
490where
491    T: Ord + Copy + Zero + Num + NumCast,
492{
493    type Semantic = Gauge;
494    type Sample = T;
495    type Aggregation = T;
496
497    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
498        self.max = Some(match self.max {
499            Some(max) => cmp::max(max, sample),
500            _ => sample,
501        });
502        Ok(())
503    }
504
505    fn fill(&mut self, sample: Self::Sample, _n: NonZeroUsize) -> Result<(), FoldError> {
506        self.fold(sample)
507    }
508
509    fn reset(&mut self) {
510        *self = Default::default();
511    }
512
513    fn aggregation(&self) -> Option<Self::Aggregation> {
514        self.max
515    }
516}
517
/// Statistic for keeping the most recent sample.
#[derive(Clone, Debug)]
pub struct Last<T> {
    /// The most recently folded sample, or `None` if there is none.
    last: Option<T>,
}

impl<T> Last<T> {
    /// Constructs a statistic that is seeded with the given sample.
    pub fn with_sample(sample: T) -> Self {
        Self { last: Some(sample) }
    }
}

impl<T> Default for Last<T> {
    /// Constructs a statistic with no sample.
    fn default() -> Self {
        Self { last: None }
    }
}
536
537impl<T> Statistic for Last<T>
538where
539    T: Copy + Zero + Num + NumCast,
540{
541    type Semantic = Gauge;
542    type Sample = T;
543    type Aggregation = T;
544
545    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
546        self.last = Some(sample);
547        Ok(())
548    }
549
550    fn fill(&mut self, sample: Self::Sample, _n: NonZeroUsize) -> Result<(), FoldError> {
551        self.fold(sample)
552    }
553
554    fn reset(&mut self) {
555        *self = Default::default();
556    }
557
558    fn aggregation(&self) -> Option<Self::Aggregation> {
559        self.last
560    }
561}
562
/// Bitwise union statistic.
///
/// Samples are combined with bitwise OR, so the aggregation has a bit set if any folded sample
/// had that bit set.
#[derive(Default, Copy, Clone, Debug)]
pub struct Union<T> {
    /// The bitwise OR of the initial bits and all folded samples.
    bits: T,
}

impl<T> Union<T> {
    /// Constructs a union that is seeded with the given bits.
    pub fn with_bits(bits: T) -> Self {
        Self { bits }
    }
}
573
574impl<T> Statistic for Union<T>
575where
576    T: Into<u64> + BitOr<Output = T> + Copy + Default,
577{
578    type Semantic = BitSet;
579    type Sample = T;
580    type Aggregation = u64;
581
582    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
583        self.bits = self.bits | sample;
584        Ok(())
585    }
586
587    fn fill(&mut self, sample: Self::Sample, _n: NonZeroUsize) -> Result<(), FoldError> {
588        self.fold(sample)
589    }
590
591    fn reset(&mut self) {
592        *self = Default::default();
593    }
594
595    fn aggregation(&self) -> Option<Self::Aggregation> {
596        Some(self.bits.into())
597    }
598}
599
600/// Maximum statistic that sums non-monotonic samples into the maximum.
601///
602/// This statistic is sensitive to overflow in the sum of samples with the non-monotonic sum.
603#[derive(Clone, Debug)]
604pub struct LatchMax<T> {
605    /// The last observed sample.
606    last: Option<T>,
607    /// The maximum of samples and the non-monotonic sum.
608    max: Option<T>,
609    /// The sum of non-monotonic samples.
610    sum: T,
611}
612
613impl<T> LatchMax<T> {
614    pub fn with_max(max: T) -> Self
615    where
616        T: Zero,
617    {
618        LatchMax::with_max_and_sum(max, T::zero())
619    }
620
621    pub fn with_max_and_sum(max: T, sum: T) -> Self {
622        LatchMax { last: None, max: Some(max), sum }
623    }
624}
625
626impl<T> Default for LatchMax<T>
627where
628    T: Zero,
629{
630    fn default() -> Self {
631        LatchMax { last: None, max: None, sum: T::zero() }
632    }
633}
634
635impl Statistic for LatchMax<u64> {
636    type Semantic = Counter;
637    type Sample = u64;
638    type Aggregation = u64;
639
640    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
641        match self.last {
642            Some(last) if sample < last => {
643                // TODO(https://fxbug.dev/351848566): Saturate `self.sum` on overflow.
644                self.sum = self.sum.checked_add(last).ok_or(FoldError::Overflow)?;
645            }
646            _ => {}
647        }
648        self.last = Some(sample);
649
650        let sum = sample.checked_add(self.sum).ok_or(FoldError::Overflow)?;
651        self.max = Some(match self.max {
652            Some(max) => cmp::max(max, sum),
653            _ => sum,
654        });
655        Ok(())
656    }
657
658    fn fill(&mut self, sample: Self::Sample, _n: NonZeroUsize) -> Result<(), FoldError> {
659        self.fold(sample)
660    }
661
662    fn reset(&mut self) {}
663
664    fn aggregation(&self) -> Option<Self::Aggregation> {
665        self.max
666    }
667}
668
/// Post-aggregation transform.
///
/// A post-aggregation composes a [`Statistic`] and arbitrarily maps its aggregation. For example,
/// a transform may discretize the continuous aggregation of a statistic.
///
/// [`Statistic`]: crate::experimental::series::statistic::Statistic
#[derive(Clone, Copy, Debug)]
pub struct PostAggregation<F, R> {
    /// The composed statistic that samples are folded into.
    statistic: F,
    /// The function applied to the aggregation of `statistic`.
    transform: R,
}
680
/// A post-aggregation of a function pointer.
///
/// This type definition describes a stateless and pure transform with a type name that can be
/// spelled in any context (i.e., no closure or other unnameable types occur).
///
/// `F` is the composed statistic and `A` is the output type of the transform.
pub type Transform<F, A> = PostAggregation<F, fn(Aggregation<F>) -> A>;
686
687impl<F, R> PostAggregation<F, R> {
688    pub fn with(statistic: F, transform: R) -> Self {
689        PostAggregation { statistic, transform }
690    }
691
692    /// Constructs a `PostAggregation` from a transform function.
693    ///
694    /// The default statistic is composed.
695    pub fn from_transform(transform: R) -> Self
696    where
697        F: Default,
698    {
699        PostAggregation { statistic: F::default(), transform }
700    }
701}
702
703impl<A, F, R> Statistic for PostAggregation<F, R>
704where
705    F: Statistic,
706    R: Clone + Fn(F::Aggregation) -> A,
707    A: Clone,
708{
709    type Semantic = F::Semantic;
710    type Sample = F::Sample;
711    type Aggregation = A;
712
713    fn fold(&mut self, sample: F::Sample) -> Result<(), FoldError> {
714        self.statistic.fold(sample)
715    }
716
717    fn fill(&mut self, sample: F::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
718        self.statistic.fill(sample, n)
719    }
720
721    fn reset(&mut self) {
722        self.statistic.reset()
723    }
724
725    fn aggregation(&self) -> Option<Self::Aggregation> {
726        self.statistic.aggregation().map(|aggregation| (self.transform)(aggregation))
727    }
728}
729
730/// Applies an arbitrary [reset function][`reset`] to a [`Statistic`].
731///
732/// [`reset`]: crate::experimental::series::statistic::Statistic::reset
733/// [`Statistic`]: crate::experimental::series::statistic::Statistic
734#[derive(Clone, Copy, Debug)]
735pub struct Reset<F, R> {
736    statistic: F,
737    reset: R,
738}
739
740impl<F, R> Reset<F, R>
741where
742    F: Statistic,
743    R: FnMut() -> F,
744{
745    pub fn with(statistic: F, reset: R) -> Self {
746        Reset { statistic, reset }
747    }
748}
749
750impl<F, R> Statistic for Reset<F, R>
751where
752    F: Statistic,
753    R: Clone + FnMut() -> F,
754{
755    type Semantic = F::Semantic;
756    type Sample = F::Sample;
757    type Aggregation = F::Aggregation;
758
759    fn fold(&mut self, sample: F::Sample) -> Result<(), FoldError> {
760        self.statistic.fold(sample)
761    }
762
763    fn fill(&mut self, sample: F::Sample, n: NonZeroUsize) -> Result<(), FoldError> {
764        self.statistic.fill(sample, n)
765    }
766
767    fn reset(&mut self) {
768        self.statistic = (self.reset)();
769    }
770
771    fn aggregation(&self) -> Option<Self::Aggregation> {
772        self.statistic.aggregation()
773    }
774}
775
/// A [`Statistic`] that produces an aggregation that is the difference between
/// the first and last sample of the aggregation interval.
#[derive(Default, Copy, Clone, Debug)]
pub struct Diff<T> {
    /// The value that the difference is measured from: the first sample folded while this was
    /// empty, or, after a reset, the last sample folded before that reset.
    left: Option<T>,
    /// The most recently folded sample since the last reset, if any.
    right: Option<T>,
}
783
784impl<T: num::CheckedSub + Copy> Statistic for Diff<T> {
785    type Semantic = GaugeForceSimple8bRle;
786    type Sample = T;
787    type Aggregation = T;
788
789    fn fold(&mut self, sample: Self::Sample) -> Result<(), FoldError> {
790        let Self { left, right } = self;
791        let _: &mut T = left.get_or_insert(sample);
792        *right = Some(sample);
793        Ok(())
794    }
795
796    fn reset(&mut self) {
797        let Self { left, right } = self;
798        *left = right.take();
799    }
800
801    fn aggregation(&self) -> Option<Self::Aggregation> {
802        let Self { left, right } = self;
803        left.as_ref().zip(right.as_ref()).and_then(|(l, r)| r.checked_sub(l))
804    }
805}
806
807#[cfg(test)]
808mod tests {
809    use assert_matches::assert_matches;
810    use std::num::NonZeroUsize;
811
812    use crate::experimental::series::statistic::{
813        ArithmeticMean, Diff, FoldError, Last, LatchMax, Max, Min, PostAggregation, Reset,
814        Statistic, Sum, Union,
815    };
816
817    #[test]
818    fn arithmetic_mean_aggregation() {
819        let mut mean = ArithmeticMean::<f32>::default();
820        mean.fold(1.0).unwrap();
821        mean.fold(1.0).unwrap();
822        mean.fold(1.0).unwrap();
823        let aggregation = mean.aggregation().unwrap();
824        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
825
826        let mut mean = ArithmeticMean::<f32>::default();
827        mean.fold(0.0).unwrap();
828        mean.fold(1.0).unwrap();
829        mean.fold(2.0).unwrap();
830        let aggregation = mean.aggregation().unwrap();
831        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
832    }
833
834    #[test]
835    fn arithmetic_mean_aggregation_fill() {
836        let mut mean = ArithmeticMean::<f32>::default();
837        mean.fill(1.0, NonZeroUsize::new(1000).unwrap()).unwrap();
838        let aggregation = mean.aggregation().unwrap();
839        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
840    }
841
842    #[test]
843    fn arithmetic_mean_count_overflow() {
844        let mut mean = ArithmeticMean::<f32> { sum: 1.0, n: u64::MAX };
845        let result = mean.fold(1.0);
846        assert_matches!(result, Err(FoldError::Overflow));
847    }
848
849    #[test]
850    fn arithmetic_mean_i64_aggregation() {
851        let mut mean = ArithmeticMean::<i64>::default();
852        mean.fold(1).unwrap();
853        mean.fold(1).unwrap();
854        mean.fold(1).unwrap();
855        let aggregation = mean.aggregation().unwrap();
856        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
857
858        let mut mean = ArithmeticMean::<i64>::default();
859        mean.fold(0).unwrap();
860        mean.fold(1).unwrap();
861        mean.fold(2).unwrap();
862        let aggregation = mean.aggregation().unwrap();
863        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
864    }
865
866    #[test]
867    fn arithmetic_mean_i64_aggregation_fill() {
868        let mut mean = ArithmeticMean::<i64>::default();
869        mean.fill(1, NonZeroUsize::new(1000).unwrap()).unwrap();
870        let aggregation = mean.aggregation().unwrap();
871        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
872    }
873
874    #[test]
875    fn arithmetic_mean_i64_sum_overflow() {
876        let mut mean = ArithmeticMean::<i64> { sum: i64::MAX, n: 1 };
877        let result = mean.fold(1);
878        assert_matches!(result, Err(FoldError::Overflow));
879    }
880
881    #[test]
882    fn arithmetic_mean_i64_count_overflow() {
883        let mut mean = ArithmeticMean::<i64> { sum: 1, n: u64::MAX };
884        let result = mean.fold(1);
885        assert_matches!(result, Err(FoldError::Overflow));
886    }
887
888    #[test]
889    fn arithmetic_mean_u64_aggregation() {
890        let mut mean = ArithmeticMean::<u64>::default();
891        mean.fold(1).unwrap();
892        mean.fold(1).unwrap();
893        mean.fold(1).unwrap();
894        let aggregation = mean.aggregation().unwrap();
895        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
896
897        let mut mean = ArithmeticMean::<u64>::default();
898        mean.fold(0).unwrap();
899        mean.fold(1).unwrap();
900        mean.fold(2).unwrap();
901        let aggregation = mean.aggregation().unwrap();
902        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
903    }
904
905    #[test]
906    fn arithmetic_mean_u64_aggregation_fill() {
907        let mut mean = ArithmeticMean::<u64>::default();
908        mean.fill(1, NonZeroUsize::new(1000).unwrap()).unwrap();
909        let aggregation = mean.aggregation().unwrap();
910        assert!(aggregation > 0.99 && aggregation < 1.01); // ~ 1.0
911    }
912
913    #[test]
914    fn arithmetic_mean_u64_sum_overflow() {
915        let mut mean = ArithmeticMean::<u64> { sum: u64::MAX, n: 1 };
916        let result = mean.fold(1);
917        assert_matches!(result, Err(FoldError::Overflow));
918    }
919
920    #[test]
921    fn arithmetic_mean_u64_count_overflow() {
922        let mut mean = ArithmeticMean::<u64> { sum: 1, n: u64::MAX };
923        let result = mean.fold(1);
924        assert_matches!(result, Err(FoldError::Overflow));
925    }
926
927    #[test]
928    fn sum_aggregation() {
929        let mut sum = Sum::<u64>::default();
930        sum.fold(1).unwrap();
931        sum.fold(1).unwrap();
932        sum.fold(1).unwrap();
933        let aggregation = sum.aggregation().unwrap();
934        assert_eq!(aggregation, 3);
935
936        let mut sum = Sum::<u64>::default();
937        sum.fold(0).unwrap();
938        sum.fold(1).unwrap();
939        sum.fold(2).unwrap();
940        let aggregation = sum.aggregation().unwrap();
941        assert_eq!(aggregation, 3);
942    }
943
944    #[test]
945    fn sum_aggregation_fill() {
946        let mut sum = Sum::<u64>::default();
947        sum.fill(10, NonZeroUsize::new(1000).unwrap()).unwrap();
948        let aggregation = sum.aggregation().unwrap();
949        assert_eq!(aggregation, 10_000);
950    }
951
952    #[test]
953    fn sum_overflow() {
954        let mut sum = Sum::<u64> { sum: u64::MAX };
955        let result = sum.fold(1);
956        assert_matches!(result, Err(FoldError::Overflow));
957    }
958
959    #[test]
960    fn sum_i64_aggregation() {
961        let mut sum = Sum::<i64>::default();
962        sum.fold(1).unwrap();
963        sum.fold(1).unwrap();
964        sum.fold(1).unwrap();
965        let aggregation = sum.aggregation().unwrap();
966        assert_eq!(aggregation, 3);
967
968        let mut sum = Sum::<i64>::default();
969        sum.fold(0).unwrap();
970        sum.fold(1).unwrap();
971        sum.fold(2).unwrap();
972        let aggregation = sum.aggregation().unwrap();
973        assert_eq!(aggregation, 3);
974    }
975
976    #[test]
977    fn sum_i64_aggregation_fill() {
978        let mut sum = Sum::<i64>::default();
979        sum.fill(10, NonZeroUsize::new(1000).unwrap()).unwrap();
980        let aggregation = sum.aggregation().unwrap();
981        assert_eq!(aggregation, 10_000);
982    }
983
984    #[test]
985    fn sum_i64_overflow() {
986        let mut sum = Sum::<i64> { sum: i64::MAX };
987        let result = sum.fold(1);
988        assert_matches!(result, Err(FoldError::Overflow));
989    }
990
// The smallest of the folded samples (here the first one) is the aggregation of `Min`.
#[test]
fn min_aggregation() {
    let mut smallest = Min::<u64>::default();
    for sample in [10, 1337, 42] {
        smallest.fold(sample).unwrap();
    }
    assert_eq!(smallest.aggregation().unwrap(), 10);
}
1000
// Filling `Min` with a single repeated sample must aggregate to that sample.
#[test]
fn min_aggregation_fill() {
    let count = NonZeroUsize::new(1000).unwrap();
    let mut smallest = Min::<u64>::default();
    smallest.fill(42, count).unwrap();
    assert_eq!(smallest.aggregation().unwrap(), 42);
}
1008
// The largest of the folded samples (here the middle one) is the aggregation of `Max`.
#[test]
fn max_aggregation() {
    let mut largest = Max::<u64>::default();
    for sample in [0, 1337, 42] {
        largest.fold(sample).unwrap();
    }
    assert_eq!(largest.aggregation().unwrap(), 1337);
}
1018
// Filling `Max` with a single repeated sample must aggregate to that sample.
#[test]
fn max_aggregation_fill() {
    let count = NonZeroUsize::new(1000).unwrap();
    let mut largest = Max::<u64>::default();
    largest.fill(42, count).unwrap();
    assert_eq!(largest.aggregation().unwrap(), 42);
}
1026
// The most recently folded sample is the aggregation of `Last`.
#[test]
fn last_aggregation() {
    let mut latest = Last::<u64>::default();
    for sample in [0, 1337, 42] {
        latest.fold(sample).unwrap();
    }
    assert_eq!(latest.aggregation().unwrap(), 42);
}
1036
// Filling `Last` with a single repeated sample must aggregate to that sample.
#[test]
fn last_aggregation_fill() {
    let count = NonZeroUsize::new(1000).unwrap();
    let mut latest = Last::<u64>::default();
    latest.fill(42, count).unwrap();
    assert_eq!(latest.aggregation().unwrap(), 42);
}
1044
// Folding single-bit samples for bits 1, 3 and 5 must aggregate to all three bits set.
#[test]
fn union_aggregation() {
    let mut flags = Union::<u64>::default();
    for mask in [1 << 1, 1 << 3, 1 << 5] {
        flags.fold(mask).unwrap();
    }
    assert_eq!(flags.aggregation().unwrap(), 0b101010);
}
1054
// Filling `Union` with a single repeated bit must aggregate to exactly that bit.
#[test]
fn union_aggregation_fill() {
    let count = NonZeroUsize::new(1000).unwrap();
    let mut flags = Union::<u64>::default();
    flags.fill(1 << 2, count).unwrap();
    assert_eq!(flags.aggregation().unwrap(), 0b100);
}
1062
// Exercises `LatchMax` with constant, increasing, and non-monotonic sample sequences.
#[test]
fn latch_max_aggregation() {
    // Constant samples.
    let mut constant = LatchMax::<u64>::default();
    for sample in [1, 1, 1] {
        constant.fold(sample).unwrap();
    }
    assert_eq!(constant.aggregation().unwrap(), 1);

    // Strictly increasing samples.
    let mut increasing = LatchMax::<u64>::default();
    for sample in [0, 1, 2] {
        increasing.fold(sample).unwrap();
    }
    assert_eq!(increasing.aggregation().unwrap(), 2);

    // Samples that decrease twice:
    //   - at the drop from 6 to 2, the non-monotonic sum is six;
    //   - at the drop from 9 to 3, the non-monotonic sum is 15 (6 + 9).
    // The final sample of 5 brings the expected aggregation to 20.
    let mut sawtooth = LatchMax::<u64>::default();
    for sample in [1, 5, 6, 2, 9, 3, 5] {
        sawtooth.fold(sample).unwrap();
    }
    assert_eq!(sawtooth.aggregation().unwrap(), 20);
}
1090
// Filling `LatchMax` with a single repeated sample must aggregate to that sample.
#[test]
fn latch_max_aggregation_fill() {
    let count = NonZeroUsize::new(1000).unwrap();
    let mut latch = LatchMax::<u64>::default();
    latch.fill(10, count).unwrap();
    assert_eq!(latch.aggregation().unwrap(), 10);
}
1098
// `LatchMax` must report `FoldError::Overflow` when its sum cannot be added to a sample.
#[test]
fn latch_max_overflow_max() {
    let mut latch = LatchMax::<u64>::default();
    // Dropping from one to zero leaves a non-monotonic sum of one.
    for sample in [1, 0] {
        latch.fold(sample).unwrap();
    }
    // That non-monotonic sum of one is added to `u64::MAX`, which overflows.
    assert_matches!(latch.fold(u64::MAX), Err(FoldError::Overflow));
}
1107
// The transform given to `PostAggregation` is applied to the sum: 0 + 1 + 2 = 3, plus one is 4.
#[test]
fn post_sum_aggregation() {
    let mut incremented = PostAggregation::<Sum<u64>, _>::from_transform(|total| total + 1);
    for sample in [0, 1, 2] {
        incremented.fold(sample).unwrap();
    }
    assert_eq!(incremented.aggregation().unwrap(), 4);
}
1117
// `Reset::with` takes an initial statistic and a function that supplies the statistic used
// after `reset`.
#[test]
fn reset_with_function() {
    let mut resettable = Reset::with(Sum::<u64>::default(), || Sum::with_sum(3));

    // Before any reset: the default sum is zero and folding one brings it to one.
    assert_eq!(resettable.aggregation().unwrap(), 0);
    resettable.fold(1).unwrap();
    assert_eq!(resettable.aggregation().unwrap(), 1);

    // After the reset: the sum restarts at three (from `with_sum(3)`) rather than at zero.
    resettable.reset();
    assert_eq!(resettable.aggregation().unwrap(), 3);
    resettable.fold(1).unwrap();
    assert_eq!(resettable.aggregation().unwrap(), 4);
}
1131
// Walks `Diff` through two phases separated by a `reset`, checking the aggregation after
// every folded sample.
#[test]
fn diff_aggregation() {
    let mut diff = Diff::<u64>::default();

    // No samples yet: there is no aggregation.
    assert_eq!(diff.aggregation(), None);
    for (sample, expected) in [(1, Some(0)), (3, Some(2)), (5, Some(4))] {
        diff.fold(sample).unwrap();
        assert_eq!(diff.aggregation(), expected);
    }

    // A reset clears the aggregation until the next sample is folded.
    diff.reset();
    assert_eq!(diff.aggregation(), None);
    // NOTE(review): the expected `Some(2)` for 7 matches 7 - 5, with 5 being the last sample
    // before the reset, and the final smaller sample (4) yields `None` — confirm against `Diff`.
    for (sample, expected) in [(7, Some(2)), (7, Some(2)), (4, None)] {
        diff.fold(sample).unwrap();
        assert_eq!(diff.aggregation(), expected);
    }
}
1151}