windowed_stats/experimental/series/buffer/
mod.rs1mod delta_simple8b_rle;
8mod delta_zigzag_simple8b_rle;
9mod simple8b_rle;
10mod uncompressed;
11mod zigzag_simple8b_rle;
12
13pub mod decode;
14pub mod encoding;
15
16use log::warn;
17use std::fmt::{self, Debug, Display, Formatter};
18use std::io::{self, Write};
19use std::num::NonZeroUsize;
20
21use crate::experimental::series::SamplingInterval;
22use crate::experimental::series::buffer::encoding::Encoding;
23use crate::experimental::series::interpolation::InterpolationKind;
24
25use Capacity::MinSamples;
26
27pub trait BufferStrategy<A, P>
44where
45 P: InterpolationKind,
46{
47 type Buffer: Clone + RingBuffer<A>;
48
49 fn buffer(interval: &SamplingInterval) -> Self::Buffer {
51 Self::Buffer::with_capacity(interval.capacity())
52 }
53}
54
55pub trait RingBuffer<A> {
57 type Encoding: Encoding<A>;
59
60 fn with_capacity(capacity: Capacity) -> Self
61 where
62 Self: Sized;
63
64 fn push(&mut self, item: A);
65
66 fn fill(&mut self, item: A, count: NonZeroUsize);
67
68 fn serialize(&self, write: impl Write) -> io::Result<()>;
69
70 }
81
/// Requested capacity of a ring buffer.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Capacity {
    /// A non-zero minimum number of samples.
    MinSamples(NonZeroUsize),
}

impl Capacity {
    /// Builds a [`Capacity::MinSamples`] from a plain sample count.
    ///
    /// A count of zero cannot be represented and is replaced by [`NonZeroUsize::MIN`] (one).
    pub fn from_min_samples(n: usize) -> Self {
        let samples = NonZeroUsize::new(n).unwrap_or(NonZeroUsize::MIN);
        Self::MinSamples(samples)
    }
}

impl Display for Capacity {
    /// Writes the sample count as a bare decimal number.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        // `Capacity` has a single variant, so this pattern is irrefutable.
        let Self::MinSamples(samples) = self;
        write!(formatter, "{}", samples)
    }
}
109
110#[derive(Clone, Debug)]
111pub struct Uncompressed<A>(uncompressed::UncompressedRingBuffer<A>);
112
113impl RingBuffer<f32> for Uncompressed<f32> {
114 type Encoding = uncompressed::Encoding<f32>;
115
116 fn with_capacity(capacity: Capacity) -> Self {
117 Uncompressed(match capacity {
118 MinSamples(n) => uncompressed::UncompressedRingBuffer::with_min_samples(n.get()),
119 })
120 }
121
122 fn push(&mut self, item: f32) {
123 self.0.push(item);
124 }
125
126 fn fill(&mut self, item: f32, count: NonZeroUsize) {
127 self.0.fill(item, count);
128 }
129
130 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
131 self.0.serialize(&mut write)
132 }
133}
134
135#[derive(Clone, Debug)]
136pub struct Simple8bRle(simple8b_rle::Simple8bRleRingBuffer);
137
138impl<A> RingBuffer<A> for Simple8bRle
139where
140 A: Into<u64>,
141{
142 type Encoding = simple8b_rle::Encoding;
143
144 fn with_capacity(capacity: Capacity) -> Self {
145 Simple8bRle(match capacity {
146 MinSamples(n) => simple8b_rle::Simple8bRleRingBuffer::with_min_samples(n.get()),
147 })
148 }
149
150 fn push(&mut self, item: A) {
151 self.0.push(item.into());
152 }
153
154 fn fill(&mut self, item: A, count: NonZeroUsize) {
155 self.0.fill(item.into(), count);
156 }
157
158 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
159 self.0.serialize(&mut write)
160 }
161}
162
163#[derive(Clone, Debug)]
165pub struct ZigzagSimple8bRle(zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer);
166
167impl<A> RingBuffer<A> for ZigzagSimple8bRle
168where
169 A: Into<i64>,
170{
171 type Encoding = zigzag_simple8b_rle::Encoding;
172
173 fn with_capacity(capacity: Capacity) -> Self {
174 ZigzagSimple8bRle(match capacity {
175 MinSamples(n) => {
176 zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer::with_min_samples(n.get())
177 }
178 })
179 }
180
181 fn push(&mut self, item: A) {
182 self.0.push(item.into());
183 }
184
185 fn fill(&mut self, item: A, count: NonZeroUsize) {
186 self.0.fill(item.into(), count);
187 }
188
189 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
190 self.0.serialize(&mut write)
191 }
192}
193
194#[derive(Clone, Debug)]
195pub struct DeltaSimple8bRle(delta_simple8b_rle::DeltaSimple8bRleRingBuffer);
196
197impl<A> RingBuffer<A> for DeltaSimple8bRle
198where
199 A: Into<u64>,
200{
201 type Encoding = delta_simple8b_rle::Encoding;
202
203 fn with_capacity(capacity: Capacity) -> Self {
204 let ring_buffer = match capacity {
205 MinSamples(n) => {
206 delta_simple8b_rle::DeltaSimple8bRleRingBuffer::with_min_samples(n.get())
207 }
208 };
209 DeltaSimple8bRle(ring_buffer)
210 }
211
212 fn push(&mut self, item: A) {
213 if let Err(e) = self.0.push(item.into()) {
214 warn!("DeltaSimple8bRleRingBuffer::push error: {}", e);
215 }
216 }
217
218 fn fill(&mut self, item: A, count: NonZeroUsize) {
219 if let Err(e) = self.0.fill(item.into(), count) {
220 warn!("DeltaSimple8bRleRingBuffer::fill error: {}", e);
221 }
222 }
223
224 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
225 self.0.serialize(&mut write)
226 }
227}
228
229#[derive(Clone, Debug)]
230pub struct DeltaZigzagSimple8bRle<A>(
231 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer<A>,
232);
233
234impl RingBuffer<i64> for DeltaZigzagSimple8bRle<i64> {
235 type Encoding = delta_zigzag_simple8b_rle::Encoding<i64>;
236
237 fn with_capacity(capacity: Capacity) -> Self {
238 let ring_buffer = match capacity {
239 MinSamples(n) => {
240 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
241 n.get(),
242 )
243 }
244 };
245 DeltaZigzagSimple8bRle(ring_buffer)
246 }
247
248 fn push(&mut self, item: i64) {
249 self.0.push(item)
250 }
251
252 fn fill(&mut self, item: i64, count: NonZeroUsize) {
253 self.0.fill(item, count);
254 }
255
256 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
257 self.0.serialize(&mut write)
258 }
259}
260
261impl RingBuffer<u64> for DeltaZigzagSimple8bRle<u64> {
262 type Encoding = delta_zigzag_simple8b_rle::Encoding<u64>;
263
264 fn with_capacity(capacity: Capacity) -> Self {
265 let ring_buffer = match capacity {
266 MinSamples(n) => {
267 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
268 n.get(),
269 )
270 }
271 };
272 DeltaZigzagSimple8bRle(ring_buffer)
273 }
274
275 fn push(&mut self, item: u64) {
276 self.0.push(item)
277 }
278
279 fn fill(&mut self, item: u64, count: NonZeroUsize) {
280 self.0.fill(item, count);
281 }
282
283 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
284 self.0.serialize(&mut write)
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `buffer` into a fresh byte vector and asserts that the call succeeds and
    /// produces at least one byte.
    fn assert_serializes_to_bytes<A, B>(buffer: &B)
    where
        B: RingBuffer<A>,
    {
        let mut bytes = Vec::<u8>::new();
        let outcome = RingBuffer::<A>::serialize(buffer, &mut bytes);
        assert!(outcome.is_ok());
        assert!(!bytes.is_empty());
    }

    #[test]
    fn uncompressed_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <Uncompressed<f32> as RingBuffer<f32>>::with_capacity(capacity);
        buffer.push(22f32);
        assert_serializes_to_bytes::<f32, _>(&buffer);
    }

    #[test]
    fn simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <Simple8bRle as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        assert_serializes_to_bytes::<u64, _>(&buffer);
    }

    #[test]
    fn zigzag_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <ZigzagSimple8bRle as RingBuffer<i64>>::with_capacity(capacity);
        buffer.push(22i64);
        assert_serializes_to_bytes::<i64, _>(&buffer);
    }

    #[test]
    fn delta_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaSimple8bRle as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(30u64);
        assert_serializes_to_bytes::<u64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_i64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaZigzagSimple8bRle<i64> as RingBuffer<i64>>::with_capacity(capacity);
        buffer.push(22i64);
        buffer.push(-1i64);
        assert_serializes_to_bytes::<i64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_u64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaZigzagSimple8bRle<u64> as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(u64::MAX);
        assert_serializes_to_bytes::<u64, _>(&buffer);
    }
}