windowed_stats/experimental/series/
interval.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Sampling rate and aggregation intervals.
6
7use itertools::Itertools;
8use num::Integer;
9use std::cmp;
10use std::fmt::{self, Debug, Display, Formatter};
11use std::marker::PhantomData;
12use std::num::NonZeroUsize;
13
14use crate::experimental::Vec1;
15use crate::experimental::clock::{Duration, DurationExt as _, Quanta, QuantaExt as _, Tick};
16use crate::experimental::series::buffer::Capacity;
17use crate::experimental::series::interpolation::Interpolation;
18use crate::experimental::series::statistic::{FoldError, Statistic, StatisticExt as _};
19
20/// An interval that has elapsed during a [`Tick`].
21///
22/// [`Tick`]: crate::experimental::clock::Tick;
23#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
24pub enum ElapsedInterval {
25    Vacant {
26        // When no samples have been observed, contiguous elapsed intervals can
27        // be represented by a count (rather than one or more instances of `ElapsedInterval`).
28        interval_count: NonZeroUsize,
29        fill_sample_count: NonZeroUsize,
30    },
31    Occupied {
32        fill_sample_count: Option<NonZeroUsize>,
33    },
34}
35
36impl ElapsedInterval {
37    /// Fills the given [`Statistic`] with interpolated samples using the given
38    /// [interpolation][`Interpolation`] and then computes the aggregation for the interval.
39    ///
40    /// [`Interpolation`]: crate::experimental::series::interpolation::Interpolation
41    /// [`Statistic`]: crate::experimental::series::statistic::Statistic
42    fn interpolate_and_get_aggregation<F, P>(
43        self,
44        statistic: &mut F,
45        interpolation: &mut P,
46    ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
47    where
48        F: Statistic,
49        P: Interpolation<F::Sample>,
50    {
51        let (interval_count, fill_sample_count) = match self {
52            ElapsedInterval::Vacant { interval_count, fill_sample_count } => {
53                (interval_count, Some(fill_sample_count))
54            }
55            ElapsedInterval::Occupied { fill_sample_count } => {
56                (NonZeroUsize::MIN, fill_sample_count)
57            }
58        };
59
60        if let Some(fill_sample_count) = fill_sample_count {
61            interpolation.interpolate(statistic, fill_sample_count)?;
62        }
63        Ok(statistic.get_aggregation_and_reset().map(|aggregation| (interval_count, aggregation)))
64    }
65}
66
67/// An interval that has been reached but **not** elapsed by a [`Tick`].
68///
69/// [`Tick`]: crate::experimental::clock::Tick;
70#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
71pub struct PendingInterval<T> {
72    /// The number of fill (interpolated) samples required prior to folding the observed sample.
73    fill_sample_count: Option<NonZeroUsize>,
74    /// The sample observed during the tick.
75    sample: T,
76}
77
78impl<T> PendingInterval<T> {
79    /// Folds the observed sample into the given [`Statistic`] using the given
80    /// [interpolation][`Interpolation`].
81    ///
82    /// [`Interpolation`]: crate::experimental::series::interpolation::Interpolation
83    /// [`Statistic`]: crate::experimental::series::statistic::Statistic
84    fn fold<F, P>(self, statistic: &mut F, interpolation: &mut P) -> Result<(), FoldError>
85    where
86        T: Clone,
87        F: Statistic<Sample = T>,
88        P: Interpolation<T>,
89    {
90        let PendingInterval { fill_sample_count, sample } = self;
91
92        if let Some(fill_sample_count) = fill_sample_count {
93            interpolation.interpolate(statistic, fill_sample_count)?;
94        }
95        statistic.fold(sample.clone())?;
96        interpolation.observe(sample);
97        Ok(())
98    }
99}
100
101// A pending interval has no observed sample (`PhantomData<T>` instead of `T`) when an
102// interpolation (rather than a fold) is requested.
103impl<T> PendingInterval<PhantomData<T>> {
104    /// Fills the given [`Statistic`] with interpolated samples using the given
105    /// [interpolation][`Interpolation`].
106    ///
107    /// [`Interpolation`]: crate::experimental::series::interpolation::Interpolation
108    /// [`Statistic`]: crate::experimental::series::statistic::Statistic
109    fn interpolate<F, P>(self, statistic: &mut F, interpolation: &mut P) -> Result<(), FoldError>
110    where
111        T: Clone,
112        F: Statistic<Sample = T>,
113        P: Interpolation<T>,
114    {
115        let PendingInterval { fill_sample_count, .. } = self;
116
117        if let Some(fill_sample_count) = fill_sample_count {
118            interpolation.interpolate(statistic, fill_sample_count)
119        } else {
120            Ok(())
121        }
122    }
123}
124
125/// The expiration of [`SamplingInterval`]s intersected by a [`Tick`].
126///
127/// [`SamplingInterval`]: crate::experimental::series::SamplingInterval
128/// [`Tick`]: crate::experimental::clock::Tick;
129#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
130pub enum IntervalExpiration<T> {
131    Elapsed(ElapsedInterval),
132    Pending(PendingInterval<T>),
133}
134
135impl<T> IntervalExpiration<T> {
136    pub(crate) fn fold_and_get_aggregation<F, P>(
137        self,
138        statistic: &mut F,
139        interpolation: &mut P,
140    ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
141    where
142        T: Clone,
143        F: Statistic<Sample = T>,
144        P: Interpolation<T>,
145    {
146        match self {
147            IntervalExpiration::Elapsed(elapsed) => {
148                elapsed.interpolate_and_get_aggregation(statistic, interpolation)
149            }
150            IntervalExpiration::Pending(pending) => {
151                pending.fold(statistic, interpolation).map(|_| None)
152            }
153        }
154    }
155}
156
157impl<T> IntervalExpiration<PhantomData<T>> {
158    pub(crate) fn interpolate_and_get_aggregation<F, P>(
159        self,
160        statistic: &mut F,
161        interpolation: &mut P,
162    ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
163    where
164        T: Clone,
165        F: Statistic<Sample = T>,
166        P: Interpolation<T>,
167    {
168        match self {
169            IntervalExpiration::Elapsed(elapsed) => {
170                elapsed.interpolate_and_get_aggregation(statistic, interpolation)
171            }
172            IntervalExpiration::Pending(pending) => {
173                pending.interpolate(statistic, interpolation).map(|_| None)
174            }
175        }
176    }
177}
178
179/// A time interval in which samples are folded into an aggregation.
180///
181/// Sampling intervals determine the timing of aggregations and interpolation in time series and
182/// are defined by the following quantities:
183///
184///   1. **Maximum sampling period.** This is the basic unit of time that defines the sampling
185///      interval and represents the maximum duration in which a sample must be observed. For any
186///      such duration in which no sample is observed, an interpolated sample is used instead. This
187///      can also be thought of as its inverse: the minimum sampling frequency.
188///   2. **Sampling period count.** This is the number of sampling periods that form the sampling
189///      interval. This determines the minimum number of samples (interpolated or otherwise) folded
190///      into the aggregation for the sampling interval.
191///   3. **Capacity.** This is the number of sampling intervals (and therefore aggregations) that
192///      must be stored to represent an aggregated series. This quantity is somewhat extrinsic to
193///      the time interval itself, but determines its durability.
194///
195/// These quantities are concatenated into a shorthand to describe sampling intervals, formatted
196/// as `capacity x sampling_period_count x maximum_sampling_period`. For example, a 10x2x5s
197/// sampling interval persists 10 intervals formed from two maximum sampling period of 5s. In such
198/// an interval, there is at least one sample every 5s, an aggregation every 10s, and at most 10
199/// aggregations that represent a 100s period.
200#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
201pub struct SamplingInterval {
202    capacity: Capacity,
203    sampling_period_count: u32,
204    max_sampling_period: Quanta,
205}
206
207impl SamplingInterval {
208    pub(crate) fn new(
209        capacity: Capacity,
210        sampling_period_count: u32,
211        max_sampling_period: impl Into<Duration>,
212    ) -> Self {
213        SamplingInterval {
214            capacity,
215            sampling_period_count: cmp::max(1, sampling_period_count),
216            max_sampling_period: cmp::max(1, max_sampling_period.into().into_quanta().abs()),
217        }
218    }
219
220    /// Gets the duration of the interval (also known as the aggregation period).
221    pub fn duration(&self) -> Duration {
222        self.max_sampling_period() * self.sampling_period_count
223    }
224
225    /// Gets the maximum sampling period of the interval.
226    pub fn max_sampling_period(&self) -> Duration {
227        Duration::from_quanta(self.max_sampling_period)
228    }
229
230    pub(crate) fn capacity(&self) -> Capacity {
231        self.capacity
232    }
233
234    /// Gets the [expirations][`IntervalExpiration`] of intervals intersected by the given
235    /// [`Tick`].
236    ///
237    /// The given sample is always folded into exactly one pending interval that terminates the
238    /// sequence.
239    ///
240    /// [`IntervalExpiration`]: crate::experimental::series::interval::IntervalExpiration
241    /// [`Tick`]: crate::experimental::clock::Tick;
242    pub(crate) fn fold_and_get_expirations<T>(
243        &self,
244        tick: Tick,
245        sample: T,
246    ) -> impl Clone + Iterator<Item = IntervalExpiration<T>>
247    where
248        T: Clone,
249    {
250        let interval = self.max_sampling_period * Quanta::from(self.sampling_period_count);
251        let (start, end) = tick.quantize();
252        let start_has_sample = tick.start_has_sample(self.max_sampling_period);
253
254        // The intervals intersected by this `Tick` are constructed from three groups:
255        //
256        //   - Zero or one _resumed_ intervals. Such an interval was previously the _pending_
257        //     interval (see below), but has been elapsed by this `Tick`.
258        //   - Zero or more _skipped_ intervals. Such intervals were never previously intersected,
259        //     but are elapsed by this `Tick`.
260        //   - Exactly one _pending_ interval. This is the interval intersected by the end
261        //     timestamp of this `Tick`. There is always such an interval and this interval may
262        //     have been pending previously.
263        //
264        // Note that all quantities here are technically periods or durations: start and end
265        // timestamps are durations from zero, for example. Divisions yield unitless quantities
266        // (counts).
267        //
268        // About the below calculation: Buckets are always aligned, so we can simply divide
269        // timestamps by an `interval` or `max_sampling_period` to find out whether they fall into
270        // the same interval or max_sampling_period.
271        //
272        // For example, if the interval is 60s, timestamps at 1s and 30s marks would both into
273        // the [0, 60) interval. OTOH, timestamp at 61s mark would fall into the [60, 120) interval.
274        // We see that `1 / 60 == 30 / 60` (integer division), whereas `1 / 60 != 61 / 60`.
275        let resumed_interval_has_elapsed = (end / interval) > (start / interval);
276        let num_skipped_intervals =
277            usize::try_from((end / interval) - (start / interval) - 1).unwrap_or(0);
278
279        let pending_interval_fill_sample_count = if resumed_interval_has_elapsed {
280            // If the pending interval is a new one, simply calculate how many sampling periods
281            // it covers.
282            (end % interval) / self.max_sampling_period
283        } else {
284            // If the pending interval is an existing one, check how many sampling periods exist
285            // between start and end.
286            let num_elapsed_sampling_periods =
287                (end / self.max_sampling_period) - (start / self.max_sampling_period);
288            // Adjust the result based on whether the sampling period for the start timestamp
289            // already had a sample.
290            // This calculation can yield negative number if `start` and `end` are in the same
291            // sampling period, but we'll change it to 0 when cast to usize later on.
292            num_elapsed_sampling_periods - if start_has_sample { 1 } else { 0 }
293        };
294        itertools::chain!(
295            resumed_interval_has_elapsed.then(|| {
296                // Calculate how many sampling periods the remaining duration of the resumed
297                // interval covers. Adjust the result based on whether the sampling period for
298                // the start timestamp already had a sample.
299                let resumed_interval_remaining = interval - (start % interval);
300                let resumed_interval_fill_sample_count =
301                    Integer::div_ceil(&resumed_interval_remaining, &self.max_sampling_period)
302                        - if start_has_sample { 1 } else { 0 };
303                IntervalExpiration::Elapsed(ElapsedInterval::Occupied {
304                    fill_sample_count: NonZeroUsize::new(
305                        usize::try_from(resumed_interval_fill_sample_count).unwrap_or(0),
306                    ),
307                })
308            }),
309            (num_skipped_intervals > 0).then(|| IntervalExpiration::Elapsed(
310                ElapsedInterval::Vacant {
311                    // Safe to unwrap because we checked that `num_skipped_intervals > 0`
312                    interval_count: NonZeroUsize::new(num_skipped_intervals).unwrap(),
313                    fill_sample_count: NonZeroUsize::new(
314                        usize::try_from(self.sampling_period_count).unwrap_or(usize::MAX)
315                    )
316                    .unwrap_or(NonZeroUsize::MIN),
317                }
318            )),
319            // The pending interval falls on the end timestamp and so includes the associated
320            // sample.
321            Some(IntervalExpiration::Pending(PendingInterval {
322                fill_sample_count: NonZeroUsize::new(
323                    usize::try_from(pending_interval_fill_sample_count).unwrap_or(0)
324                ),
325                sample,
326            })),
327        )
328    }
329}
330
331impl Display for SamplingInterval {
332    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
333        write!(
334            formatter,
335            "{}x{}x{}",
336            self.capacity,
337            self.sampling_period_count,
338            self.max_sampling_period.into_nearest_unit_display(),
339        )
340    }
341}
342
343/// One or more cooperative [`SamplingInterval`]s.
344#[derive(Clone, Debug)]
345pub struct SamplingProfile(Vec1<SamplingInterval>);
346
347impl SamplingProfile {
348    fn from_sampling_intervals<I>(intervals: I) -> Self
349    where
350        Vec1<SamplingInterval>: From<I>,
351    {
352        SamplingProfile(intervals.into())
353    }
354
355    /// Constructs a highly granular sampling profile with high fidelity.
356    ///
357    /// The minimum granularity is 10s and the maximum durability is 20m.
358    pub fn highly_granular() -> Self {
359        SamplingProfile::from_sampling_intervals([
360            // 720x1x10s
361            SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_seconds(10)),
362            // 3600x1x1m
363            SamplingInterval::new(Capacity::from_min_samples(3600), 1, Duration::from_minutes(1)),
364        ])
365    }
366
367    /// Constructs a granular sampling profile with decently high fidelity.
368    pub fn granular() -> Self {
369        SamplingProfile::from_sampling_intervals([
370            // 720x1x10s
371            SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_seconds(10)),
372            // 600x1x1m
373            SamplingInterval::new(Capacity::from_min_samples(600), 1, Duration::from_minutes(1)),
374            // 360x1x5m
375            SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_minutes(5)),
376        ])
377    }
378
379    /// Constructs a sampling profile with fidelity and durability that is applicable to most
380    /// metrics.
381    pub fn balanced() -> Self {
382        SamplingProfile::from_sampling_intervals([
383            // 120x1x10s
384            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_seconds(10)),
385            // 120x1x1m
386            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(1)),
387            // 120x1x5m
388            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(5)),
389            // 120x1x30m
390            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(30)),
391        ])
392    }
393
394    /// Gets the minimum granularity of the profile.
395    pub fn granularity(&self) -> Duration {
396        self.0.iter().map(SamplingInterval::max_sampling_period).min().unwrap()
397    }
398
399    /// Gets the maximum duration of the profile.
400    pub fn duration(&self) -> Duration {
401        self.0.iter().map(SamplingInterval::duration).max().unwrap()
402    }
403
404    pub(crate) fn into_sampling_intervals(self) -> Vec1<SamplingInterval> {
405        self.0
406    }
407}
408
409impl Default for SamplingProfile {
410    fn default() -> Self {
411        SamplingProfile::balanced()
412    }
413}
414
415impl Display for SamplingProfile {
416    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
417        use itertools::Position::{First, Last, Middle, Only};
418
419        // Avoid `join` and other sources of intermediate allocations.
420        write!(
421            formatter,
422            "{}..{}: ",
423            self.granularity().into_quanta().into_nearest_unit_display(),
424            self.duration().into_quanta().into_nearest_unit_display(),
425        )?;
426        for (pos, interval) in self.0.iter().with_position() {
427            match pos {
428                First | Middle => write!(formatter, "{} + ", interval),
429                Only | Last => write!(formatter, "{}", interval),
430            }?;
431        }
432        Ok(())
433    }
434}
435
436impl From<SamplingInterval> for SamplingProfile {
437    fn from(interval: SamplingInterval) -> Self {
438        SamplingProfile(Vec1::from_item(interval))
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445
446    use std::sync::mpsc::{self, Receiver, Sender};
447
448    use crate::experimental::clock::{ObservationTime, Timestamp};
449    use crate::experimental::series::Counter;
450    use crate::experimental::series::buffer::Capacity;
451
452    const SAMPLE: u64 = 1337u64;
453
454    #[test]
455    fn sampling_interval_fold_and_get_expirations() {
456        let sampling_interval =
457            SamplingInterval::new(Capacity::from_min_samples(120), 6, Duration::from_seconds(10));
458        let mut last = ObservationTime {
459            last_update_timestamp: Timestamp::from_nanos(71_000_000_000),
460            last_sample_timestamp: None,
461        };
462
463        // Tick in the same sampling period that did not have a sample.
464        let tick = last.tick(Timestamp::from_nanos(75_000_000_000), true).unwrap();
465        let expirations: Vec<_> =
466            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
467        assert_eq!(
468            expirations,
469            vec![IntervalExpiration::Pending(PendingInterval {
470                fill_sample_count: None,
471                sample: SAMPLE,
472            })]
473        );
474
475        // Tick in the same sampling period that already had a sample.
476        // (last sample at 75s)
477        let tick = last.tick(Timestamp::from_nanos(79_000_000_000), false).unwrap();
478        let expirations: Vec<_> =
479            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
480        assert_eq!(
481            expirations,
482            vec![IntervalExpiration::Pending(PendingInterval {
483                fill_sample_count: None,
484                sample: SAMPLE,
485            })]
486        );
487
488        // Tick to a new sampling period, but in the same interval. The sampling period
489        // at the start of the tick already had a sample.
490        let tick = last.tick(Timestamp::from_nanos(83_000_000_000), false).unwrap();
491        let expirations: Vec<_> =
492            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
493        assert_eq!(
494            expirations,
495            vec![IntervalExpiration::Pending(PendingInterval {
496                fill_sample_count: None,
497                sample: SAMPLE,
498            })]
499        );
500
501        // Tick to a new sampling period, but in the same interval. The sampling period
502        // at the start of the tick did not have a sample.
503        let tick = last.tick(Timestamp::from_nanos(91_000_000_000), true).unwrap();
504        let expirations: Vec<_> =
505            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
506        assert_eq!(
507            expirations,
508            vec![IntervalExpiration::Pending(PendingInterval {
509                fill_sample_count: NonZeroUsize::new(1),
510                sample: SAMPLE,
511            })]
512        );
513
514        // Tick to a new interval. The sampling period at the start of the tick already
515        // had a sample.
516        let tick = last.tick(Timestamp::from_nanos(133_000_000_000), false).unwrap();
517        let expirations: Vec<_> =
518            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
519        let expected = vec![
520            IntervalExpiration::Elapsed(ElapsedInterval::Occupied {
521                fill_sample_count: NonZeroUsize::new(2),
522            }),
523            IntervalExpiration::Pending(PendingInterval {
524                fill_sample_count: NonZeroUsize::new(1),
525                sample: SAMPLE,
526            }),
527        ];
528        assert_eq!(expirations, expected);
529
530        // Tick to a new interval. The sampling period at the start of the tick did not
531        // have a sample. Additionally, there are some skipped intervals in-between
532        let tick = last.tick(Timestamp::from_nanos(240_000_000_000), false).unwrap();
533        let expirations: Vec<_> =
534            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect();
535        let expected = vec![
536            IntervalExpiration::Elapsed(ElapsedInterval::Occupied {
537                fill_sample_count: NonZeroUsize::new(5),
538            }),
539            IntervalExpiration::Elapsed(ElapsedInterval::Vacant {
540                interval_count: NonZeroUsize::new(1).unwrap(),
541                fill_sample_count: NonZeroUsize::new(6).unwrap(),
542            }),
543            IntervalExpiration::Pending(PendingInterval {
544                fill_sample_count: None,
545                sample: SAMPLE,
546            }),
547        ];
548        assert_eq!(expirations, expected);
549    }
550
551    #[test]
552    fn sampling_interval_fold_and_get_expirations_at_time_boundary() {
553        let sampling_interval =
554            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_seconds(10));
555        let mut last = ObservationTime {
556            last_update_timestamp: Timestamp::from_nanos(0),
557            last_sample_timestamp: None,
558        };
559
560        // Tick in the new time interval.
561        let tick = last.tick(Timestamp::from_nanos(10_000_000_000), false).unwrap();
562        let expirations: Vec<_> =
563            sampling_interval.fold_and_get_expirations(tick, PhantomData::<u64>).collect();
564        let expected = vec![
565            IntervalExpiration::Elapsed(ElapsedInterval::Occupied {
566                fill_sample_count: NonZeroUsize::new(1),
567            }),
568            IntervalExpiration::Pending(PendingInterval {
569                fill_sample_count: None,
570                sample: PhantomData::<u64>,
571            }),
572        ];
573        assert_eq!(expirations, expected);
574    }
575
576    #[derive(Clone, Debug, PartialEq)]
577    enum MockStatisticCall {
578        // Though `n` is represented as `NonZeroUsize`, it is represented as `usize` here for more
579        // fluent expressions in assertions.
580        Fill { sample: u64, n: usize },
581        Fold { sample: u64 },
582        Reset,
583        Aggregation,
584    }
585
586    #[derive(Clone, Debug)]
587    struct MockStatistic(Sender<MockStatisticCall>);
588
589    impl MockStatistic {
590        pub fn channel() -> (Self, Receiver<MockStatisticCall>) {
591            let (tx, rx) = mpsc::channel();
592            (Self(tx), rx)
593        }
594    }
595
596    impl Statistic for MockStatistic {
597        type Semantic = Counter;
598        type Sample = u64;
599        type Aggregation = u64;
600
601        fn fold(&mut self, sample: u64) -> Result<(), FoldError> {
602            self.0.send(MockStatisticCall::Fold { sample }).unwrap();
603            Ok(())
604        }
605
606        fn fill(&mut self, sample: u64, n: NonZeroUsize) -> Result<(), FoldError> {
607            // Tests assert against `n` as a `usize` instead of `NonZeroUsize`.
608            self.0.send(MockStatisticCall::Fill { sample, n: n.get() }).unwrap();
609            Ok(())
610        }
611
612        fn reset(&mut self) {
613            self.0.send(MockStatisticCall::Reset).unwrap();
614        }
615
616        fn aggregation(&self) -> Option<Self::Aggregation> {
617            self.0.send(MockStatisticCall::Aggregation).unwrap();
618            Some(100)
619        }
620    }
621
622    #[derive(Clone, Debug, Eq, PartialEq)]
623    enum MockInterpolationCall {
624        // Though `n` is represented as `NonZeroUsize`, it is represented as `usize` here for more
625        // fluent expressions in assertions.
626        Interpolate { n: usize },
627        Observe { sample: u64 },
628    }
629
630    #[derive(Clone, Debug)]
631    struct MockInterpolation(Sender<MockInterpolationCall>);
632
633    impl MockInterpolation {
634        pub fn channel() -> (Self, Receiver<MockInterpolationCall>) {
635            let (tx, rx) = mpsc::channel();
636            (Self(tx), rx)
637        }
638    }
639
640    impl Interpolation<u64> for MockInterpolation {
641        fn interpolate<F>(&self, statistic: &mut F, n: NonZeroUsize) -> Result<(), FoldError>
642        where
643            F: Statistic<Sample = u64>,
644        {
645            // Tests assert against `n` as a `usize` instead of `NonZeroUsize`.
646            self.0.send(MockInterpolationCall::Interpolate { n: n.get() }).unwrap();
647            statistic.fill(SAMPLE, n)
648        }
649
650        fn observe(&mut self, sample: u64) {
651            self.0.send(MockInterpolationCall::Observe { sample }).unwrap();
652        }
653    }
654
655    #[test]
656    fn elapsed_interval_interpolate_and_get_aggregation() {
657        let interval = ElapsedInterval::Occupied { fill_sample_count: NonZeroUsize::new(6) };
658        let mut statistic = MockStatistic::channel();
659        let mut interpolation = MockInterpolation::channel();
660        let result =
661            interval.interpolate_and_get_aggregation(&mut statistic.0, &mut interpolation.0);
662        assert!(result.is_ok());
663        assert_eq!(
664            statistic.1.try_iter().collect::<Vec<_>>(),
665            &[
666                MockStatisticCall::Fill { sample: SAMPLE, n: 6 },
667                MockStatisticCall::Aggregation,
668                MockStatisticCall::Reset,
669            ],
670        );
671        assert_eq!(
672            interpolation.1.try_iter().collect::<Vec<_>>(),
673            &[MockInterpolationCall::Interpolate { n: 6 }],
674        );
675    }
676
677    #[test]
678    fn pending_interval_fold() {
679        let interval = PendingInterval { fill_sample_count: NonZeroUsize::new(6), sample: 50u64 };
680        let mut statistic = MockStatistic::channel();
681        let mut interpolation = MockInterpolation::channel();
682        let result = interval.fold(&mut statistic.0, &mut interpolation.0);
683        assert!(result.is_ok());
684        assert_eq!(
685            statistic.1.try_iter().collect::<Vec<_>>(),
686            &[
687                MockStatisticCall::Fill { sample: SAMPLE, n: 6 },
688                MockStatisticCall::Fold { sample: 50 },
689            ],
690        );
691        assert_eq!(
692            interpolation.1.try_iter().collect::<Vec<_>>(),
693            &[
694                MockInterpolationCall::Interpolate { n: 6 },
695                MockInterpolationCall::Observe { sample: 50 },
696            ],
697        );
698    }
699}