1use std::convert::Infallible;
6use std::iter;
7use std::marker::PhantomData;
8use std::num::NonZeroU8;
9
10use byteorder::{LittleEndian, ReadBytesExt as _};
11use itertools::Itertools;
12use thiserror::Error;
13
14use crate::experimental::clock::{Duration, Timestamp, TimestampExt as _};
15use crate::experimental::series::SerializedBuffer;
16use crate::experimental::series::buffer::simple8b_rle::SIMPLE8B_SELECTOR_BIT_COUNTS;
17
/// Size in bytes of one compressed data block: a single 64-bit word.
const BLOCK_LENGTH: usize = std::mem::size_of::<u64>();
/// Width in bits of each entry in the packed selector stream.
const BITS_PER_SELECTOR: NonZeroU8 = match NonZeroU8::new(4) {
    Some(bits) => bits,
    None => panic!("selector width must be non-zero"),
};
20
21#[derive(Debug)]
23pub struct Decoder<'a> {
24 pub semantic: &'a str,
25 pub created_timestamp: Timestamp,
26 pub end_timestamp: Timestamp,
27 pub series_type: SeriesType,
28 data: &'a [u8],
29}
30
31impl<'a> Decoder<'a> {
32 pub fn from_serialized_buffer(buffer: &'a SerializedBuffer) -> Result<Self, DecodeError> {
33 Self::new(buffer.data.as_slice(), &buffer.data_semantic)
34 }
35
36 pub fn new(mut data: &'a [u8], semantic: &'a str) -> Result<Self, DecodeError> {
37 let version = data.read_u8()?;
38 if version != 1 {
39 return Err(DecodeError::UnknownVersion(version));
40 }
41 let created_timestamp =
42 Timestamp::from_quanta(data.read_u32::<LittleEndian>()?.try_into()?);
43 let end_timestamp = Timestamp::from_quanta(data.read_u32::<LittleEndian>()?.try_into()?);
44
45 let series_type = SeriesType::decode(&mut data)?;
46 Ok(Self { semantic, created_timestamp, end_timestamp, series_type, data })
47 }
48
49 pub fn iter_series(&self) -> TimeSeriesIterator<'a> {
50 TimeSeriesIterator { data: self.data, series_type: self.series_type }
51 }
52}
53
54pub struct TimeSeriesIterator<'a> {
55 data: &'a [u8],
56 series_type: SeriesType,
57}
58
59impl<'a> TimeSeriesIterator<'a> {
60 fn try_next_series(&mut self) -> Result<TimeSeries<'a>, DecodeError> {
61 let Self { data, series_type } = self;
62 let len = usize::from(data.read_u16::<LittleEndian>()?);
63 if data.len() < len {
64 return Err(DecodeError::ShortBuffer);
65 }
66 let granularity = Duration::from_seconds(data.read_u16::<LittleEndian>()?.into());
67 let len = len.checked_sub(std::mem::size_of::<u16>()).unwrap();
69 let (mut time_series, rest) = data.split_at(len);
70 *data = rest;
71 let series_data = SeriesData::decode(&mut time_series, *series_type)?;
72 Ok(TimeSeries { granularity, data: time_series, series_data })
73 }
74}
75
76impl<'a> Iterator for TimeSeriesIterator<'a> {
77 type Item = Result<TimeSeries<'a>, DecodeError>;
78
79 fn next(&mut self) -> Option<Self::Item> {
80 if self.data.is_empty() {
81 return None;
82 }
83 let r = self.try_next_series();
84 if r.is_err() {
85 self.data = &[];
87 }
88 Some(r)
89 }
90}
91
92pub struct TimeSeries<'a> {
93 pub granularity: Duration,
95 series_data: SeriesData<'a>,
96 data: &'a [u8],
97}
98
99impl<'a> TimeSeries<'a> {
100 pub fn data_points<D: DataPoint>(&self) -> impl Iterator<Item = Result<D, DecodeError>> + 'a {
101 let Self { granularity: _, series_data, data } = self;
102 match series_data {
103 SeriesData::Simple8bRle(i) => {
104 CompressedBlocksIterator::from_simple8b_rle_info(data, i).data_points()
105 }
106 }
107 }
108
109 pub fn data_points_64(&self) -> impl Iterator<Item = Result<u64, DecodeError>> + 'a {
110 self.data_points::<u64>()
111 }
112}
113
114pub trait DataPoint: Sized + 'static {
115 fn from_u64(v: u64) -> Result<Self, DecodeError>;
116}
117
118impl DataPoint for u64 {
119 fn from_u64(v: u64) -> Result<Self, DecodeError> {
120 Ok(v)
121 }
122}
123
124struct CompressedBlocksIterator<'a, D> {
125 data: &'a [u8],
126 last_block_num_values: u8,
127 selectors: iter::Take<iter::Skip<BitsIterator<'a>>>,
128 _marker: PhantomData<D>,
129}
130
131impl<'a, D> CompressedBlocksIterator<'a, D> {
132 fn from_simple8b_rle_info(data: &'a [u8], info: &Simple8bRleInfo<'a>) -> Self {
133 let Simple8bRleInfo {
134 selectors,
135 num_selectors,
136 index_to_head_selector,
137 last_block_num_values,
138 } = info;
139 let selectors = BitsIterator::new(*selectors, BITS_PER_SELECTOR)
141 .skip(*index_to_head_selector)
142 .take(*num_selectors);
143
144 Self {
145 data,
146 last_block_num_values: *last_block_num_values,
147 selectors,
148 _marker: PhantomData,
149 }
150 }
151
152 fn try_next_block(&mut self) -> Result<Option<CompressedBlock<'a, D>>, DecodeError> {
153 let Self { data, last_block_num_values, selectors, _marker } = self;
154 let Some(selector) = selectors.next() else {
155 return Ok(None);
156 };
157 static_assertions::const_assert!(BITS_PER_SELECTOR.get() as u32 <= u8::BITS);
161 let selector = u8::try_from(selector).expect("selector overflow");
162 let reader = CompressedBlockDataReader::decode(data, selector)?;
163 let take = (data.is_empty() && reader.should_limit_last_block())
167 .then(|| (*last_block_num_values).into());
168 Ok(Some(CompressedBlock { reader, take, _marker: PhantomData }))
169 }
170}
171
172impl<'a, D: DataPoint> CompressedBlocksIterator<'a, D> {
173 fn data_points(self) -> impl Iterator<Item = Result<D, DecodeError>> + 'a {
174 self.flatten_ok().map(|r| r.and_then(|r| r))
175 }
176}
177
178impl<'a, D> Iterator for CompressedBlocksIterator<'a, D> {
179 type Item = Result<CompressedBlock<'a, D>, DecodeError>;
180
181 fn next(&mut self) -> Option<Self::Item> {
182 if self.data.is_empty() {
183 return None;
184 }
185 let r = self.try_next_block();
186 if r.is_err() {
187 self.data = &[];
189 }
190 r.transpose()
191 }
192}
193
194struct AlignedReader<'a, T> {
195 data: &'a [u8],
196 _marker: PhantomData<T>,
197}
198
199impl<'a, T> AlignedReader<'a, T> {
200 fn new(data: &'a [u8]) -> Self {
201 Self { data, _marker: PhantomData }
202 }
203}
204
205impl<T: AlignedDataPoint> Iterator for AlignedReader<'_, T> {
206 type Item = u64;
207
208 fn next(&mut self) -> Option<Self::Item> {
209 let Self { data, _marker } = self;
210 T::read(data).ok()
211 }
212}
213
214trait AlignedDataPoint {
215 fn read(data: &mut &[u8]) -> Result<u64, DecodeError>;
216}
217
218impl AlignedDataPoint for u8 {
219 fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
220 Ok(data.read_u8().map(u64::from)?)
221 }
222}
223
224impl AlignedDataPoint for u16 {
225 fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
226 Ok(data.read_u16::<LittleEndian>().map(u64::from)?)
227 }
228}
229
230impl AlignedDataPoint for u32 {
231 fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
232 Ok(data.read_u32::<LittleEndian>().map(u64::from)?)
233 }
234}
235
236impl AlignedDataPoint for u64 {
237 fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
238 Ok(data.read_u64::<LittleEndian>()?)
239 }
240}
241
242struct RleReader {
243 value: u64,
244 repeat: u64,
245}
246
247impl RleReader {
248 fn new(mut data: &[u8]) -> Result<Self, DecodeError> {
249 let data = data.read_u64::<LittleEndian>()?;
250 let value = data & 0x0000ffffffffffff;
253 let repeat = data >> (6 * 8);
254 Ok(Self { value, repeat })
255 }
256}
257
258impl Iterator for RleReader {
259 type Item = u64;
260
261 fn next(&mut self) -> Option<Self::Item> {
262 let Self { value, repeat } = self;
263 let n = repeat.checked_sub(1)?;
264 *repeat = n;
265 Some(*value)
266 }
267}
268
269enum CompressedBlockDataReader<'a> {
270 U8(AlignedReader<'a, u8>),
271 U16(AlignedReader<'a, u16>),
272 U32(AlignedReader<'a, u32>),
273 U64(AlignedReader<'a, u64>),
274 Bits(BitsIterator<'a>),
275 Rle(RleReader),
276}
277
278impl<'a> CompressedBlockDataReader<'a> {
279 fn decode(data: &mut &'a [u8], selector: u8) -> Result<Self, DecodeError> {
280 if data.len() < BLOCK_LENGTH {
289 return Err(DecodeError::ShortBuffer);
290 }
291 let (block, rest) = data.split_at(BLOCK_LENGTH);
292 *data = rest;
293
294 if selector == 15 {
295 return Ok(Self::Rle(RleReader::new(block)?));
296 }
297 let bits_per_int = *SIMPLE8B_SELECTOR_BIT_COUNTS
298 .get(usize::from(selector))
299 .ok_or_else(|| DecodeError::UnsupportedSelector(selector))?;
300
301 match bits_per_int {
302 u8::BITS => Ok(Self::U8(AlignedReader::new(block))),
304 u16::BITS => Ok(Self::U16(AlignedReader::new(block))),
306 u32::BITS => Ok(Self::U32(AlignedReader::new(block))),
308 u64::BITS => Ok(Self::U64(AlignedReader::new(block))),
310 b => Ok(Self::Bits(BitsIterator::new(
311 block,
312 NonZeroU8::new(b.try_into().expect("bits per int overflow"))
313 .expect("zero bits iterator"),
314 ))),
315 }
316 }
317
318 fn should_limit_last_block(&self) -> bool {
319 match self {
320 Self::Rle(_) => false,
322 _ => true,
323 }
324 }
325}
326
327impl<'a> Iterator for CompressedBlockDataReader<'a> {
328 type Item = u64;
329
330 fn next(&mut self) -> Option<Self::Item> {
331 match self {
332 Self::U8(r) => r.next(),
333 Self::U16(r) => r.next(),
334 Self::U32(r) => r.next(),
335 Self::U64(r) => r.next(),
336 Self::Bits(r) => r.next(),
337 Self::Rle(r) => r.next(),
338 }
339 }
340}
341
342struct CompressedBlock<'a, D> {
343 reader: CompressedBlockDataReader<'a>,
344 take: Option<usize>,
345 _marker: PhantomData<D>,
346}
347
348impl<'a, D: DataPoint> Iterator for CompressedBlock<'a, D> {
349 type Item = Result<D, DecodeError>;
350
351 fn next(&mut self) -> Option<Self::Item> {
352 let Self { reader, take, _marker } = self;
353 if let Some(take) = take {
354 *take = take.checked_sub(1)?;
355 }
356 reader.next().map(D::from_u64)
357 }
358}
359
360#[derive(Debug, Eq, PartialEq, Copy, Clone)]
361pub enum SeriesType {
362 Simple8bRle,
363}
364
365impl SeriesType {
366 fn decode(data: &mut &[u8]) -> Result<Self, DecodeError> {
367 let series_type = data.read_u8()?;
368 let series_subtype = data.read_u8()?;
369 match (series_type, series_subtype) {
370 (1, 0) => Ok(SeriesType::Simple8bRle),
371 (t, st) => Err(DecodeError::UnsupportedSeriesType(t, st)),
374 }
375 }
376}
377
378#[derive(Debug, Copy, Clone)]
380enum SeriesData<'a> {
381 Simple8bRle(Simple8bRleInfo<'a>),
382}
383
384impl<'a> SeriesData<'a> {
385 fn decode(data: &mut &'a [u8], series_type: SeriesType) -> Result<Self, DecodeError> {
386 match series_type {
387 SeriesType::Simple8bRle => Simple8bRleInfo::decode(data).map(SeriesData::Simple8bRle),
388 }
389 }
390}
391
392#[derive(Copy, Clone, Debug)]
393struct Simple8bRleInfo<'a> {
394 selectors: &'a [u8],
395 num_selectors: usize,
396 index_to_head_selector: usize,
397 last_block_num_values: u8,
398}
399
400impl<'a> Simple8bRleInfo<'a> {
401 fn decode(data: &mut &'a [u8]) -> Result<Self, DecodeError> {
402 let num_selectors = usize::from(data.read_u16::<LittleEndian>()?);
403 let index_to_head_selector = usize::from(data.read_u16::<LittleEndian>()?);
404 let last_block_num_values = data.read_u8()?;
405
406 let selector_bytes = (num_selectors + index_to_head_selector + 1) / 2;
409 let (selectors, rest) = data.split_at(selector_bytes);
410 *data = rest;
411
412 let want_len = num_selectors * BLOCK_LENGTH;
413 if data.len() != want_len {
414 return Err(DecodeError::UnexpectedBlocksLength { want: want_len, got: data.len() });
415 }
416
417 Ok(Self { selectors, num_selectors, index_to_head_selector, last_block_num_values })
418 }
419}
420
421#[derive(Error, Debug)]
422pub enum DecodeError {
423 #[error("unknown version {0}")]
424 UnknownVersion(u8),
425 #[error("unsupported series type {0} {1}")]
426 UnsupportedSeriesType(u8, u8),
427 #[error("unsupported selector {0}")]
428 UnsupportedSelector(u8),
429 #[error(transparent)]
430 Io(#[from] std::io::Error),
431 #[error("data buffer cut off")]
432 ShortBuffer,
433 #[error("unexpected block length {got}, want: {want}")]
434 UnexpectedBlocksLength { want: usize, got: usize },
435 #[error(transparent)]
436 Other(anyhow::Error),
437}
438
439impl From<Infallible> for DecodeError {
440 fn from(v: Infallible) -> Self {
441 match v {}
442 }
443}
444
/// Iterator that unpacks consecutive fixed-width unsigned integers from a byte slice.
///
/// Values are read least-significant bit first, starting at bit 0 of the first byte, and may
/// straddle byte boundaries. Iteration ends when the remaining bits cannot fill a whole value.
#[derive(Copy, Clone)]
struct BitsIterator<'a> {
    /// Unread bytes; the first one may be partially consumed.
    data: &'a [u8],
    /// Width in bits of every value produced.
    bits_per_int: NonZeroU8,
    /// Number of bits already consumed from the first byte of `data`; always below 8.
    current_bit: u8,
}

impl<'a> BitsIterator<'a> {
    /// Creates an iterator over `data` producing values that are `bits_per_int` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_int` exceeds 64, because values are returned as `u64`.
    fn new(data: &'a [u8], bits_per_int: NonZeroU8) -> Self {
        assert!(u32::from(bits_per_int.get()) <= u64::BITS);
        Self { data, bits_per_int, current_bit: 0 }
    }
}

impl<'a> Iterator for BitsIterator<'a> {
    type Item = u64;

    /// Assembles the next value from as many bytes as its width requires.
    ///
    /// Returns `None` when the bytes run out before the value is complete. Bits consumed
    /// for the incomplete value are not restored.
    fn next(&mut self) -> Option<Self::Item> {
        let width = self.bits_per_int.get();
        let mut value = 0u64;
        // Number of bits of `value` assembled so far.
        let mut filled = 0u8;

        while filled < width {
            let (&byte, rest) = self.data.split_first()?;
            let offset = self.current_bit;
            // Take what is still missing, limited to what is left in the current byte.
            let chunk = (width - filled).min(8 - offset);
            let bits = (byte >> offset) & (u8::MAX >> (8 - chunk));
            value |= u64::from(bits) << filled;
            filled += chunk;

            if offset + chunk == 8 {
                // The current byte is exhausted; continue from the next one.
                self.current_bit = 0;
                self.data = rest;
            } else {
                self.current_bit = offset + chunk;
            }
        }
        Some(value)
    }
}
502
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use super::*;

    use proptest::prop_assert_eq;
    use proptest::strategy::Strategy;
    use proptest::test_runner::TestCaseError;

    use crate::experimental::clock::{Timed, Timestamp};
    use crate::experimental::series::buffer::simple8b_rle::{
        Simple8bRleBlock, Simple8bRleRingBuffer, simple8b_block_from_values,
    };
    use crate::experimental::series::interpolation::ConstantSample;
    use crate::experimental::series::statistic::Last;
    use crate::experimental::series::{
        Capacity, SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
    };

    // Round trip through the encoder: fold samples into a `TimeMatrix`, serialize it, and
    // check that the decoder recovers the header and the data points.
    #[test]
    fn decode_simple8b_rle_time_series() {
        let profile = SamplingProfile::balanced();

        // Pick the interval with the smallest maximum sampling period and feed it one sample
        // per period, up to its minimum sample capacity.
        let intervals = profile.clone().into_sampling_intervals();
        let granular_profile = intervals.iter().min_by_key(|x| x.max_sampling_period()).unwrap();
        let granularity = granular_profile.max_sampling_period();
        let samples = match granular_profile.capacity() {
            Capacity::MinSamples(n) => n.get(),
        };

        let start = Timestamp::from_nanos(0);
        let mut time = start;

        let mut series =
            TimeMatrix::<Last<u64>, ConstantSample>::new_at(time, profile, Default::default());

        // Fold the values 1, 2, 3, ... and remember them for the final comparison.
        let mut value = 1;
        let mut values = Vec::new();
        for _ in 0..samples {
            series.fold(Timed::at(value, time)).expect("fold sample");
            values.push(value);
            time += granularity;
            value += 1
        }
        let end = time;

        // The decoded header must match what was fed to the encoder.
        let buffer = series.tick_and_get_buffers(end).expect("tick");
        let decoder = Decoder::from_serialized_buffer(&buffer).expect("decode");
        let Decoder { semantic, created_timestamp, end_timestamp, series_type, data: _ } = &decoder;
        assert_eq!(semantic, &buffer.data_semantic);
        assert_eq!(created_timestamp, &start);
        assert_eq!(end_timestamp, &end);
        assert_eq!(series_type, &SeriesType::Simple8bRle);

        let duration = end - start;

        // Every decoded series must pair with one sampling interval and hold one data point
        // per sampling period elapsed between `start` and `end`.
        let series = decoder.iter_series().zip_eq(intervals);
        for (series, interval) in series {
            let series = series.expect("error decoding series");
            assert_eq!(series.granularity, interval.max_sampling_period());
            let data_points =
                series.data_points_64().collect::<Result<Vec<_>, _>>().expect("decode time series");
            let expect_samples = usize::try_from(
                duration.into_seconds() / interval.max_sampling_period().into_seconds(),
            )
            .unwrap();
            assert_eq!(data_points.len(), expect_samples);
        }

        // The first series is compared against the exact values that were folded.
        let first_series = decoder
            .iter_series()
            .next()
            .expect("has first series")
            .expect("can decode first series");
        let samples = first_series
            .data_points_64()
            .collect::<Result<Vec<_>, _>>()
            .expect("decode time series");
        assert_eq!(samples, values);
    }

    // Minimum sample capacity of the ring buffer used by the property tests; it also bounds
    // the length of the generated sample vectors.
    const TEST_MIN_SAMPLES: usize = 500;

    // Pushes `samples` into a ring buffer, serializes it, and checks that decoding yields
    // the same values.
    fn decode_simple8b_rle_inner(samples: Vec<u64>) -> Result<(), TestCaseError> {
        let mut buffer = Simple8bRleRingBuffer::with_min_samples(TEST_MIN_SAMPLES);
        for i in samples.iter() {
            let evicted = buffer.push(*i);
            // No sample may be evicted while pushing.
            prop_assert_eq!(evicted, vec![]);
        }

        let mut serialized = Vec::new();
        buffer.serialize(&mut serialized).prop_context("serialize")?;
        let mut serialized = &serialized[..];

        // `decode` advances `serialized` past the metadata, leaving the compressed blocks.
        let info = Simple8bRleInfo::decode(&mut serialized).prop_context("decode info")?;
        let deserialized =
            CompressedBlocksIterator::<'_, u64>::from_simple8b_rle_info(serialized, &info)
                .data_points()
                .collect::<Result<Vec<_>, _>>()
                .prop_context("data points")?;
        prop_assert_eq!(deserialized, samples);
        Ok(())
    }

    // Generates vectors of fewer than `TEST_MIN_SAMPLES` values whose magnitude is capped by
    // a randomly chosen bit width between 0 and 64.
    fn u64_samples_strategy() -> impl Strategy<Value = Vec<u64>> {
        (0..TEST_MIN_SAMPLES, 0..=u64::BITS).prop_flat_map(|(len, bits)| {
            // `1 << 64` would overflow, so the full-width case is handled separately.
            let max = if bits == u64::BITS { u64::MAX } else { (1 << bits) - 1 };
            proptest::collection::vec(0..=max, len)
        })
    }

    // Converts any debuggable error into a proptest failure prefixed with `context`.
    trait ResultExt<T> {
        fn prop_context(self, context: &str) -> Result<T, TestCaseError>;
    }

    impl<T, E: Debug> ResultExt<T> for Result<T, E> {
        fn prop_context(self, context: &str) -> Result<T, TestCaseError> {
            self.map_err(|e| TestCaseError::fail(format!("{context}: {e:?}")))
        }
    }

    // Packs a run of consecutive integers starting at `offset` into one block with the
    // encoder, then checks that `BitsIterator` unpacks the same values.
    fn bits_iterator_inner(
        mut offset: u64,
        simple8b_rle_selector: u8,
    ) -> Result<(), TestCaseError> {
        let bits = SIMPLE8B_SELECTOR_BIT_COUNTS[usize::from(simple8b_rle_selector)];
        let mask = if bits == u64::BITS { u64::MAX } else { (1u64 << bits) - 1 };

        // Endless sequence offset, offset + 1, ... truncated to the width of the selector.
        let generator = std::iter::repeat_with(move || {
            let nxt = offset;
            offset += 1;
            nxt & mask
        });

        let Simple8bRleBlock { selector: _, data } =
            simple8b_block_from_values(simple8b_rle_selector, &mut generator.clone());
        // Number of whole values of this width that fit in a 64-bit block.
        let num_values = u64::BITS / bits;

        let expect = generator.take(num_values.try_into().unwrap()).collect::<Vec<_>>();
        let data = data.to_le_bytes();

        let values =
            BitsIterator::new(&data[..], NonZeroU8::new(bits.try_into().unwrap()).unwrap())
                .collect::<Vec<_>>();
        prop_assert_eq!(values, expect);
        Ok(())
    }

    proptest::proptest! {
        #![proptest_config(proptest::test_runner::Config {
            failure_persistence: proptest_support::failed_seeds!(
                "cc b37d7d02c5bf8547fef78e979a7e64b86ed03eb9b77e2e16770bd96b1deed5d7"
            ),
            ..proptest::test_runner::Config::default()
        })]

        // Round trip of arbitrary sample vectors through the ring buffer and the decoder.
        #[test]
        fn decode_simple8b_rle(x in u64_samples_strategy()) {
            decode_simple8b_rle_inner(x)?;
        }

        // Round trip of one packed block for every selector in the bit count table.
        #[test]
        fn bits_iterator((offset, selector) in (0u64..100, 0..(SIMPLE8B_SELECTOR_BIT_COUNTS.len()) )) {
            bits_iterator_inner(offset, selector.try_into().unwrap())?;
        }

    }
}