use num::{Num, NumCast, Zero};
use std::cmp;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use thiserror::Error;
use crate::experimental::series::buffer::BufferStrategy;
use crate::experimental::series::interpolation::Interpolation;
use crate::experimental::series::{BitSet, Counter, DataSemantic, Fill, Gauge, Sampler};
/// Type aliases for commonly combined statistic types.
pub mod recipe {
use crate::experimental::series::statistic::{ArithmeticMean, Transform};
/// An [`ArithmeticMean`] over samples of type `T` whose aggregation is mapped to a value of
/// type `A` by a [`Transform`].
pub type ArithmeticMeanTransform<T, A> = Transform<ArithmeticMean<T>, A>;
}
/// Error returned by the statistics in this module when a checked arithmetic operation
/// overflows.
#[derive(Clone, Copy, Debug, Error, Eq, PartialEq)]
#[error("overflow in statistic computation")]
pub struct OverflowError;
/// A statistic that accepts samples and can report an aggregation of them.
///
/// Implementors must be `Clone` and must accept filled samples of type `Self::Sample` via
/// [`Fill`].
pub trait Statistic: Clone + Fill<Self::Sample> {
/// The data semantic associated with this statistic.
type Semantic: DataSemantic;
/// The type of sample that this statistic accepts.
type Sample: Clone;
/// The type of value that this statistic reports.
type Aggregation: Clone;
/// Resets the statistic. The effect is defined by each implementation; for example,
/// `LatchMax` in this file implements this as a no-op.
fn reset(&mut self);
/// Returns the current aggregation, or `None` if the statistic has none to report.
fn aggregation(&self) -> Option<Self::Aggregation>;
}
/// Shorthand for the [`Statistic::Semantic`] type of the statistic `F`.
pub type Semantic<F> = <F as Statistic>::Semantic;
/// Shorthand for the [`Statistic::Sample`] type of the statistic `F`.
pub type Sample<F> = <F as Statistic>::Sample;
/// Shorthand for the [`Statistic::Aggregation`] type of the statistic `F`.
pub type Aggregation<F> = <F as Statistic>::Aggregation;
/// Every statistic is a buffer strategy for its own aggregation type: it uses whichever buffer
/// its data semantic selects for that aggregation type and the interpolation `P`.
impl<F, P> BufferStrategy<F::Aggregation, P> for F
where
F: Statistic,
F::Semantic: BufferStrategy<F::Aggregation, P>,
P: Interpolation,
{
type Buffer = <F::Semantic as BufferStrategy<F::Aggregation, P>>::Buffer;
}
/// A [`Statistic`] that is compatible with the interpolation `P`.
///
/// Compatibility means that the statistic is a buffer strategy for its aggregation type under
/// `P` and that the fill sample type of `P` for this statistic equals the statistic's sample
/// type.
pub trait StatisticFor<P>: BufferStrategy<Self::Aggregation, P> + Statistic
where
P: Interpolation<FillSample<Self> = Self::Sample>,
{
}
/// Blanket implementation: any type that meets the bounds of [`StatisticFor`] implements it.
impl<F, P> StatisticFor<P> for F
where
F: BufferStrategy<Self::Aggregation, P> + Statistic,
P: Interpolation<FillSample<Self> = Self::Sample>,
{
}
/// Extension methods that are available on every [`Statistic`].
pub trait StatisticExt: Statistic {
    /// Returns the current aggregation and then resets the statistic.
    ///
    /// The aggregation is read before `reset` is called, so the returned value describes the
    /// state prior to the reset.
    fn get_aggregation_and_reset(&mut self) -> Option<Self::Aggregation> {
        let snapshot = self.aggregation();
        self.reset();
        snapshot
    }
}

/// Blanket implementation that provides [`StatisticExt`] for all statistics.
impl<F: Statistic> StatisticExt for F {}
/// Arithmetic mean statistic, tracked as a running sum and a count of samples.
#[derive(Clone, Debug)]
pub struct ArithmeticMean<T> {
    /// Running sum of the samples.
    sum: T,
    /// Number of samples counted so far.
    n: u64,
}

impl<T> ArithmeticMean<T> {
    /// Creates a mean with the given initial sum and a sample count of zero.
    pub fn with_sum(sum: T) -> Self {
        Self { sum, n: 0 }
    }

    /// Adds `m` to the sample count.
    ///
    /// # Errors
    ///
    /// Returns [`OverflowError`] if the count would exceed `u64::MAX`; the count is left
    /// unchanged in that case.
    fn increment(&mut self, m: u64) -> Result<(), OverflowError> {
        match self.n.checked_add(m) {
            Some(count) => {
                self.n = count;
                Ok(())
            }
            None => Err(OverflowError),
        }
    }
}
/// The default mean has a sum of zero and no samples.
impl<T> Default for ArithmeticMean<T>
where
    T: Zero,
{
    fn default() -> Self {
        Self { sum: T::zero(), n: 0 }
    }
}
impl<T> Fill<T> for ArithmeticMean<T>
where
Self: Sampler<T, Error = OverflowError>,
T: Clone + Num + NumCast,
{
fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), Self::Error> {
Ok(match num::cast::<_, T>(n.get()) {
Some(m) => {
self.fold(sample * m)?;
self.increment((n.get() as u64) - 1)?;
}
_ => {
for _ in 0..n.get() {
self.fold(sample.clone())?;
}
}
})
}
}
impl Sampler<f32> for ArithmeticMean<f32> {
    type Error = OverflowError;

    /// Folds one sample into the mean.
    ///
    /// A NaN sample leaves the sum unchanged, an infinite sample replaces the sum, and any
    /// other sample is added to the sum. The sample count is incremented in every case.
    ///
    /// # Errors
    ///
    /// Returns [`OverflowError`] if the sample count would overflow. The count is updated
    /// before the sum, so on error neither the sum nor the count is modified.
    fn fold(&mut self, sample: f32) -> Result<(), Self::Error> {
        // Count first: this is the only fallible step, and doing it before touching the sum
        // keeps the statistic consistent when it fails.
        self.increment(1)?;
        if sample.is_infinite() {
            self.sum = sample;
        } else if !sample.is_nan() {
            self.sum += sample;
        }
        Ok(())
    }
}
impl Statistic for ArithmeticMean<f32> {
    type Semantic = Gauge;
    type Sample = f32;
    type Aggregation = f32;

    /// Restores the default state (zero sum and zero samples).
    fn reset(&mut self) {
        *self = Self::default();
    }

    /// Returns the sum divided by the sample count, or `None` if no samples have been counted.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        if self.n == 0 {
            return None;
        }
        Some(self.sum / (self.n as f32))
    }
}
/// Sum statistic: a running total of the folded samples.
#[derive(Clone, Debug)]
pub struct Sum<T> {
    /// Running total.
    sum: T,
}

impl<T> Sum<T> {
    /// Creates a sum that starts from the given total.
    pub fn with_sum(sum: T) -> Self {
        Self { sum }
    }
}
/// The default sum starts from zero.
impl<T> Default for Sum<T>
where
    T: Zero,
{
    fn default() -> Self {
        Self { sum: T::zero() }
    }
}
impl<T> Fill<T> for Sum<T>
where
    Self: Sampler<T>,
    T: Clone + Num + NumCast,
{
    /// Folds `sample` into the sum as if it had been observed `n` times.
    ///
    /// When `n` can be represented in `T`, the sample is multiplied by `n` and folded once;
    /// otherwise the sample is folded `n` times.
    ///
    /// NOTE(review): the multiplication below is the plain `Mul` of `T`, not a checked
    /// operation. For `u64` (the only `Sampler` for `Sum` in this file) an overflowing product
    /// panics when overflow checks are enabled and wraps otherwise, rather than surfacing as
    /// an error. Confirm whether this is acceptable.
    fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), Self::Error> {
        let count = n.get();
        match num::cast::<_, T>(count) {
            Some(factor) => self.fold(sample * factor),
            None => {
                for _ in 0..count {
                    self.fold(sample.clone())?;
                }
                Ok(())
            }
        }
    }
}
impl Sampler<u64> for Sum<u64> {
    type Error = OverflowError;

    /// Adds `sample` to the running total.
    ///
    /// # Errors
    ///
    /// Returns [`OverflowError`] if the addition overflows; the total is left unchanged in
    /// that case.
    fn fold(&mut self, sample: u64) -> Result<(), Self::Error> {
        match self.sum.checked_add(sample) {
            Some(total) => {
                self.sum = total;
                Ok(())
            }
            None => Err(OverflowError),
        }
    }
}
impl Statistic for Sum<u64> {
    type Semantic = Gauge;
    type Sample = u64;
    type Aggregation = u64;

    /// Restores the default state (a total of zero).
    fn reset(&mut self) {
        *self = Self::default();
    }

    /// Returns the running total. This is always `Some`.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        Some(self.sum)
    }
}
/// Maximum statistic: the largest sample folded so far, if any.
#[derive(Clone, Debug, Default)]
pub struct Max<T> {
    /// Largest sample so far, or `None` when there is no maximum yet.
    max: Option<T>,
}

impl<T> Max<T> {
    /// Creates a statistic whose maximum starts at the given value.
    pub fn with_max(max: T) -> Self {
        Self { max: Some(max) }
    }
}
impl<T> Fill<T> for Max<T>
where
    Self: Sampler<T, Error = OverflowError>,
    T: Num + NumCast,
{
    /// Folds `sample` exactly once; the repeat count is ignored.
    fn fill(&mut self, sample: T, _n: NonZeroUsize) -> Result<(), Self::Error> {
        <Self as Sampler<T>>::fold(self, sample)
    }
}
impl<T> Sampler<T> for Max<T>
where
    T: Ord + Copy + Num,
{
    type Error = OverflowError;

    /// Records `sample` as the maximum if it exceeds the current one, or if there is no
    /// maximum yet. This never fails.
    fn fold(&mut self, sample: T) -> Result<(), Self::Error> {
        let largest = self.max.map_or(sample, |current| cmp::max(current, sample));
        self.max = Some(largest);
        Ok(())
    }
}
impl<T> Statistic for Max<T>
where
    T: Ord + Copy + Zero + Num + NumCast + Default,
{
    type Semantic = Gauge;
    type Sample = T;
    type Aggregation = T;

    /// Restores the default state, in which there is no maximum.
    fn reset(&mut self) {
        *self = Self::default();
    }

    /// Returns the maximum, or `None` if there is none yet.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        self.max
    }
}
/// Union statistic: the accumulated bits of the folded samples.
#[derive(Clone, Debug)]
pub struct Union<T> {
    /// Accumulated bits.
    bits: T,
}

impl<T> Union<T> {
    /// Creates a union that starts from the given bits.
    pub fn with_bits(bits: T) -> Self {
        Self { bits }
    }
}
/// The default union has no bits set (the zero value of `T`).
impl<T> Default for Union<T>
where
    T: Zero,
{
    fn default() -> Self {
        Self { bits: T::zero() }
    }
}
impl<T> Fill<T> for Union<T>
where
    Self: Sampler<T, Error = OverflowError>,
    T: Num + NumCast,
{
    /// Folds `sample` exactly once; the repeat count is ignored.
    fn fill(&mut self, sample: T, _n: NonZeroUsize) -> Result<(), Self::Error> {
        <Self as Sampler<T>>::fold(self, sample)
    }
}
impl Sampler<u64> for Union<u64> {
    type Error = OverflowError;

    /// Sets every bit of `sample` in the accumulated bits. This never fails.
    fn fold(&mut self, sample: u64) -> Result<(), Self::Error> {
        self.bits |= sample;
        Ok(())
    }
}
impl Statistic for Union<u64> {
    type Semantic = BitSet;
    type Sample = u64;
    type Aggregation = u64;

    /// Restores the default state, in which no bits are set.
    fn reset(&mut self) {
        *self = Self::default();
    }

    /// Returns the accumulated bits. This is always `Some`.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        let bits = self.bits;
        Some(bits)
    }
}
/// Latching maximum statistic.
///
/// Tracks the most recent sample, an accumulated sum, and the maximum observed value. See the
/// `Sampler` implementation for how samples update these fields.
#[derive(Clone, Debug)]
pub struct LatchMax<T> {
    /// Most recently folded sample, if any.
    last: Option<T>,
    /// Maximum observed so far, if any.
    max: Option<T>,
    /// Accumulated sum that is added to each sample before comparing against the maximum.
    sum: T,
}

impl<T> LatchMax<T> {
    /// Creates a statistic with the given initial maximum and a sum of zero.
    pub fn with_max(max: T) -> Self
    where
        T: Zero,
    {
        Self::with_max_and_sum(max, T::zero())
    }

    /// Creates a statistic with the given initial maximum and sum and no previous sample.
    pub fn with_max_and_sum(max: T, sum: T) -> Self {
        Self { last: None, max: Some(max), sum }
    }
}
/// The default statistic has no previous sample, no maximum, and a sum of zero.
impl<T> Default for LatchMax<T>
where
    T: Zero,
{
    fn default() -> Self {
        let sum = T::zero();
        Self { last: None, max: None, sum }
    }
}
impl<T> Fill<T> for LatchMax<T>
where
    Self: Sampler<T>,
{
    /// Folds `sample` exactly once; the repeat count is ignored.
    fn fill(&mut self, sample: T, _n: NonZeroUsize) -> Result<(), Self::Error> {
        <Self as Sampler<T>>::fold(self, sample)
    }
}
impl Sampler<u64> for LatchMax<u64> {
    type Error = OverflowError;

    /// Folds one sample into the latched maximum.
    ///
    /// If `sample` is smaller than the previous sample, the previous sample is first added to
    /// the accumulated sum. The maximum is then updated with `sample` plus the accumulated
    /// sum, and `sample` becomes the previous sample.
    ///
    /// # Errors
    ///
    /// Returns [`OverflowError`] if either addition overflows. All arithmetic is performed on
    /// local values and committed only after it has succeeded, so on error the statistic is
    /// left exactly as it was and later samples are not affected by the rejected one.
    fn fold(&mut self, sample: u64) -> Result<(), Self::Error> {
        // A sample below the previous one latches the previous value into the sum.
        let sum = match self.last {
            Some(last) if sample < last => self.sum.checked_add(last).ok_or(OverflowError)?,
            _ => self.sum,
        };
        let total = sample.checked_add(sum).ok_or(OverflowError)?;

        // Nothing can fail past this point; commit the new state.
        self.sum = sum;
        self.last = Some(sample);
        self.max = Some(self.max.map_or(total, |max| cmp::max(max, total)));
        Ok(())
    }
}
impl Statistic for LatchMax<u64> {
type Semantic = Counter;
type Sample = u64;
type Aggregation = u64;
/// Does nothing: no state is cleared by a reset.
// NOTE(review): presumably intentional so that the latched maximum persists across resets;
// confirm against the callers of `reset`.
fn reset(&mut self) {}
/// Returns the maximum observed so far, or `None` if there is none yet.
fn aggregation(&self) -> Option<Self::Aggregation> {
self.max
}
}
/// A statistic paired with a function that is applied to its aggregation.
#[derive(Clone, Copy, Debug)]
pub struct PostAggregation<F, R> {
    /// The wrapped statistic.
    statistic: F,
    /// The function applied to the aggregation of the wrapped statistic.
    transform: R,
}

/// A [`PostAggregation`] whose function is a plain function pointer from the aggregation of
/// `F` to `A`.
pub type Transform<F, A> = PostAggregation<F, fn(Aggregation<F>) -> A>;

impl<F, R> PostAggregation<F, R>
where
    F: Default,
{
    /// Wraps the default value of `F` together with the given function.
    pub fn from_transform(transform: R) -> Self {
        let statistic = F::default();
        Self { statistic, transform }
    }
}
impl<T, F, R> Fill<T> for PostAggregation<F, R>
where
    F: Fill<T>,
{
    /// Forwards the fill to the wrapped statistic.
    fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), Self::Error> {
        let inner = &mut self.statistic;
        inner.fill(sample, n)
    }
}
impl<T, F, R> Sampler<T> for PostAggregation<F, R>
where
    F: Sampler<T>,
{
    type Error = F::Error;

    /// Forwards the sample to the wrapped statistic.
    fn fold(&mut self, sample: T) -> Result<(), Self::Error> {
        let inner = &mut self.statistic;
        inner.fold(sample)
    }
}
impl<A, F, R> Statistic for PostAggregation<F, R>
where
    F: Statistic,
    R: Clone + Fn(F::Aggregation) -> A,
    A: Clone,
{
    type Semantic = F::Semantic;
    type Sample = F::Sample;
    type Aggregation = A;

    /// Resets the wrapped statistic.
    fn reset(&mut self) {
        self.statistic.reset()
    }

    /// Returns the aggregation of the wrapped statistic after applying the function to it, or
    /// `None` if the wrapped statistic has no aggregation.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        let inner = self.statistic.aggregation()?;
        Some((self.transform)(inner))
    }
}
/// A statistic paired with a function that produces its replacement on reset.
#[derive(Clone, Copy, Debug)]
pub struct Reset<F, R> {
    /// The wrapped statistic.
    statistic: F,
    /// The function that produces the statistic to use after a reset.
    reset: R,
}

impl<F, R> Reset<F, R>
where
    F: Statistic,
    R: FnMut() -> F,
{
    /// Wraps `statistic` so that resetting it replaces it with the output of `reset`.
    pub fn with(statistic: F, reset: R) -> Self {
        Self { statistic, reset }
    }
}
impl<F, R, T> Fill<T> for Reset<F, R>
where
    F: Fill<T>,
{
    /// Forwards the fill to the wrapped statistic.
    fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), Self::Error> {
        let inner = &mut self.statistic;
        inner.fill(sample, n)
    }
}
impl<F, R, T> Sampler<T> for Reset<F, R>
where
    F: Sampler<T>,
{
    type Error = F::Error;

    /// Forwards the sample to the wrapped statistic.
    fn fold(&mut self, sample: T) -> Result<(), Self::Error> {
        let inner = &mut self.statistic;
        inner.fold(sample)
    }
}
impl<F, R> Statistic for Reset<F, R>
where
    F: Statistic,
    R: Clone + FnMut() -> F,
{
    type Semantic = F::Semantic;
    type Sample = F::Sample;
    type Aggregation = F::Aggregation;

    /// Replaces the wrapped statistic with a new one produced by the reset function.
    fn reset(&mut self) {
        let fresh = (self.reset)();
        self.statistic = fresh;
    }

    /// Returns the aggregation of the wrapped statistic.
    fn aggregation(&self) -> Option<Self::Aggregation> {
        self.statistic.aggregation()
    }
}
// Unit tests for the statistics defined in this module.
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
use crate::experimental::series::statistic::{
ArithmeticMean, LatchMax, Max, OverflowError, PostAggregation, Reset, Statistic, Sum, Union,
};
use crate::experimental::series::{Fill, Sampler};
// The mean of {1, 1, 1} and the mean of {0, 1, 2} are both approximately 1.
#[test]
fn arithmetic_mean_aggregation() {
let mut mean = ArithmeticMean::<f32>::default();
mean.fold(1.0).unwrap();
mean.fold(1.0).unwrap();
mean.fold(1.0).unwrap();
let aggregation = mean.aggregation().unwrap();
assert!(aggregation > 0.99 && aggregation < 1.01); let mut mean = ArithmeticMean::<f32>::default();
mean.fold(0.0).unwrap();
mean.fold(1.0).unwrap();
mean.fold(2.0).unwrap();
let aggregation = mean.aggregation().unwrap();
assert!(aggregation > 0.99 && aggregation < 1.01); }
// Filling 1000 observations of 1.0 yields a mean of approximately 1.
#[test]
fn arithmetic_mean_aggregation_fill() {
let mut mean = ArithmeticMean::<f32>::default();
mean.fill(1.0, NonZeroUsize::new(1000).unwrap()).unwrap();
let aggregation = mean.aggregation().unwrap();
assert!(aggregation > 0.99 && aggregation < 1.01); }
// Folding a sample when the count is already `u64::MAX` reports an overflow.
#[test]
fn arithmetic_mean_count_overflow() {
let mut mean = ArithmeticMean::<f32> { sum: 1.0, n: u64::MAX };
let result = mean.fold(1.0);
assert_eq!(result, Err(OverflowError));
}
// The sum of {1, 1, 1} and the sum of {0, 1, 2} are both 3.
#[test]
fn sum_aggregation() {
let mut sum = Sum::<u64>::default();
sum.fold(1).unwrap();
sum.fold(1).unwrap();
sum.fold(1).unwrap();
let aggregation = sum.aggregation().unwrap();
assert_eq!(aggregation, 3);
let mut sum = Sum::<u64>::default();
sum.fold(0).unwrap();
sum.fold(1).unwrap();
sum.fold(2).unwrap();
let aggregation = sum.aggregation().unwrap();
assert_eq!(aggregation, 3);
}
// Filling 1000 observations of 10 yields a sum of 10,000.
#[test]
fn sum_aggregation_fill() {
let mut sum = Sum::<u64>::default();
sum.fill(10, NonZeroUsize::new(1000).unwrap()).unwrap();
let aggregation = sum.aggregation().unwrap();
assert_eq!(aggregation, 10_000);
}
// Adding to a sum that is already `u64::MAX` reports an overflow.
#[test]
fn sum_overflow() {
let mut sum = Sum::<u64> { sum: u64::MAX };
let result = sum.fold(1);
assert_eq!(result, Err(OverflowError));
}
// The maximum of {0, 1337, 42} is 1337.
#[test]
fn max_aggregation() {
let mut max = Max::<u64>::default();
max.fold(0).unwrap();
max.fold(1337).unwrap();
max.fold(42).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 1337);
}
// Filling any number of observations of 42 yields a maximum of 42.
#[test]
fn max_aggregation_fill() {
let mut max = Max::<u64>::default();
max.fill(42, NonZeroUsize::new(1000).unwrap()).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 42);
}
// The union of bits 1, 3 and 5 is 0b101010.
#[test]
fn union_aggregation() {
let mut value = Union::<u64>::default();
value.fold(1 << 1).unwrap();
value.fold(1 << 3).unwrap();
value.fold(1 << 5).unwrap();
let aggregation = value.aggregation().unwrap();
assert_eq!(aggregation, 0b101010);
}
// Filling any number of observations of bit 2 yields 0b100.
#[test]
fn union_aggregation_fill() {
let mut value = Union::<u64>::default();
value.fill(1 << 2, NonZeroUsize::new(1000).unwrap()).unwrap();
let aggregation = value.aggregation().unwrap();
assert_eq!(aggregation, 0b100);
}
// Non-decreasing samples report the largest sample. In the last case the samples drop twice
// (6 -> 2 and 9 -> 3), so 6 and 9 are added to the sum and the final sample 5 yields
// 6 + 9 + 5 = 20.
#[test]
fn latch_max_aggregation() {
let mut max = LatchMax::<u64>::default();
max.fold(1).unwrap();
max.fold(1).unwrap();
max.fold(1).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 1);
let mut max = LatchMax::<u64>::default();
max.fold(0).unwrap();
max.fold(1).unwrap();
max.fold(2).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 2);
let mut max = LatchMax::<u64>::default();
max.fold(1).unwrap();
max.fold(5).unwrap();
max.fold(6).unwrap();
max.fold(2).unwrap(); max.fold(9).unwrap();
max.fold(3).unwrap(); max.fold(5).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 20);
}
// Filling any number of observations of 10 yields a maximum of 10.
#[test]
fn latch_max_aggregation_fill() {
let mut max = LatchMax::<u64>::default();
max.fill(10, NonZeroUsize::new(1000).unwrap()).unwrap();
let aggregation = max.aggregation().unwrap();
assert_eq!(aggregation, 10);
}
// After the drop from 1 to 0 the sum is 1, so folding `u64::MAX` overflows when the sum is
// added to the sample.
#[test]
fn latch_max_overflow_max() {
let mut max = LatchMax::<u64>::default();
max.fold(1).unwrap();
max.fold(0).unwrap(); let result = max.fold(u64::MAX); assert_eq!(result, Err(OverflowError));
}
// The transform adds one to the sum of {0, 1, 2}, yielding 4.
#[test]
fn post_sum_aggregation() {
let mut sum = PostAggregation::<Sum<u64>, _>::from_transform(|sum| sum + 1);
sum.fold(0).unwrap();
sum.fold(1).unwrap();
sum.fold(2).unwrap();
let aggregation = sum.aggregation().unwrap();
assert_eq!(aggregation, 4);
}
// Resetting replaces the sum with one that starts at 3, so folding 1 afterwards yields 4.
#[test]
fn reset_with_function() {
let mut sum = Reset::with(Sum::<u64>::default(), || Sum::with_sum(3));
assert_eq!(sum.aggregation().unwrap(), 0);
sum.fold(1).unwrap();
assert_eq!(sum.aggregation().unwrap(), 1);
sum.reset();
assert_eq!(sum.aggregation().unwrap(), 3);
sum.fold(1).unwrap();
assert_eq!(sum.aggregation().unwrap(), 4);
}
}