pub(crate) mod buffer;
mod interval;
pub mod interpolation;
pub mod statistic;
use derivative::Derivative;
use std::convert::Infallible;
use std::fmt::{Debug, Display};
use std::io;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use thiserror::Error;
use crate::experimental::clock::{
MonotonicityError, ObservationTime, Tick, TimedSample, Timestamp, TimestampExt,
};
use crate::experimental::series::buffer::{
Buffer, BufferStrategy, DeltaSimple8bRle, DeltaZigZagSimple8bRle, RingBuffer, Simple8bRle,
Uncompressed, ZigzagSimple8bRle,
};
use crate::experimental::series::interpolation::{
Constant, Interpolation, InterpolationFor, InterpolationState, LastAggregation, LastSample,
};
use crate::experimental::series::statistic::{OverflowError, PostAggregation, Statistic};
use crate::experimental::Vec1;
pub use crate::experimental::series::interval::{SamplingInterval, SamplingProfile};
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum FoldError {
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Monotonicity(#[from] MonotonicityError),
#[error(transparent)]
Overflow(#[from] OverflowError),
}
impl From<Infallible> for FoldError {
fn from(_: Infallible) -> Self {
unreachable!()
}
}
pub trait Sampler<T> {
type Error;
fn fold(&mut self, sample: T) -> Result<(), Self::Error>;
}
pub trait Fill<T>: Sampler<T> {
fn fill(&mut self, sample: T, n: NonZeroUsize) -> Result<(), Self::Error>;
}
pub trait Interpolator {
type Error;
fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), Self::Error>;
fn interpolate_and_get_buffers(
&mut self,
timestamp: Timestamp,
) -> Result<SerializedBuffer, Self::Error>;
}
pub trait MatrixSampler<T>:
Interpolator<Error = FoldError> + Sampler<TimedSample<T>, Error = FoldError>
{
}
pub trait DataSemantic {
fn display() -> impl Display;
}
#[derive(Debug)]
pub enum Counter {}
impl BufferStrategy<u64, LastAggregation> for Counter {
type Buffer = DeltaSimple8bRle;
}
impl BufferStrategy<u64, LastSample> for Counter {
type Buffer = DeltaSimple8bRle;
}
impl DataSemantic for Counter {
fn display() -> impl Display {
"counter"
}
}
#[derive(Debug)]
pub enum Gauge {}
impl<P> BufferStrategy<f32, P> for Gauge
where
P: Interpolation,
{
type Buffer = Uncompressed<f32>;
}
impl BufferStrategy<i64, Constant> for Gauge {
type Buffer = ZigzagSimple8bRle;
}
impl BufferStrategy<i64, LastAggregation> for Gauge {
type Buffer = DeltaZigZagSimple8bRle;
}
impl BufferStrategy<i64, LastSample> for Gauge {
type Buffer = DeltaZigZagSimple8bRle;
}
impl BufferStrategy<u64, Constant> for Gauge {
type Buffer = Simple8bRle;
}
impl BufferStrategy<u64, LastAggregation> for Gauge {
type Buffer = DeltaZigZagSimple8bRle;
}
impl BufferStrategy<u64, LastSample> for Gauge {
type Buffer = DeltaZigZagSimple8bRle;
}
impl DataSemantic for Gauge {
fn display() -> impl Display {
"gauge"
}
}
#[derive(Debug)]
pub enum BitSet {}
impl<A, P> BufferStrategy<A, P> for BitSet
where
Simple8bRle: RingBuffer<A>,
P: Interpolation,
{
type Buffer = Simple8bRle;
}
impl DataSemantic for BitSet {
fn display() -> impl Display {
"bitset"
}
}
#[derive(Clone, Debug)]
struct SerializedTimeSeries {
interval: SamplingInterval,
data: Vec<u8>,
}
impl SerializedTimeSeries {
pub fn interval(&self) -> &SamplingInterval {
&self.interval
}
pub fn data(&self) -> &[u8] {
self.data.as_slice()
}
}
#[derive(Clone, Debug)]
struct TimeSeries<F>
where
F: Statistic,
{
interval: SamplingInterval,
statistic: F,
}
impl<F> TimeSeries<F>
where
F: Statistic,
{
pub fn new(interval: SamplingInterval) -> Self
where
F: Default,
{
TimeSeries { interval, statistic: F::default() }
}
pub const fn with_statistic(interval: SamplingInterval, statistic: F) -> Self {
TimeSeries { interval, statistic }
}
#[must_use]
fn interpolate_and_get_aggregations<'i, P>(
&'i mut self,
interpolation: &'i mut P,
tick: Tick,
) -> impl 'i + Iterator<Item = Result<F::Aggregation, F::Error>>
where
P: InterpolationState<F::Aggregation, FillSample = F::Sample>,
{
self.interval.fold_and_get_expirations(tick, PhantomData::<F::Sample>).flat_map(
move |expiration| {
expiration
.interpolate_and_get_aggregation(&mut self.statistic, interpolation)
.transpose()
},
)
}
#[must_use]
fn fold_and_get_aggregations<'i, P>(
&'i mut self,
interpolation: &'i mut P,
tick: Tick,
sample: F::Sample,
) -> impl 'i + Iterator<Item = Result<F::Aggregation, F::Error>>
where
P: InterpolationState<F::Aggregation, FillSample = F::Sample>,
{
self.interval.fold_and_get_expirations(tick, sample).flat_map(move |expiration| {
expiration.fold_and_get_aggregation(&mut self.statistic, interpolation).transpose()
})
}
pub fn interval(&self) -> &SamplingInterval {
&self.interval
}
}
impl<F, R, A> TimeSeries<PostAggregation<F, R>>
where
F: Default + Statistic,
R: Clone + Fn(F::Aggregation) -> A,
A: Clone,
{
pub fn with_transform(interval: SamplingInterval, transform: R) -> Self {
TimeSeries { interval, statistic: PostAggregation::from_transform(transform) }
}
}
#[derive(Derivative)]
#[derivative(
Clone(bound = "F: Clone, Buffer<F, P>: Clone, P::State<F>: Clone,"),
Debug(bound = "F: Debug,
F::Sample: Debug,
F::Aggregation: Debug,
Buffer<F, P>: Debug,
P::State<F>: Debug,")
)]
struct BufferedTimeSeries<F, P>
where
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
buffer: Buffer<F, P>,
interpolation: P::State<F>,
series: TimeSeries<F>,
}
impl<F, P> BufferedTimeSeries<F, P>
where
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
pub fn new(interpolation: P::State<F>, series: TimeSeries<F>) -> Self {
let buffer = F::buffer(&series.interval);
BufferedTimeSeries { buffer, interpolation, series }
}
fn interpolate(&mut self, tick: Tick) -> Result<(), F::Error> {
for aggregation in
self.series.interpolate_and_get_aggregations(&mut self.interpolation, tick)
{
self.buffer.push(aggregation?);
}
Ok(())
}
fn fold(&mut self, tick: Tick, sample: F::Sample) -> Result<(), F::Error> {
for aggregation in
self.series.fold_and_get_aggregations(&mut self.interpolation, tick, sample)
{
self.buffer.push(aggregation?);
}
Ok(())
}
pub fn serialize_and_get_buffer(&self) -> io::Result<SerializedTimeSeries> {
let mut data = vec![];
self.buffer.serialize(&mut data)?;
Ok(SerializedTimeSeries { interval: *self.series.interval(), data })
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct SerializedBuffer {
pub data_semantic: String,
pub data: Vec<u8>,
}
#[derive(Derivative)]
#[derivative(
Clone(bound = "F: Clone, Buffer<F, P>: Clone, P::State<F>: Clone,"),
Debug(bound = "F: Debug,
F::Sample: Debug,
F::Aggregation: Debug,
Buffer<F, P>: Debug,
P::State<F>: Debug,")
)]
pub struct TimeMatrix<F, P>
where
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
created: Timestamp,
last: ObservationTime,
buffers: Vec1<BufferedTimeSeries<F, P>>,
}
impl<F, P> TimeMatrix<F, P>
where
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
fn from_series_with<Q>(series: impl Into<Vec1<TimeSeries<F>>>, mut interpolation: Q) -> Self
where
Q: FnMut() -> P::State<F>,
{
let buffers =
series.into().map_into(|series| BufferedTimeSeries::new((interpolation)(), series));
TimeMatrix { created: Timestamp::now(), last: ObservationTime::default(), buffers }
}
pub fn new(profile: impl Into<SamplingProfile>, interpolation: P::State<F>) -> Self
where
F: Default,
{
let sampling_intervals = profile.into().into_sampling_intervals();
TimeMatrix::from_series_with(sampling_intervals.map_into(TimeSeries::new), || {
interpolation.clone()
})
}
pub fn with_statistic(
profile: impl Into<SamplingProfile>,
interpolation: P::State<F>,
statistic: F,
) -> Self {
let sampling_intervals = profile.into().into_sampling_intervals();
TimeMatrix::from_series_with(
sampling_intervals
.map_into(|window| TimeSeries::with_statistic(window, statistic.clone())),
|| interpolation.clone(),
)
}
pub fn fold_and_get_buffers(
&mut self,
sample: TimedSample<F::Sample>,
) -> Result<SerializedBuffer, FoldError>
where
FoldError: From<F::Error>,
{
self.fold(sample)?;
let series_buffers = self
.buffers
.try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
.map_err::<FoldError, _>(From::from)?;
self.serialize(series_buffers).map_err(From::from)
}
fn serialize(
&self,
series_buffers: Vec1<SerializedTimeSeries>,
) -> io::Result<SerializedBuffer> {
use crate::experimental::clock::DurationExt;
use byteorder::{LittleEndian, WriteBytesExt};
use std::io::Write;
let created_timestamp = u32::try_from(self.created.quantize()).unwrap_or(u32::MAX);
let end_timestamp =
u32::try_from(self.last.last_update_timestamp.quantize()).unwrap_or(u32::MAX);
let ring_buffer_type = F::buffer_type();
let mut buffer = vec![];
buffer.write_u8(1)?; buffer.write_u32::<LittleEndian>(created_timestamp)?;
buffer.write_u32::<LittleEndian>(end_timestamp)?;
buffer.write_u8(ring_buffer_type.type_descriptor())?;
buffer.write_u8(ring_buffer_type.subtype_descriptor())?;
for series in series_buffers {
const GRANULARITY_FIELD_LEN: usize = 2;
let len = u16::try_from(series.data.len() + GRANULARITY_FIELD_LEN).unwrap_or(u16::MAX);
let granularity =
u16::try_from(series.interval().duration().into_quanta()).unwrap_or(u16::MAX);
buffer.write_u16::<LittleEndian>(len)?;
buffer.write_u16::<LittleEndian>(granularity)?;
buffer.write_all(&series.data[..len as usize - GRANULARITY_FIELD_LEN])?;
}
Ok(SerializedBuffer {
data_semantic: format!("{}", <F as Statistic>::Semantic::display()),
data: buffer,
})
}
}
impl<F, R, P, A> TimeMatrix<PostAggregation<F, R>, P>
where
PostAggregation<F, R>: BufferStrategy<A, P>,
F: Default + Statistic,
R: Clone + Fn(F::Aggregation) -> A,
P: InterpolationFor<PostAggregation<F, R>>,
A: Clone,
{
pub fn with_transform(
profile: impl Into<SamplingProfile>,
interpolation: P::State<PostAggregation<F, R>>,
transform: R,
) -> Self
where
R: Clone,
{
let sampling_intervals = profile.into().into_sampling_intervals();
TimeMatrix::from_series_with(
sampling_intervals
.map_into(|window| TimeSeries::with_transform(window, transform.clone())),
|| interpolation.clone(),
)
}
}
impl<F, P> Default for TimeMatrix<F, P>
where
F: BufferStrategy<F::Aggregation, P> + Default + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
P::State<F>: Default,
{
fn default() -> Self {
TimeMatrix::new(SamplingProfile::default(), P::State::default())
}
}
impl<F, P> Interpolator for TimeMatrix<F, P>
where
FoldError: From<F::Error>,
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
type Error = FoldError;
fn interpolate(&mut self, timestamp: Timestamp) -> Result<(), Self::Error> {
let tick = self.last.tick(timestamp.into(), false)?;
Ok(for buffer in self.buffers.iter_mut() {
buffer.interpolate(tick)?;
})
}
fn interpolate_and_get_buffers(
&mut self,
timestamp: Timestamp,
) -> Result<SerializedBuffer, Self::Error> {
self.interpolate(timestamp)?;
let series_buffers = self
.buffers
.try_map_ref(BufferedTimeSeries::serialize_and_get_buffer)
.map_err::<FoldError, _>(From::from)?;
self.serialize(series_buffers).map_err(From::from)
}
}
impl<F, P> Sampler<TimedSample<F::Sample>> for TimeMatrix<F, P>
where
FoldError: From<F::Error>,
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
type Error = FoldError;
fn fold(&mut self, timed: TimedSample<F::Sample>) -> Result<(), Self::Error> {
let (timestamp, sample) = timed.into();
let tick = self.last.tick(timestamp, true)?;
Ok(for buffer in self.buffers.iter_mut() {
buffer.fold(tick, sample.clone())?;
})
}
}
impl<F, P> MatrixSampler<F::Sample> for TimeMatrix<F, P>
where
FoldError: From<F::Error>,
F: BufferStrategy<F::Aggregation, P> + Statistic,
P: Interpolation<FillSample<F> = F::Sample>,
{
}
#[cfg(test)]
mod tests {
use fuchsia_async as fasync;
use crate::experimental::clock::{TimedSample, Timestamp};
use crate::experimental::series::interpolation::{Constant, LastAggregation, LastSample};
use crate::experimental::series::statistic::{
ArithmeticMean, LatchMax, Max, PostAggregation, Sum, Transform, Union,
};
use crate::experimental::series::{
Interpolator, MatrixSampler, Sampler, SamplingProfile, TimeMatrix,
};
fn fold_and_interpolate_f32(sampler: &mut impl MatrixSampler<f32>) {
sampler.fold(TimedSample::now(0.0)).unwrap();
sampler.fold(TimedSample::now(1.0)).unwrap();
sampler.fold(TimedSample::now(2.0)).unwrap();
let _buffers = sampler.interpolate(Timestamp::now()).unwrap();
}
#[test]
fn static_test_define_time_matrix() {
type Mean<T> = ArithmeticMean<T>;
type MeanTransform<T, F> = Transform<Mean<T>, F>;
let _exec = fasync::TestExecutor::new_with_fake_time();
let _ = TimeMatrix::<Mean<f32>, Constant>::default();
let _ = TimeMatrix::<Mean<f32>, LastSample>::new(
SamplingProfile::balanced(),
LastSample::or(0.0f32),
);
let _ = TimeMatrix::<_, Constant>::with_statistic(
SamplingProfile::granular(),
Constant::default(),
Mean::<f32>::default(),
);
let mut matrix = TimeMatrix::<MeanTransform<f32, i64>, LastSample>::with_transform(
SamplingProfile::highly_granular(),
LastSample::or(0.0f32),
|aggregation| aggregation.ceil() as i64,
);
fold_and_interpolate_f32(&mut matrix);
let mut matrix = TimeMatrix::<_, Constant>::with_statistic(
SamplingProfile::default(),
Constant::default(),
PostAggregation::<ArithmeticMean<f32>, _>::from_transform(|aggregation: f32| {
aggregation.ceil() as i64
}),
);
fold_and_interpolate_f32(&mut matrix);
}
#[test]
fn static_test_supported_statistic_and_interpolation_combinations() {
let _exec = fasync::TestExecutor::new_with_fake_time();
let _ = TimeMatrix::<ArithmeticMean<f32>, Constant>::default();
let _ = TimeMatrix::<ArithmeticMean<f32>, LastSample>::default();
let _ = TimeMatrix::<ArithmeticMean<f32>, LastAggregation>::default();
let _ = TimeMatrix::<LatchMax<u64>, LastSample>::default();
let _ = TimeMatrix::<LatchMax<u64>, LastAggregation>::default();
let _ = TimeMatrix::<Max<u64>, Constant>::default();
let _ = TimeMatrix::<Max<u64>, LastSample>::default();
let _ = TimeMatrix::<Max<u64>, LastAggregation>::default();
let _ = TimeMatrix::<Sum<u64>, Constant>::default();
let _ = TimeMatrix::<Sum<u64>, LastSample>::default();
let _ = TimeMatrix::<Sum<u64>, LastAggregation>::default();
let _ = TimeMatrix::<Union<u64>, Constant>::default();
let _ = TimeMatrix::<Union<u64>, LastSample>::default();
let _ = TimeMatrix::<Union<u64>, LastAggregation>::default();
}
#[test]
fn time_matrix_with_uncompressed_buffer() {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::Time::from_nanos(3_000_000_000));
let mut time_matrix = TimeMatrix::<ArithmeticMean<f32>, Constant>::new(
SamplingProfile::highly_granular(),
Constant::default(),
);
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 3, 0, 0, 0, 0, 0, 4, 0, 10, 0, 0, 0, 4, 0, 60, 0, 0, 0, ]
);
time_matrix.fold(TimedSample::now(f32::from_bits(42u32))).unwrap();
exec.set_fake_time(fasync::Time::from_nanos(10_000_000_000));
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 10, 0, 0, 0, 0, 0, 8, 0, 10, 0, 1, 0, 42, 0, 0, 0, 4, 0, 60, 0, 0, 0, ]
);
}
#[test]
fn time_matrix_with_simple8b_rle_buffer() {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::Time::from_nanos(3_000_000_000));
let mut time_matrix = TimeMatrix::<Max<u64>, Constant>::new(
SamplingProfile::highly_granular(),
Constant::default(),
);
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 3, 0, 0, 0, 1, 0, 7, 0, 10, 0, 0, 0, 0, 0, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
);
time_matrix.fold(TimedSample::now(42)).unwrap();
exec.set_fake_time(fasync::Time::from_nanos(10_000_000_000));
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 10, 0, 0, 0, 1, 0, 16, 0, 10, 0, 1, 0, 0, 0, 1, 0x0f, 42, 0, 0, 0, 0, 0, 1, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
);
}
#[test]
fn time_matrix_with_zigzag_simple8b_rle_buffer() {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::Time::from_nanos(3_000_000_000));
let mut time_matrix = TimeMatrix::<Max<i64>, Constant>::new(
SamplingProfile::highly_granular(),
Constant::default(),
);
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 3, 0, 0, 0, 1, 1, 7, 0, 10, 0, 0, 0, 0, 0, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
);
time_matrix.fold(TimedSample::now(-2)).unwrap();
exec.set_fake_time(fasync::Time::from_nanos(10_000_000_000));
let buffer = time_matrix.interpolate_and_get_buffers(Timestamp::now()).unwrap();
assert_eq!(
buffer.data,
vec![
1, 3, 0, 0, 0, 10, 0, 0, 0, 1, 1, 16, 0, 10, 0, 1, 0, 0, 0, 1, 0x0f, 3, 0, 0, 0, 0, 0, 1, 0, 7, 0, 60, 0, 0, 0, 0, 0, 0, ]
);
}
}