windowed_stats/experimental/series/buffer/
mod.rs1mod delta_simple8b_rle;
8mod delta_zigzag_simple8b_rle;
9mod simple8b_rle;
10mod uncompressed;
11mod zigzag_simple8b_rle;
12
13pub mod encoding;
14
15use log::warn;
16use std::fmt::{self, Debug, Display, Formatter};
17use std::io::{self, Write};
18use std::num::NonZeroUsize;
19
20use crate::experimental::series::SamplingInterval;
21use crate::experimental::series::buffer::encoding::Encoding;
22use crate::experimental::series::interpolation::InterpolationKind;
23
24use Capacity::MinSamples;
25
26pub trait BufferStrategy<A, P>
43where
44 P: InterpolationKind,
45{
46 type Buffer: Clone + RingBuffer<A>;
47
48 fn buffer(interval: &SamplingInterval) -> Self::Buffer {
50 Self::Buffer::with_capacity(interval.capacity())
51 }
52}
53
54pub trait RingBuffer<A> {
56 type Encoding: Encoding<A>;
58
59 fn with_capacity(capacity: Capacity) -> Self
60 where
61 Self: Sized;
62
63 fn push(&mut self, item: A);
64
65 fn fill(&mut self, item: A, count: NonZeroUsize);
66
67 fn serialize(&self, write: impl Write) -> io::Result<()>;
68
69 }
80
81#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
86pub enum Capacity {
87 MinSamples(NonZeroUsize),
92}
93
94impl Capacity {
95 pub fn from_min_samples(n: usize) -> Self {
97 MinSamples(NonZeroUsize::new(n).unwrap_or(NonZeroUsize::MIN))
98 }
99}
100
101impl Display for Capacity {
102 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
103 match self {
104 MinSamples(n) => write!(formatter, "{}", n),
105 }
106 }
107}
108
109#[derive(Clone, Debug)]
110pub struct Uncompressed<A>(uncompressed::UncompressedRingBuffer<A>);
111
112impl RingBuffer<f32> for Uncompressed<f32> {
113 type Encoding = uncompressed::Encoding<f32>;
114
115 fn with_capacity(capacity: Capacity) -> Self {
116 Uncompressed(match capacity {
117 MinSamples(n) => uncompressed::UncompressedRingBuffer::with_min_samples(n.get()),
118 })
119 }
120
121 fn push(&mut self, item: f32) {
122 self.0.push(item);
123 }
124
125 fn fill(&mut self, item: f32, count: NonZeroUsize) {
126 self.0.fill(item, count);
127 }
128
129 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
130 self.0.serialize(&mut write)
131 }
132}
133
134#[derive(Clone, Debug)]
135pub struct Simple8bRle(simple8b_rle::Simple8bRleRingBuffer);
136
137impl<A> RingBuffer<A> for Simple8bRle
138where
139 A: Into<u64>,
140{
141 type Encoding = simple8b_rle::Encoding;
142
143 fn with_capacity(capacity: Capacity) -> Self {
144 Simple8bRle(match capacity {
145 MinSamples(n) => simple8b_rle::Simple8bRleRingBuffer::with_min_samples(n.get()),
146 })
147 }
148
149 fn push(&mut self, item: A) {
150 self.0.push(item.into());
151 }
152
153 fn fill(&mut self, item: A, count: NonZeroUsize) {
154 self.0.fill(item.into(), count);
155 }
156
157 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
158 self.0.serialize(&mut write)
159 }
160}
161
162#[derive(Clone, Debug)]
164pub struct ZigzagSimple8bRle(zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer);
165
166impl<A> RingBuffer<A> for ZigzagSimple8bRle
167where
168 A: Into<i64>,
169{
170 type Encoding = zigzag_simple8b_rle::Encoding;
171
172 fn with_capacity(capacity: Capacity) -> Self {
173 ZigzagSimple8bRle(match capacity {
174 MinSamples(n) => {
175 zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer::with_min_samples(n.get())
176 }
177 })
178 }
179
180 fn push(&mut self, item: A) {
181 self.0.push(item.into());
182 }
183
184 fn fill(&mut self, item: A, count: NonZeroUsize) {
185 self.0.fill(item.into(), count);
186 }
187
188 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
189 self.0.serialize(&mut write)
190 }
191}
192
193#[derive(Clone, Debug)]
194pub struct DeltaSimple8bRle(delta_simple8b_rle::DeltaSimple8bRleRingBuffer);
195
196impl<A> RingBuffer<A> for DeltaSimple8bRle
197where
198 A: Into<u64>,
199{
200 type Encoding = delta_simple8b_rle::Encoding;
201
202 fn with_capacity(capacity: Capacity) -> Self {
203 let ring_buffer = match capacity {
204 MinSamples(n) => {
205 delta_simple8b_rle::DeltaSimple8bRleRingBuffer::with_min_samples(n.get())
206 }
207 };
208 DeltaSimple8bRle(ring_buffer)
209 }
210
211 fn push(&mut self, item: A) {
212 if let Err(e) = self.0.push(item.into()) {
213 warn!("DeltaSimple8bRleRingBuffer::push error: {}", e);
214 }
215 }
216
217 fn fill(&mut self, item: A, count: NonZeroUsize) {
218 if let Err(e) = self.0.fill(item.into(), count) {
219 warn!("DeltaSimple8bRleRingBuffer::fill error: {}", e);
220 }
221 }
222
223 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
224 self.0.serialize(&mut write)
225 }
226}
227
228#[derive(Clone, Debug)]
229pub struct DeltaZigzagSimple8bRle<A>(
230 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer<A>,
231);
232
233impl RingBuffer<i64> for DeltaZigzagSimple8bRle<i64> {
234 type Encoding = delta_zigzag_simple8b_rle::Encoding<i64>;
235
236 fn with_capacity(capacity: Capacity) -> Self {
237 let ring_buffer = match capacity {
238 MinSamples(n) => {
239 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
240 n.get(),
241 )
242 }
243 };
244 DeltaZigzagSimple8bRle(ring_buffer)
245 }
246
247 fn push(&mut self, item: i64) {
248 self.0.push(item)
249 }
250
251 fn fill(&mut self, item: i64, count: NonZeroUsize) {
252 self.0.fill(item, count);
253 }
254
255 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
256 self.0.serialize(&mut write)
257 }
258}
259
260impl RingBuffer<u64> for DeltaZigzagSimple8bRle<u64> {
261 type Encoding = delta_zigzag_simple8b_rle::Encoding<u64>;
262
263 fn with_capacity(capacity: Capacity) -> Self {
264 let ring_buffer = match capacity {
265 MinSamples(n) => {
266 delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
267 n.get(),
268 )
269 }
270 };
271 DeltaZigzagSimple8bRle(ring_buffer)
272 }
273
274 fn push(&mut self, item: u64) {
275 self.0.push(item)
276 }
277
278 fn fill(&mut self, item: u64, count: NonZeroUsize) {
279 self.0.fill(item, count);
280 }
281
282 fn serialize(&self, mut write: impl Write) -> io::Result<()> {
283 self.0.serialize(&mut write)
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290
291 #[test]
292 fn uncompressed_buffer() {
293 let mut buffer =
294 <Uncompressed<f32> as RingBuffer<f32>>::with_capacity(Capacity::from_min_samples(2));
295 buffer.push(22f32);
296 let mut data = vec![];
297 let result = RingBuffer::<f32>::serialize(&buffer, &mut data);
298 assert!(result.is_ok());
299 assert!(!data.is_empty());
300 }
301
302 #[test]
303 fn simple8b_rle_buffer() {
304 let mut buffer =
305 <Simple8bRle as RingBuffer<u64>>::with_capacity(Capacity::from_min_samples(2));
306 buffer.push(22u64);
307 let mut data = vec![];
308 let result = RingBuffer::<u64>::serialize(&buffer, &mut data);
309 assert!(result.is_ok());
310 assert!(!data.is_empty());
311 }
312
313 #[test]
314 fn zigzag_simple8b_rle_buffer() {
315 let mut buffer =
316 <ZigzagSimple8bRle as RingBuffer<i64>>::with_capacity(Capacity::from_min_samples(2));
317 buffer.push(22i64);
318 let mut data = vec![];
319 let result = RingBuffer::<i64>::serialize(&buffer, &mut data);
320 assert!(result.is_ok());
321 assert!(!data.is_empty());
322 }
323
324 #[test]
325 fn delta_simple8b_rle_buffer() {
326 let mut buffer =
327 <DeltaSimple8bRle as RingBuffer<u64>>::with_capacity(Capacity::from_min_samples(2));
328 buffer.push(22u64);
329 buffer.push(30u64);
330 let mut data = vec![];
331 let result = RingBuffer::<u64>::serialize(&buffer, &mut data);
332 assert!(result.is_ok());
333 assert!(!data.is_empty());
334 }
335
336 #[test]
337 fn delta_zigzag_simple8b_rle_buffer_i64() {
338 let mut buffer = <DeltaZigzagSimple8bRle<i64> as RingBuffer<i64>>::with_capacity(
339 Capacity::from_min_samples(2),
340 );
341 buffer.push(22i64);
342 buffer.push(-1i64);
343 let mut data = vec![];
344 let result = RingBuffer::<i64>::serialize(&buffer, &mut data);
345 assert!(result.is_ok());
346 assert!(!data.is_empty());
347 }
348
349 #[test]
350 fn delta_zigzag_simple8b_rle_buffer_u64() {
351 let mut buffer = <DeltaZigzagSimple8bRle<u64> as RingBuffer<u64>>::with_capacity(
352 Capacity::from_min_samples(2),
353 );
354 buffer.push(22u64);
355 buffer.push(u64::MAX);
356 let mut data = vec![];
357 let result = RingBuffer::<u64>::serialize(&buffer, &mut data);
358 assert!(result.is_ok());
359 assert!(!data.is_empty());
360 }
361}