windowed_stats/experimental/series/buffer/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Ring buffers and compression.
6
7mod delta_simple8b_rle;
8mod delta_zigzag_simple8b_rle;
9mod simple8b_rle;
10mod uncompressed;
11mod zigzag_simple8b_rle;
12
13pub mod decode;
14pub mod encoding;
15
16use log::warn;
17use std::fmt::{self, Debug, Display, Formatter};
18use std::io::{self, Write};
19use std::num::NonZeroUsize;
20
21use crate::experimental::series::SamplingInterval;
22use crate::experimental::series::buffer::encoding::Encoding;
23use crate::experimental::series::interpolation::InterpolationKind;
24
25use Capacity::MinSamples;
26
27// The choice of ring buffer is determined by three types associated with a time matrix:
28//
29//   1. The data semantic.
30//   2. The aggregation.
31//   3. The interpolation.
32//
33// Because of this, this trait is implemented by `DataSemantic` types like `Gauge` for various
34// aggregation and interpolation input types. For each supported triple, there is an associated
35// ring buffer. In type bounds, this is typically expressed via the `SerialStatistic` trait, which
36// is implemented for any and all `Statistic` types with an associated data semantic type that
37// implements `BufferStrategy` for its associated aggregation type. See the blanket implementation
38// for `SerialStatistic`.
39/// A type that can construct a [`RingBuffer`] associated with an aggregation type and
40/// interpolation.
41///
42/// [`RingBuffer`]: crate::experimental::series::buffer::RingBuffer
43pub trait BufferStrategy<A, P>
44where
45    P: InterpolationKind,
46{
47    type Buffer: Clone + RingBuffer<A>;
48
49    /// Constructs a ring buffer with the given fixed capacity.
50    fn buffer(interval: &SamplingInterval) -> Self::Buffer {
51        Self::Buffer::with_capacity(interval.capacity())
52    }
53}
54
/// A fixed-capacity circular ring buffer.
///
/// `A` is the type of item accepted by `push` and `fill`. Implementations in this module wrap a
/// concrete buffer type and forward each operation to it.
pub trait RingBuffer<A> {
    /// The compression and payload of the buffer.
    type Encoding: Encoding<A>;

    /// Constructs a buffer that satisfies the given capacity bounds.
    ///
    /// This function is only available on sized implementors, because the buffer is returned by
    /// value.
    fn with_capacity(capacity: Capacity) -> Self
    where
        Self: Sized;

    /// Adds a single item to the buffer.
    // NOTE(review): what happens to older items once the buffer is full is decided by the
    //               implementation and is not visible from this trait -- confirm per buffer.
    fn push(&mut self, item: A);

    /// Adds `item` to the buffer `count` times.
    // NOTE(review): presumably equivalent to `count` consecutive calls to `push` -- confirm
    //               against the implementations.
    fn fill(&mut self, item: A, count: NonZeroUsize);

    /// Writes the contents of the buffer to `write`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while writing.
    // NOTE(review): the output format is presumably the one described by `Self::Encoding` --
    //               confirm against the implementations.
    fn serialize(&self, write: impl Write) -> io::Result<()>;

    // TODO(https://fxbug.dev/369886210): Implement a durability query. This is the duration of the
    //                                    sampling interval (`SamplingInterval::duration`)
    //                                    multiplied by the (approximate) number of samples for
    //                                    which the buffer has capacity.
    //
    // /// Gets the approximate durability of the buffer.
    // ///
    // /// Durability is the maximum period of time represented by the aggregations of a sampling
    // /// interval. This is the time period for which it represents historical data.
    // fn durability(&self) -> Duration;
}
81
/// Capacity bounds of a [`RingBuffer`].
///
/// Buffers may compress their contents, so a capacity describes a bound on what a buffer holds
/// rather than an exact quantity.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Capacity {
    /// A lower bound on the number of samples that a buffer stores in memory.
    ///
    /// For a bound `n`, a buffer allocates memory for no fewer than `n` samples, though it may
    /// end up storing more.
    MinSamples(NonZeroUsize),
}

impl Capacity {
    /// Constructs a minimum samples capacity from `n`, clamping to `[1, usize::MAX]`.
    ///
    /// A bound of zero is raised to one; every other value is used as given.
    pub fn from_min_samples(n: usize) -> Self {
        NonZeroUsize::new(n).map_or(Self::MinSamples(NonZeroUsize::MIN), Self::MinSamples)
    }
}

impl Display for Capacity {
    /// Formats the capacity as its bare sample count.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::MinSamples(n) => write!(formatter, "{n}"),
        }
    }
}
109
110#[derive(Clone, Debug)]
111pub struct Uncompressed<A>(uncompressed::UncompressedRingBuffer<A>);
112
113impl RingBuffer<f32> for Uncompressed<f32> {
114    type Encoding = uncompressed::Encoding<f32>;
115
116    fn with_capacity(capacity: Capacity) -> Self {
117        Uncompressed(match capacity {
118            MinSamples(n) => uncompressed::UncompressedRingBuffer::with_min_samples(n.get()),
119        })
120    }
121
122    fn push(&mut self, item: f32) {
123        self.0.push(item);
124    }
125
126    fn fill(&mut self, item: f32, count: NonZeroUsize) {
127        self.0.fill(item, count);
128    }
129
130    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
131        self.0.serialize(&mut write)
132    }
133}
134
135#[derive(Clone, Debug)]
136pub struct Simple8bRle(simple8b_rle::Simple8bRleRingBuffer);
137
138impl<A> RingBuffer<A> for Simple8bRle
139where
140    A: Into<u64>,
141{
142    type Encoding = simple8b_rle::Encoding;
143
144    fn with_capacity(capacity: Capacity) -> Self {
145        Simple8bRle(match capacity {
146            MinSamples(n) => simple8b_rle::Simple8bRleRingBuffer::with_min_samples(n.get()),
147        })
148    }
149
150    fn push(&mut self, item: A) {
151        self.0.push(item.into());
152    }
153
154    fn fill(&mut self, item: A, count: NonZeroUsize) {
155        self.0.fill(item.into(), count);
156    }
157
158    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
159        self.0.serialize(&mut write)
160    }
161}
162
163/// A ring buffer that encodes signed integer items using Zigzag, Simple8B, and RLE compression.
164#[derive(Clone, Debug)]
165pub struct ZigzagSimple8bRle(zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer);
166
167impl<A> RingBuffer<A> for ZigzagSimple8bRle
168where
169    A: Into<i64>,
170{
171    type Encoding = zigzag_simple8b_rle::Encoding;
172
173    fn with_capacity(capacity: Capacity) -> Self {
174        ZigzagSimple8bRle(match capacity {
175            MinSamples(n) => {
176                zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer::with_min_samples(n.get())
177            }
178        })
179    }
180
181    fn push(&mut self, item: A) {
182        self.0.push(item.into());
183    }
184
185    fn fill(&mut self, item: A, count: NonZeroUsize) {
186        self.0.fill(item.into(), count);
187    }
188
189    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
190        self.0.serialize(&mut write)
191    }
192}
193
194#[derive(Clone, Debug)]
195pub struct DeltaSimple8bRle(delta_simple8b_rle::DeltaSimple8bRleRingBuffer);
196
197impl<A> RingBuffer<A> for DeltaSimple8bRle
198where
199    A: Into<u64>,
200{
201    type Encoding = delta_simple8b_rle::Encoding;
202
203    fn with_capacity(capacity: Capacity) -> Self {
204        let ring_buffer = match capacity {
205            MinSamples(n) => {
206                delta_simple8b_rle::DeltaSimple8bRleRingBuffer::with_min_samples(n.get())
207            }
208        };
209        DeltaSimple8bRle(ring_buffer)
210    }
211
212    fn push(&mut self, item: A) {
213        if let Err(e) = self.0.push(item.into()) {
214            warn!("DeltaSimple8bRleRingBuffer::push error: {}", e);
215        }
216    }
217
218    fn fill(&mut self, item: A, count: NonZeroUsize) {
219        if let Err(e) = self.0.fill(item.into(), count) {
220            warn!("DeltaSimple8bRleRingBuffer::fill error: {}", e);
221        }
222    }
223
224    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
225        self.0.serialize(&mut write)
226    }
227}
228
229#[derive(Clone, Debug)]
230pub struct DeltaZigzagSimple8bRle<A>(
231    delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer<A>,
232);
233
234impl RingBuffer<i64> for DeltaZigzagSimple8bRle<i64> {
235    type Encoding = delta_zigzag_simple8b_rle::Encoding<i64>;
236
237    fn with_capacity(capacity: Capacity) -> Self {
238        let ring_buffer = match capacity {
239            MinSamples(n) => {
240                delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
241                    n.get(),
242                )
243            }
244        };
245        DeltaZigzagSimple8bRle(ring_buffer)
246    }
247
248    fn push(&mut self, item: i64) {
249        self.0.push(item)
250    }
251
252    fn fill(&mut self, item: i64, count: NonZeroUsize) {
253        self.0.fill(item, count);
254    }
255
256    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
257        self.0.serialize(&mut write)
258    }
259}
260
261impl RingBuffer<u64> for DeltaZigzagSimple8bRle<u64> {
262    type Encoding = delta_zigzag_simple8b_rle::Encoding<u64>;
263
264    fn with_capacity(capacity: Capacity) -> Self {
265        let ring_buffer = match capacity {
266            MinSamples(n) => {
267                delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
268                    n.get(),
269                )
270            }
271        };
272        DeltaZigzagSimple8bRle(ring_buffer)
273    }
274
275    fn push(&mut self, item: u64) {
276        self.0.push(item)
277    }
278
279    fn fill(&mut self, item: u64, count: NonZeroUsize) {
280        self.0.fill(item, count);
281    }
282
283    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
284        self.0.serialize(&mut write)
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `buffer` into a fresh byte vector and asserts that serialization succeeds and
    /// produces at least one byte.
    ///
    /// The item type `A` must be named by the caller, because some buffers implement
    /// `RingBuffer<A>` for more than one `A`.
    fn assert_serializes_to_nonempty<A, B>(buffer: &B)
    where
        B: RingBuffer<A>,
    {
        let mut data = vec![];
        let result = <B as RingBuffer<A>>::serialize(buffer, &mut data);
        assert!(result.is_ok());
        assert!(!data.is_empty());
    }

    #[test]
    fn uncompressed_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: Uncompressed<f32> = RingBuffer::<f32>::with_capacity(capacity);
        buffer.push(22f32);
        assert_serializes_to_nonempty::<f32, _>(&buffer);
    }

    #[test]
    fn simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: Simple8bRle = RingBuffer::<u64>::with_capacity(capacity);
        buffer.push(22u64);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }

    #[test]
    fn zigzag_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: ZigzagSimple8bRle = RingBuffer::<i64>::with_capacity(capacity);
        buffer.push(22i64);
        assert_serializes_to_nonempty::<i64, _>(&buffer);
    }

    #[test]
    fn delta_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: DeltaSimple8bRle = RingBuffer::<u64>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(30u64);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_i64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: DeltaZigzagSimple8bRle<i64> = RingBuffer::<i64>::with_capacity(capacity);
        buffer.push(22i64);
        buffer.push(-1i64);
        assert_serializes_to_nonempty::<i64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_u64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer: DeltaZigzagSimple8bRle<u64> = RingBuffer::<u64>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(u64::MAX);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }
}