windowed_stats/experimental/event/
builder.rs1use std::marker::PhantomData;
6
7use crate::experimental::clock::Timed;
8use crate::experimental::event::Event;
9use crate::experimental::event::reactor::{Context, Reactor};
10use crate::experimental::series::interpolation::InterpolationKind;
11use crate::experimental::series::statistic::{FoldError, Metadata, SerialStatistic, Statistic};
12use crate::experimental::series::{MatrixSampler, SamplingProfile, TimeMatrix};
13use crate::experimental::serve::{InspectSender, InspectedTimeMatrix, TimeMatrixClient};
14
15pub trait Optional {
17 type Field;
18}
19
20#[derive(Clone, Copy, Debug, Default)]
22pub struct Set<T>(PhantomData<fn() -> T>);
23
24impl<T> Optional for Set<T> {
25 type Field = T;
26}
27
28#[derive(Clone, Copy, Debug, Default)]
30pub struct Unset;
31
32impl Optional for Unset {
33 type Field = ();
34}
35
36#[derive(Clone, Copy, Debug)]
49pub struct SampleDataRecord<F, S = (), M = Unset>
50where
51 M: Optional,
52{
53 statistic: F,
54 metadata: M::Field,
55 phantom: PhantomData<fn() -> S>,
56}
57
58impl<F, S, M> SampleDataRecord<F, S, M>
59where
60 M: Optional,
61{
62 fn reactor<T>(
63 matrix: InspectedTimeMatrix<T>,
64 ) -> impl Reactor<T, S, Response = (), Error = FoldError>
65 where
66 T: Clone,
67 {
68 move |event: Timed<Event<T>>, _: Context<'_, S>| {
69 if let Some(sample) = event.to_timed_sample() { matrix.fold(sample) } else { Ok(()) }
70 }
71 }
72}
73
74impl<F, S> SampleDataRecord<F, S, Set<Metadata<F>>>
75where
76 F: Statistic,
77{
78 pub fn in_time_matrix<P>(
79 self,
80 client: &TimeMatrixClient,
81 name: impl AsRef<str>,
82 profile: SamplingProfile,
83 interpolation: P::Output<F::Sample>,
84 ) -> impl Reactor<F::Sample, S, Response = (), Error = FoldError>
85 where
86 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
87 Metadata<F>: 'static + Send + Sync,
88 F: SerialStatistic<P>,
89 F::Sample: Send,
90 P: InterpolationKind,
91 {
92 let SampleDataRecord { statistic, metadata, .. } = self;
93 let matrix = client.inspect_time_matrix_with_metadata(
94 name.as_ref(),
95 TimeMatrix::with_statistic(profile, interpolation, statistic),
96 metadata,
97 );
98 Self::reactor(matrix)
99 }
100}
101
102impl<F, S> SampleDataRecord<F, S, Unset>
103where
104 F: Statistic,
105{
106 pub fn with_metadata(
120 self,
121 metadata: impl Into<Metadata<F>>,
122 ) -> SampleDataRecord<F, S, Set<Metadata<F>>> {
123 let SampleDataRecord { statistic, .. } = self;
124 SampleDataRecord { statistic, metadata: metadata.into(), phantom: PhantomData }
125 }
126
127 pub fn in_time_matrix<P>(
128 self,
129 client: &TimeMatrixClient,
130 name: impl AsRef<str>,
131 profile: SamplingProfile,
132 interpolation: P::Output<F::Sample>,
133 ) -> impl Reactor<F::Sample, S, Response = (), Error = FoldError>
134 where
135 TimeMatrix<F, P>: 'static + MatrixSampler<F::Sample> + Send,
136 Metadata<F>: 'static + Send + Sync,
137 F: SerialStatistic<P>,
138 F::Sample: Send,
139 P: InterpolationKind,
140 {
141 let SampleDataRecord { statistic, .. } = self;
142 let matrix = client.inspect_time_matrix(
143 name.as_ref(),
144 TimeMatrix::with_statistic(profile, interpolation, statistic),
145 );
146 Self::reactor(matrix)
147 }
148}
149
150pub fn sample_data_record<S, F>(statistic: F) -> SampleDataRecord<F, S, Unset>
158where
159 F: Statistic,
160{
161 SampleDataRecord { statistic, metadata: (), phantom: PhantomData }
162}