windowed_stats/experimental/series/buffer/
mod.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Ring buffers and compression.
6
7mod delta_simple8b_rle;
8mod delta_zigzag_simple8b_rle;
9mod simple8b_rle;
10mod uncompressed;
11mod zigzag_simple8b_rle;
12
13pub mod encoding;
14
15use log::warn;
16use std::fmt::{self, Debug, Display, Formatter};
17use std::io::{self, Write};
18use std::num::NonZeroUsize;
19
20use crate::experimental::series::SamplingInterval;
21use crate::experimental::series::buffer::encoding::Encoding;
22use crate::experimental::series::interpolation::InterpolationKind;
23
24use Capacity::MinSamples;
25
26// The choice of ring buffer is determined by three types associated with a time matrix:
27//
28//   1. The data semantic.
29//   2. The aggregation.
30//   3. The interpolation.
31//
32// Because of this, this trait is implemented by `DataSemantic` types like `Gauge` for various
33// aggregation and interpolation input types. For each supported triple, there is an associated
34// ring buffer. In type bounds, this is typically expressed via the `SerialStatistic` trait, which
35// is implemented for any and all `Statistic` types with an associated data semantic type that
36// implements `BufferStrategy` for its associated aggregation type. See the blanket implementation
37// for `SerialStatistic`.
38/// A type that can construct a [`RingBuffer`] associated with an aggregation type and
39/// interpolation.
40///
41/// [`RingBuffer`]: crate::experimental::series::buffer::RingBuffer
42pub trait BufferStrategy<A, P>
43where
44    P: InterpolationKind,
45{
46    type Buffer: Clone + RingBuffer<A>;
47
48    /// Constructs a ring buffer with the given fixed capacity.
49    fn buffer(interval: &SamplingInterval) -> Self::Buffer {
50        Self::Buffer::with_capacity(interval.capacity())
51    }
52}
53
/// A fixed-capacity circular ring buffer.
///
/// The type parameter `A` is the type of the items accepted by `push` and `fill`.
pub trait RingBuffer<A> {
    /// The compression and payload of the buffer.
    type Encoding: Encoding<A>;

    /// Constructs a buffer that satisfies the given capacity bounds.
    fn with_capacity(capacity: Capacity) -> Self
    where
        Self: Sized;

    /// Adds the given item to the buffer.
    fn push(&mut self, item: A);

    /// Adds the given item to the buffer `count` times.
    // NOTE(review): the implementations in this module only forward to the underlying buffer's
    //               `fill`; confirm there that this matches `count` consecutive calls to `push`.
    fn fill(&mut self, item: A, count: NonZeroUsize);

    /// Writes the buffer to the given writer, reporting any I/O error that occurs.
    fn serialize(&self, write: impl Write) -> io::Result<()>;

    // TODO(https://fxbug.dev/369886210): Implement a durability query. This is the duration of the
    //                                    sampling interval (`SamplingInterval::duration`)
    //                                    multiplied by the (approximate) number of samples for
    //                                    which the buffer has capacity.
    //
    // /// Gets the approximate durability of the buffer.
    // ///
    // /// Durability is the maximum period of time represented by the aggregations of a sampling
    // /// interval. This is the time period for which it represents historical data.
    // fn durability(&self) -> Duration;
}
80
/// Capacity bounds of a [`RingBuffer`].
///
/// Because buffers may be compressed, capacity is expressed in terms of bounds rather than exact
/// quantities.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Capacity {
    /// A lower bound on the number of samples that a buffer stores in memory.
    ///
    /// Given a bound `n`, a buffer allocates enough memory for at least `n` samples, but may store
    /// more.
    MinSamples(NonZeroUsize),
}

impl Capacity {
    /// Constructs a minimum samples capacity, clamping to `[1, usize::MAX]`.
    ///
    /// A request for zero samples is raised to one; every other value is used as given.
    pub fn from_min_samples(n: usize) -> Self {
        Self::MinSamples(NonZeroUsize::new(n).unwrap_or(NonZeroUsize::MIN))
    }
}

impl Display for Capacity {
    /// Formats the capacity as its bare sample count.
    ///
    /// Width, fill, alignment, and zero-padding flags given by the caller are honored.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self {
            // Delegate to the integer's own `Display` implementation instead of going through
            // `write!(formatter, "{}", n)`: the latter starts a fresh format specification and so
            // silently discards any flags the caller attached to `formatter`.
            Self::MinSamples(n) => Display::fmt(n, formatter),
        }
    }
}
108
109#[derive(Clone, Debug)]
110pub struct Uncompressed<A>(uncompressed::UncompressedRingBuffer<A>);
111
112impl RingBuffer<f32> for Uncompressed<f32> {
113    type Encoding = uncompressed::Encoding<f32>;
114
115    fn with_capacity(capacity: Capacity) -> Self {
116        Uncompressed(match capacity {
117            MinSamples(n) => uncompressed::UncompressedRingBuffer::with_min_samples(n.get()),
118        })
119    }
120
121    fn push(&mut self, item: f32) {
122        self.0.push(item);
123    }
124
125    fn fill(&mut self, item: f32, count: NonZeroUsize) {
126        self.0.fill(item, count);
127    }
128
129    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
130        self.0.serialize(&mut write)
131    }
132}
133
134#[derive(Clone, Debug)]
135pub struct Simple8bRle(simple8b_rle::Simple8bRleRingBuffer);
136
137impl<A> RingBuffer<A> for Simple8bRle
138where
139    A: Into<u64>,
140{
141    type Encoding = simple8b_rle::Encoding;
142
143    fn with_capacity(capacity: Capacity) -> Self {
144        Simple8bRle(match capacity {
145            MinSamples(n) => simple8b_rle::Simple8bRleRingBuffer::with_min_samples(n.get()),
146        })
147    }
148
149    fn push(&mut self, item: A) {
150        self.0.push(item.into());
151    }
152
153    fn fill(&mut self, item: A, count: NonZeroUsize) {
154        self.0.fill(item.into(), count);
155    }
156
157    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
158        self.0.serialize(&mut write)
159    }
160}
161
162/// A ring buffer that encodes signed integer items using Zigzag, Simple8B, and RLE compression.
163#[derive(Clone, Debug)]
164pub struct ZigzagSimple8bRle(zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer);
165
166impl<A> RingBuffer<A> for ZigzagSimple8bRle
167where
168    A: Into<i64>,
169{
170    type Encoding = zigzag_simple8b_rle::Encoding;
171
172    fn with_capacity(capacity: Capacity) -> Self {
173        ZigzagSimple8bRle(match capacity {
174            MinSamples(n) => {
175                zigzag_simple8b_rle::ZigzagSimple8bRleRingBuffer::with_min_samples(n.get())
176            }
177        })
178    }
179
180    fn push(&mut self, item: A) {
181        self.0.push(item.into());
182    }
183
184    fn fill(&mut self, item: A, count: NonZeroUsize) {
185        self.0.fill(item.into(), count);
186    }
187
188    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
189        self.0.serialize(&mut write)
190    }
191}
192
193#[derive(Clone, Debug)]
194pub struct DeltaSimple8bRle(delta_simple8b_rle::DeltaSimple8bRleRingBuffer);
195
196impl<A> RingBuffer<A> for DeltaSimple8bRle
197where
198    A: Into<u64>,
199{
200    type Encoding = delta_simple8b_rle::Encoding;
201
202    fn with_capacity(capacity: Capacity) -> Self {
203        let ring_buffer = match capacity {
204            MinSamples(n) => {
205                delta_simple8b_rle::DeltaSimple8bRleRingBuffer::with_min_samples(n.get())
206            }
207        };
208        DeltaSimple8bRle(ring_buffer)
209    }
210
211    fn push(&mut self, item: A) {
212        if let Err(e) = self.0.push(item.into()) {
213            warn!("DeltaSimple8bRleRingBuffer::push error: {}", e);
214        }
215    }
216
217    fn fill(&mut self, item: A, count: NonZeroUsize) {
218        if let Err(e) = self.0.fill(item.into(), count) {
219            warn!("DeltaSimple8bRleRingBuffer::fill error: {}", e);
220        }
221    }
222
223    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
224        self.0.serialize(&mut write)
225    }
226}
227
228#[derive(Clone, Debug)]
229pub struct DeltaZigzagSimple8bRle<A>(
230    delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer<A>,
231);
232
233impl RingBuffer<i64> for DeltaZigzagSimple8bRle<i64> {
234    type Encoding = delta_zigzag_simple8b_rle::Encoding<i64>;
235
236    fn with_capacity(capacity: Capacity) -> Self {
237        let ring_buffer = match capacity {
238            MinSamples(n) => {
239                delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
240                    n.get(),
241                )
242            }
243        };
244        DeltaZigzagSimple8bRle(ring_buffer)
245    }
246
247    fn push(&mut self, item: i64) {
248        self.0.push(item)
249    }
250
251    fn fill(&mut self, item: i64, count: NonZeroUsize) {
252        self.0.fill(item, count);
253    }
254
255    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
256        self.0.serialize(&mut write)
257    }
258}
259
260impl RingBuffer<u64> for DeltaZigzagSimple8bRle<u64> {
261    type Encoding = delta_zigzag_simple8b_rle::Encoding<u64>;
262
263    fn with_capacity(capacity: Capacity) -> Self {
264        let ring_buffer = match capacity {
265            MinSamples(n) => {
266                delta_zigzag_simple8b_rle::DeltaZigzagSimple8bRleRingBuffer::with_min_samples(
267                    n.get(),
268                )
269            }
270        };
271        DeltaZigzagSimple8bRle(ring_buffer)
272    }
273
274    fn push(&mut self, item: u64) {
275        self.0.push(item)
276    }
277
278    fn fill(&mut self, item: u64, count: NonZeroUsize) {
279        self.0.fill(item, count);
280    }
281
282    fn serialize(&self, mut write: impl Write) -> io::Result<()> {
283        self.0.serialize(&mut write)
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the buffer into a fresh byte vector and asserts that serialization succeeds
    /// and produces at least one byte.
    ///
    /// `A` names the item type so that the intended `RingBuffer` implementation is selected for
    /// buffers that implement the trait for more than one item type.
    fn assert_serializes_to_nonempty<A, B>(buffer: &B)
    where
        B: RingBuffer<A>,
    {
        let mut bytes: Vec<u8> = Vec::new();
        let outcome = buffer.serialize(&mut bytes);
        assert!(outcome.is_ok());
        assert!(!bytes.is_empty());
    }

    #[test]
    fn uncompressed_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <Uncompressed<f32> as RingBuffer<f32>>::with_capacity(capacity);
        buffer.push(22f32);
        assert_serializes_to_nonempty::<f32, _>(&buffer);
    }

    #[test]
    fn simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <Simple8bRle as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }

    #[test]
    fn zigzag_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <ZigzagSimple8bRle as RingBuffer<i64>>::with_capacity(capacity);
        buffer.push(22i64);
        assert_serializes_to_nonempty::<i64, _>(&buffer);
    }

    #[test]
    fn delta_simple8b_rle_buffer() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaSimple8bRle as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(30u64);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_i64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaZigzagSimple8bRle<i64> as RingBuffer<i64>>::with_capacity(capacity);
        buffer.push(22i64);
        buffer.push(-1i64);
        assert_serializes_to_nonempty::<i64, _>(&buffer);
    }

    #[test]
    fn delta_zigzag_simple8b_rle_buffer_u64() {
        let capacity = Capacity::from_min_samples(2);
        let mut buffer = <DeltaZigzagSimple8bRle<u64> as RingBuffer<u64>>::with_capacity(capacity);
        buffer.push(22u64);
        buffer.push(u64::MAX);
        assert_serializes_to_nonempty::<u64, _>(&buffer);
    }
}