1use itertools::Itertools;
8use num::Integer;
9use std::cmp;
10use std::fmt::{self, Debug, Display, Formatter};
11use std::marker::PhantomData;
12use std::num::NonZeroUsize;
13
14use crate::experimental::Vec1;
15use crate::experimental::clock::{Duration, DurationExt as _, Quanta, QuantaExt as _, Tick};
16use crate::experimental::series::buffer::Capacity;
17use crate::experimental::series::interpolation::Interpolation;
18use crate::experimental::series::statistic::{FoldError, Statistic, StatisticExt as _};
19
20#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
24pub enum ElapsedInterval {
25 Vacant {
26 interval_count: NonZeroUsize,
29 fill_sample_count: NonZeroUsize,
30 },
31 Occupied {
32 fill_sample_count: Option<NonZeroUsize>,
33 },
34}
35
36impl ElapsedInterval {
37 fn interpolate_and_get_aggregation<F, P>(
43 self,
44 statistic: &mut F,
45 interpolation: &mut P,
46 ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
47 where
48 F: Statistic,
49 P: Interpolation<F::Sample>,
50 {
51 let (interval_count, fill_sample_count) = match self {
52 ElapsedInterval::Vacant { interval_count, fill_sample_count } => {
53 (interval_count, Some(fill_sample_count))
54 }
55 ElapsedInterval::Occupied { fill_sample_count } => {
56 (NonZeroUsize::MIN, fill_sample_count)
57 }
58 };
59
60 if let Some(fill_sample_count) = fill_sample_count {
61 interpolation.interpolate(statistic, fill_sample_count)?;
62 }
63 Ok(statistic.get_aggregation_and_reset().map(|aggregation| (interval_count, aggregation)))
64 }
65}
66
67#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
71pub struct PendingInterval<T> {
72 fill_sample_count: Option<NonZeroUsize>,
74 sample: T,
76}
77
78impl<T> PendingInterval<T> {
79 fn fold<F, P>(self, statistic: &mut F, interpolation: &mut P) -> Result<(), FoldError>
85 where
86 T: Clone,
87 F: Statistic<Sample = T>,
88 P: Interpolation<T>,
89 {
90 let PendingInterval { fill_sample_count, sample } = self;
91
92 if let Some(fill_sample_count) = fill_sample_count {
93 interpolation.interpolate(statistic, fill_sample_count)?;
94 }
95 statistic.fold(sample.clone())?;
96 interpolation.observe(sample);
97 Ok(())
98 }
99}
100
101impl<T> PendingInterval<PhantomData<T>> {
104 fn interpolate<F, P>(self, statistic: &mut F, interpolation: &mut P) -> Result<(), FoldError>
110 where
111 T: Clone,
112 F: Statistic<Sample = T>,
113 P: Interpolation<T>,
114 {
115 let PendingInterval { fill_sample_count, .. } = self;
116
117 if let Some(fill_sample_count) = fill_sample_count {
118 interpolation.interpolate(statistic, fill_sample_count)
119 } else {
120 Ok(())
121 }
122 }
123}
124
125#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
130pub enum IntervalExpiration<T> {
131 Elapsed(ElapsedInterval),
132 Pending(PendingInterval<T>),
133}
134
135impl<T> IntervalExpiration<T> {
136 pub(crate) fn fold_and_get_aggregation<F, P>(
137 self,
138 statistic: &mut F,
139 interpolation: &mut P,
140 ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
141 where
142 T: Clone,
143 F: Statistic<Sample = T>,
144 P: Interpolation<T>,
145 {
146 match self {
147 IntervalExpiration::Elapsed(elapsed) => {
148 elapsed.interpolate_and_get_aggregation(statistic, interpolation)
149 }
150 IntervalExpiration::Pending(pending) => {
151 pending.fold(statistic, interpolation).map(|_| None)
152 }
153 }
154 }
155}
156
157impl<T> IntervalExpiration<PhantomData<T>> {
158 pub(crate) fn interpolate_and_get_aggregation<F, P>(
159 self,
160 statistic: &mut F,
161 interpolation: &mut P,
162 ) -> Result<Option<(NonZeroUsize, F::Aggregation)>, FoldError>
163 where
164 T: Clone,
165 F: Statistic<Sample = T>,
166 P: Interpolation<T>,
167 {
168 match self {
169 IntervalExpiration::Elapsed(elapsed) => {
170 elapsed.interpolate_and_get_aggregation(statistic, interpolation)
171 }
172 IntervalExpiration::Pending(pending) => {
173 pending.interpolate(statistic, interpolation).map(|_| None)
174 }
175 }
176 }
177}
178
179#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
201pub struct SamplingInterval {
202 capacity: Capacity,
203 sampling_period_count: u32,
204 max_sampling_period: Quanta,
205}
206
207impl SamplingInterval {
208 pub(crate) fn new(
209 capacity: Capacity,
210 sampling_period_count: u32,
211 max_sampling_period: impl Into<Duration>,
212 ) -> Self {
213 SamplingInterval {
214 capacity,
215 sampling_period_count: cmp::max(1, sampling_period_count),
216 max_sampling_period: cmp::max(1, max_sampling_period.into().into_quanta().abs()),
217 }
218 }
219
220 pub fn duration(&self) -> Duration {
222 self.max_sampling_period() * self.sampling_period_count
223 }
224
225 pub fn max_sampling_period(&self) -> Duration {
227 Duration::from_quanta(self.max_sampling_period)
228 }
229
230 pub(crate) fn capacity(&self) -> Capacity {
231 self.capacity
232 }
233
234 pub(crate) fn fold_and_get_expirations<T>(
243 &self,
244 tick: Tick,
245 sample: T,
246 ) -> impl Clone + Iterator<Item = IntervalExpiration<T>>
247 where
248 T: Clone,
249 {
250 let interval = self.max_sampling_period * Quanta::from(self.sampling_period_count);
251 let (start, end) = tick.quantize();
252 let start_has_sample = tick.start_has_sample(self.max_sampling_period);
253
254 let resumed_interval_has_elapsed = (end / interval) > (start / interval);
276 let num_skipped_intervals =
277 usize::try_from((end / interval) - (start / interval) - 1).unwrap_or(0);
278
279 let pending_interval_fill_sample_count = if resumed_interval_has_elapsed {
280 (end % interval) / self.max_sampling_period
283 } else {
284 let num_elapsed_sampling_periods =
287 (end / self.max_sampling_period) - (start / self.max_sampling_period);
288 num_elapsed_sampling_periods - if start_has_sample { 1 } else { 0 }
293 };
294 itertools::chain!(
295 resumed_interval_has_elapsed.then(|| {
296 let resumed_interval_remaining = interval - (start % interval);
300 let resumed_interval_fill_sample_count =
301 Integer::div_ceil(&resumed_interval_remaining, &self.max_sampling_period)
302 - if start_has_sample { 1 } else { 0 };
303 IntervalExpiration::Elapsed(ElapsedInterval::Occupied {
304 fill_sample_count: NonZeroUsize::new(
305 usize::try_from(resumed_interval_fill_sample_count).unwrap_or(0),
306 ),
307 })
308 }),
309 (num_skipped_intervals > 0).then(|| IntervalExpiration::Elapsed(
310 ElapsedInterval::Vacant {
311 interval_count: NonZeroUsize::new(num_skipped_intervals).unwrap(),
313 fill_sample_count: NonZeroUsize::new(
314 usize::try_from(self.sampling_period_count).unwrap_or(usize::MAX)
315 )
316 .unwrap_or(NonZeroUsize::MIN),
317 }
318 )),
319 Some(IntervalExpiration::Pending(PendingInterval {
322 fill_sample_count: NonZeroUsize::new(
323 usize::try_from(pending_interval_fill_sample_count).unwrap_or(0)
324 ),
325 sample,
326 })),
327 )
328 }
329}
330
331impl Display for SamplingInterval {
332 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
333 write!(
334 formatter,
335 "{}x{}x{}",
336 self.capacity,
337 self.sampling_period_count,
338 self.max_sampling_period.into_nearest_unit_display(),
339 )
340 }
341}
342
343#[derive(Clone, Debug)]
345pub struct SamplingProfile(Vec1<SamplingInterval>);
346
347impl SamplingProfile {
348 fn from_sampling_intervals<I>(intervals: I) -> Self
349 where
350 Vec1<SamplingInterval>: From<I>,
351 {
352 SamplingProfile(intervals.into())
353 }
354
355 pub fn highly_granular() -> Self {
359 SamplingProfile::from_sampling_intervals([
360 SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_seconds(10)),
362 SamplingInterval::new(Capacity::from_min_samples(3600), 1, Duration::from_minutes(1)),
364 ])
365 }
366
367 pub fn granular() -> Self {
369 SamplingProfile::from_sampling_intervals([
370 SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_seconds(10)),
372 SamplingInterval::new(Capacity::from_min_samples(600), 1, Duration::from_minutes(1)),
374 SamplingInterval::new(Capacity::from_min_samples(720), 1, Duration::from_minutes(5)),
376 ])
377 }
378
379 pub fn balanced() -> Self {
382 SamplingProfile::from_sampling_intervals([
383 SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_seconds(10)),
385 SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(1)),
387 SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(5)),
389 SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_minutes(30)),
391 ])
392 }
393
394 pub fn granularity(&self) -> Duration {
396 self.0.iter().map(SamplingInterval::max_sampling_period).min().unwrap()
397 }
398
399 pub fn duration(&self) -> Duration {
401 self.0.iter().map(SamplingInterval::duration).max().unwrap()
402 }
403
404 pub(crate) fn into_sampling_intervals(self) -> Vec1<SamplingInterval> {
405 self.0
406 }
407}
408
409impl Default for SamplingProfile {
410 fn default() -> Self {
411 SamplingProfile::balanced()
412 }
413}
414
415impl Display for SamplingProfile {
416 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
417 use itertools::Position::{First, Last, Middle, Only};
418
419 write!(
421 formatter,
422 "{}..{}: ",
423 self.granularity().into_quanta().into_nearest_unit_display(),
424 self.duration().into_quanta().into_nearest_unit_display(),
425 )?;
426 for (pos, interval) in self.0.iter().with_position() {
427 match pos {
428 First | Middle => write!(formatter, "{} + ", interval),
429 Only | Last => write!(formatter, "{}", interval),
430 }?;
431 }
432 Ok(())
433 }
434}
435
436impl From<SamplingInterval> for SamplingProfile {
437 fn from(interval: SamplingInterval) -> Self {
438 SamplingProfile(Vec1::from_item(interval))
439 }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::mpsc::{self, Receiver, Sender};

    use crate::experimental::clock::{ObservationTime, Timestamp};
    use crate::experimental::series::Counter;
    use crate::experimental::series::buffer::Capacity;

    /// Sample folded by the interval tests and used by `MockInterpolation` when it fills.
    const SAMPLE: u64 = 1337u64;

    #[test]
    fn sampling_interval_fold_and_get_expirations() {
        // Six sampling periods of 10s each.
        let sampling_interval =
            SamplingInterval::new(Capacity::from_min_samples(120), 6, Duration::from_seconds(10));
        let mut last = ObservationTime {
            last_update_timestamp: Timestamp::from_nanos(71_000_000_000),
            last_sample_timestamp: None,
        };

        // Advances `last` to the given timestamp and collects the resulting expirations. The
        // flag is forwarded unchanged to `ObservationTime::tick`.
        let mut expirations_at = |nanos, flag: bool| -> Vec<IntervalExpiration<u64>> {
            let tick = last.tick(Timestamp::from_nanos(nanos), flag).unwrap();
            sampling_interval.fold_and_get_expirations(tick, SAMPLE).collect()
        };
        // Builders for the expected expirations.
        let pending = |fill_sample_count| {
            IntervalExpiration::Pending(PendingInterval { fill_sample_count, sample: SAMPLE })
        };
        let occupied = |fill_sample_count| -> IntervalExpiration<u64> {
            IntervalExpiration::Elapsed(ElapsedInterval::Occupied { fill_sample_count })
        };

        // The first three ticks stay within the same interval and require no fill.
        assert_eq!(expirations_at(75_000_000_000, true), vec![pending(None)]);
        assert_eq!(expirations_at(79_000_000_000, false), vec![pending(None)]);
        assert_eq!(expirations_at(83_000_000_000, false), vec![pending(None)]);

        // Still the same interval, but one sampling period must be filled.
        assert_eq!(expirations_at(91_000_000_000, true), vec![pending(NonZeroUsize::new(1))]);

        // The interval in progress elapses and a new interval becomes pending.
        assert_eq!(
            expirations_at(133_000_000_000, false),
            vec![occupied(NonZeroUsize::new(2)), pending(NonZeroUsize::new(1))],
        );

        // The interval in progress elapses, one whole interval is skipped, and the tick ends
        // at the very start of the pending interval.
        let vacant = IntervalExpiration::Elapsed(ElapsedInterval::Vacant {
            interval_count: NonZeroUsize::new(1).unwrap(),
            fill_sample_count: NonZeroUsize::new(6).unwrap(),
        });
        assert_eq!(
            expirations_at(240_000_000_000, false),
            vec![occupied(NonZeroUsize::new(5)), vacant, pending(None)],
        );
    }

    #[test]
    fn sampling_interval_fold_and_get_expirations_at_time_boundary() {
        use IntervalExpiration::{Elapsed, Pending};

        // A single 10s sampling period per interval.
        let sampling_interval =
            SamplingInterval::new(Capacity::from_min_samples(120), 1, Duration::from_seconds(10));
        let mut last = ObservationTime {
            last_update_timestamp: Timestamp::from_nanos(0),
            last_sample_timestamp: None,
        };

        // The tick ends exactly on the boundary between the first and second intervals.
        let tick = last.tick(Timestamp::from_nanos(10_000_000_000), false).unwrap();
        let actual = sampling_interval
            .fold_and_get_expirations(tick, PhantomData::<u64>)
            .collect::<Vec<_>>();

        let elapsed = Elapsed(ElapsedInterval::Occupied { fill_sample_count: NonZeroUsize::new(1) });
        let pending =
            Pending(PendingInterval { fill_sample_count: None, sample: PhantomData::<u64> });
        assert_eq!(actual, vec![elapsed, pending]);
    }

    /// A call recorded by `MockStatistic`.
    #[derive(Clone, Debug, PartialEq)]
    enum MockStatisticCall {
        /// `Statistic::fill` was called with the given sample and count.
        Fill { sample: u64, n: usize },
        /// `Statistic::fold` was called with the given sample.
        Fold { sample: u64 },
        /// `Statistic::reset` was called.
        Reset,
        /// `Statistic::aggregation` was called.
        Aggregation,
    }

    /// A statistic that sends every call it receives over a channel.
    #[derive(Clone, Debug)]
    struct MockStatistic(Sender<MockStatisticCall>);

    impl MockStatistic {
        /// Constructs a mock together with the receiver of its recorded calls.
        pub fn channel() -> (Self, Receiver<MockStatisticCall>) {
            let (sender, receiver) = mpsc::channel();
            (MockStatistic(sender), receiver)
        }
    }

    impl Statistic for MockStatistic {
        type Semantic = Counter;
        type Sample = u64;
        type Aggregation = u64;

        fn fold(&mut self, sample: u64) -> Result<(), FoldError> {
            let call = MockStatisticCall::Fold { sample };
            self.0.send(call).unwrap();
            Ok(())
        }

        fn fill(&mut self, sample: u64, n: NonZeroUsize) -> Result<(), FoldError> {
            let call = MockStatisticCall::Fill { sample, n: n.get() };
            self.0.send(call).unwrap();
            Ok(())
        }

        fn reset(&mut self) {
            let call = MockStatisticCall::Reset;
            self.0.send(call).unwrap();
        }

        fn aggregation(&self) -> Option<Self::Aggregation> {
            let call = MockStatisticCall::Aggregation;
            self.0.send(call).unwrap();
            // The aggregation is a fixed value; the tests only inspect the recorded calls.
            Some(100)
        }
    }

    /// A call recorded by `MockInterpolation`.
    #[derive(Clone, Debug, Eq, PartialEq)]
    enum MockInterpolationCall {
        /// `Interpolation::interpolate` was called with the given count.
        Interpolate { n: usize },
        /// `Interpolation::observe` was called with the given sample.
        Observe { sample: u64 },
    }

    /// An interpolation that sends every call it receives over a channel.
    #[derive(Clone, Debug)]
    struct MockInterpolation(Sender<MockInterpolationCall>);

    impl MockInterpolation {
        /// Constructs a mock together with the receiver of its recorded calls.
        pub fn channel() -> (Self, Receiver<MockInterpolationCall>) {
            let (sender, receiver) = mpsc::channel();
            (MockInterpolation(sender), receiver)
        }
    }

    impl Interpolation<u64> for MockInterpolation {
        fn interpolate<F>(&self, statistic: &mut F, n: NonZeroUsize) -> Result<(), FoldError>
        where
            F: Statistic<Sample = u64>,
        {
            let call = MockInterpolationCall::Interpolate { n: n.get() };
            self.0.send(call).unwrap();
            // Always fills with `SAMPLE`, regardless of any observed samples.
            statistic.fill(SAMPLE, n)
        }

        fn observe(&mut self, sample: u64) {
            let call = MockInterpolationCall::Observe { sample };
            self.0.send(call).unwrap();
        }
    }

    #[test]
    fn elapsed_interval_interpolate_and_get_aggregation() {
        let interval = ElapsedInterval::Occupied { fill_sample_count: NonZeroUsize::new(6) };
        let (mut statistic, statistic_calls) = MockStatistic::channel();
        let (mut interpolation, interpolation_calls) = MockInterpolation::channel();

        let result = interval.interpolate_and_get_aggregation(&mut statistic, &mut interpolation);
        assert!(result.is_ok());

        // The statistic is filled first and only then aggregated and reset.
        let recorded: Vec<_> = statistic_calls.try_iter().collect();
        assert_eq!(
            recorded,
            &[
                MockStatisticCall::Fill { sample: SAMPLE, n: 6 },
                MockStatisticCall::Aggregation,
                MockStatisticCall::Reset,
            ],
        );
        let recorded: Vec<_> = interpolation_calls.try_iter().collect();
        assert_eq!(recorded, &[MockInterpolationCall::Interpolate { n: 6 }]);
    }

    #[test]
    fn pending_interval_fold() {
        let interval = PendingInterval { fill_sample_count: NonZeroUsize::new(6), sample: 50u64 };
        let (mut statistic, statistic_calls) = MockStatistic::channel();
        let (mut interpolation, interpolation_calls) = MockInterpolation::channel();

        let result = interval.fold(&mut statistic, &mut interpolation);
        assert!(result.is_ok());

        // The statistic is filled before the sample is folded.
        let recorded: Vec<_> = statistic_calls.try_iter().collect();
        assert_eq!(
            recorded,
            &[
                MockStatisticCall::Fill { sample: SAMPLE, n: 6 },
                MockStatisticCall::Fold { sample: 50 },
            ],
        );
        // The interpolation observes the sample only after interpolating.
        let recorded: Vec<_> = interpolation_calls.try_iter().collect();
        assert_eq!(
            recorded,
            &[
                MockInterpolationCall::Interpolate { n: 6 },
                MockInterpolationCall::Observe { sample: 50 },
            ],
        );
    }
}