windowed_stats/experimental/series/buffer/
decode.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::convert::Infallible;
6use std::iter;
7use std::marker::PhantomData;
8use std::num::NonZeroU8;
9
10use byteorder::{LittleEndian, ReadBytesExt as _};
11use itertools::Itertools;
12use thiserror::Error;
13
14use crate::experimental::clock::{Duration, Timestamp, TimestampExt as _};
15use crate::experimental::series::SerializedBuffer;
16use crate::experimental::series::buffer::simple8b_rle::SIMPLE8B_SELECTOR_BIT_COUNTS;
17
/// Size in bytes of one encoded data block; each block is one `u64`.
const BLOCK_LENGTH: usize = std::mem::size_of::<u64>();
/// Width in bits of a single packed block selector.
const BITS_PER_SELECTOR: NonZeroU8 = NonZeroU8::new(4).unwrap();
20
/// A struct for decoding serialized data by this crate.
///
/// Constructing a `Decoder` parses the buffer header (version byte, two
/// timestamps and the series type). The bytes that follow the header are kept
/// and walked lazily by [`Decoder::iter_series`].
#[derive(Debug)]
pub struct Decoder<'a> {
    /// Semantic label handed to the constructor alongside the bytes.
    pub semantic: &'a str,
    /// First timestamp stored in the header.
    pub created_timestamp: Timestamp,
    /// Second timestamp stored in the header.
    pub end_timestamp: Timestamp,
    /// Series type decoded from the header.
    pub series_type: SeriesType,
    // Bytes remaining after the header; these hold the encoded time series.
    data: &'a [u8],
}
30
31impl<'a> Decoder<'a> {
32    pub fn from_serialized_buffer(buffer: &'a SerializedBuffer) -> Result<Self, DecodeError> {
33        Self::new(buffer.data.as_slice(), &buffer.data_semantic)
34    }
35
36    pub fn new(mut data: &'a [u8], semantic: &'a str) -> Result<Self, DecodeError> {
37        let version = data.read_u8()?;
38        if version != 1 {
39            return Err(DecodeError::UnknownVersion(version));
40        }
41        let created_timestamp =
42            Timestamp::from_quanta(data.read_u32::<LittleEndian>()?.try_into()?);
43        let end_timestamp = Timestamp::from_quanta(data.read_u32::<LittleEndian>()?.try_into()?);
44
45        let series_type = SeriesType::decode(&mut data)?;
46        Ok(Self { semantic, created_timestamp, end_timestamp, series_type, data })
47    }
48
49    pub fn iter_series(&self) -> TimeSeriesIterator<'a> {
50        TimeSeriesIterator { data: self.data, series_type: self.series_type }
51    }
52}
53
/// Iterator over the length-prefixed time series stored in a decoded buffer.
///
/// Yields one `Result` per series and stops after the first error.
pub struct TimeSeriesIterator<'a> {
    // Bytes not yet consumed; emptied once decoding finishes or fails.
    data: &'a [u8],
    // Series type used to interpret every series payload.
    series_type: SeriesType,
}
58
59impl<'a> TimeSeriesIterator<'a> {
60    fn try_next_series(&mut self) -> Result<TimeSeries<'a>, DecodeError> {
61        let Self { data, series_type } = self;
62        let len = usize::from(data.read_u16::<LittleEndian>()?);
63        if data.len() < len {
64            return Err(DecodeError::ShortBuffer);
65        }
66        let granularity = Duration::from_seconds(data.read_u16::<LittleEndian>()?.into());
67        // Can't underflow given we checked against data length above.
68        let len = len.checked_sub(std::mem::size_of::<u16>()).unwrap();
69        let (mut time_series, rest) = data.split_at(len);
70        *data = rest;
71        let series_data = SeriesData::decode(&mut time_series, *series_type)?;
72        Ok(TimeSeries { granularity, data: time_series, series_data })
73    }
74}
75
76impl<'a> Iterator for TimeSeriesIterator<'a> {
77    type Item = Result<TimeSeries<'a>, DecodeError>;
78
79    fn next(&mut self) -> Option<Self::Item> {
80        if self.data.is_empty() {
81            return None;
82        }
83        let r = self.try_next_series();
84        if r.is_err() {
85            // Return None from now on.
86            self.data = &[];
87        }
88        Some(r)
89    }
90}
91
/// One decoded time series: its granularity plus the still-encoded samples.
pub struct TimeSeries<'a> {
    /// Time series granularity.
    pub granularity: Duration,
    // Encoding-specific header decoded from the front of the series payload.
    series_data: SeriesData<'a>,
    // Payload bytes left after the encoding-specific header was consumed.
    data: &'a [u8],
}
98
99impl<'a> TimeSeries<'a> {
100    pub fn data_points<D: DataPoint>(&self) -> impl Iterator<Item = Result<D, DecodeError>> + 'a {
101        let Self { granularity: _, series_data, data } = self;
102        match series_data {
103            SeriesData::Simple8bRle(i) => {
104                CompressedBlocksIterator::from_simple8b_rle_info(data, i).data_points()
105            }
106        }
107    }
108
109    pub fn data_points_64(&self) -> impl Iterator<Item = Result<u64, DecodeError>> + 'a {
110        self.data_points::<u64>()
111    }
112}
113
/// A sample type that can be produced from the raw `u64` values stored in a
/// series.
pub trait DataPoint: Sized + 'static {
    /// Converts a raw decoded value into `Self`, or reports why it cannot be
    /// represented.
    fn from_u64(v: u64) -> Result<Self, DecodeError>;
}

/// `u64` samples are passed through unchanged.
impl DataPoint for u64 {
    fn from_u64(v: u64) -> Result<Self, DecodeError> {
        Ok(v)
    }
}
123
/// Iterator over the compressed blocks of one series, pairing each 4-bit
/// selector with the 8-byte block it describes.
struct CompressedBlocksIterator<'a, D> {
    // Block bytes not yet consumed.
    data: &'a [u8],
    // Number of values to yield from the final block when that block is not
    // run-length encoded.
    last_block_num_values: u8,
    // Selectors in use: leading entries skipped, then limited to the declared
    // selector count.
    selectors: iter::Take<iter::Skip<BitsIterator<'a>>>,
    // Ties the iterator to the sample type it will produce.
    _marker: PhantomData<D>,
}
130
131impl<'a, D> CompressedBlocksIterator<'a, D> {
132    fn from_simple8b_rle_info(data: &'a [u8], info: &Simple8bRleInfo<'a>) -> Self {
133        let Simple8bRleInfo {
134            selectors,
135            num_selectors,
136            index_to_head_selector,
137            last_block_num_values,
138        } = info;
139        // Selectors are 4 bits each.
140        let selectors = BitsIterator::new(*selectors, BITS_PER_SELECTOR)
141            .skip(*index_to_head_selector)
142            .take(*num_selectors);
143
144        Self {
145            data,
146            last_block_num_values: *last_block_num_values,
147            selectors,
148            _marker: PhantomData,
149        }
150    }
151
152    fn try_next_block(&mut self) -> Result<Option<CompressedBlock<'a, D>>, DecodeError> {
153        let Self { data, last_block_num_values, selectors, _marker } = self;
154        let Some(selector) = selectors.next() else {
155            return Ok(None);
156        };
157        // The bad type here is an artifact of reading u64s from `BitsIterator`,
158        // we know this can't overflow because we create the selector
159        // BitsIterator with BITS_PER_SELECTOR.
160        static_assertions::const_assert!(BITS_PER_SELECTOR.get() as u32 <= u8::BITS);
161        let selector = u8::try_from(selector).expect("selector overflow");
162        let reader = CompressedBlockDataReader::decode(data, selector)?;
163        // Decoding the reader consumes from the data slice. If we get an empty
164        // slice, it means we're yielding the last block and should perhaps
165        // apply the last block limit.
166        let take = (data.is_empty() && reader.should_limit_last_block())
167            .then(|| (*last_block_num_values).into());
168        Ok(Some(CompressedBlock { reader, take, _marker: PhantomData }))
169    }
170}
171
172impl<'a, D: DataPoint> CompressedBlocksIterator<'a, D> {
173    fn data_points(self) -> impl Iterator<Item = Result<D, DecodeError>> + 'a {
174        self.flatten_ok().map(|r| r.and_then(|r| r))
175    }
176}
177
178impl<'a, D> Iterator for CompressedBlocksIterator<'a, D> {
179    type Item = Result<CompressedBlock<'a, D>, DecodeError>;
180
181    fn next(&mut self) -> Option<Self::Item> {
182        if self.data.is_empty() {
183            return None;
184        }
185        let r = self.try_next_block();
186        if r.is_err() {
187            // Return None from now on.
188            self.data = &[];
189        }
190        r.transpose()
191    }
192}
193
/// Reads consecutive fixed-width integers of type `T` from a byte slice.
struct AlignedReader<'a, T> {
    // Bytes not yet consumed.
    data: &'a [u8],
    // Records the integer width to read without storing a `T`.
    _marker: PhantomData<T>,
}

impl<'a, T> AlignedReader<'a, T> {
    /// Creates a reader positioned at the start of `data`.
    fn new(data: &'a [u8]) -> Self {
        Self { data, _marker: PhantomData }
    }
}
204
205impl<T: AlignedDataPoint> Iterator for AlignedReader<'_, T> {
206    type Item = u64;
207
208    fn next(&mut self) -> Option<Self::Item> {
209        let Self { data, _marker } = self;
210        T::read(data).ok()
211    }
212}
213
214trait AlignedDataPoint {
215    fn read(data: &mut &[u8]) -> Result<u64, DecodeError>;
216}
217
218impl AlignedDataPoint for u8 {
219    fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
220        Ok(data.read_u8().map(u64::from)?)
221    }
222}
223
224impl AlignedDataPoint for u16 {
225    fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
226        Ok(data.read_u16::<LittleEndian>().map(u64::from)?)
227    }
228}
229
230impl AlignedDataPoint for u32 {
231    fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
232        Ok(data.read_u32::<LittleEndian>().map(u64::from)?)
233    }
234}
235
236impl AlignedDataPoint for u64 {
237    fn read(data: &mut &[u8]) -> Result<u64, DecodeError> {
238        Ok(data.read_u64::<LittleEndian>()?)
239    }
240}
241
/// Iterator over a run-length-encoded block: one value repeated a fixed number
/// of times.
struct RleReader {
    // The repeated value.
    value: u64,
    // Number of repetitions still to yield.
    repeat: u64,
}
246
247impl RleReader {
248    fn new(mut data: &[u8]) -> Result<Self, DecodeError> {
249        let data = data.read_u64::<LittleEndian>()?;
250        // RLE stores a 6-byte, little-endian integer followed by a 2-byte
251        // length.
252        let value = data & 0x0000ffffffffffff;
253        let repeat = data >> (6 * 8);
254        Ok(Self { value, repeat })
255    }
256}
257
258impl Iterator for RleReader {
259    type Item = u64;
260
261    fn next(&mut self) -> Option<Self::Item> {
262        let Self { value, repeat } = self;
263        let n = repeat.checked_sub(1)?;
264        *repeat = n;
265        Some(*value)
266    }
267}
268
/// Reader for the values packed in a single 8-byte block, chosen from the
/// block's selector.
enum CompressedBlockDataReader<'a> {
    /// Byte-aligned 8-bit values.
    U8(AlignedReader<'a, u8>),
    /// Byte-aligned 16-bit values.
    U16(AlignedReader<'a, u16>),
    /// Byte-aligned 32-bit values.
    U32(AlignedReader<'a, u32>),
    /// A single 64-bit value.
    U64(AlignedReader<'a, u64>),
    /// Values whose width is not a whole number of bytes.
    Bits(BitsIterator<'a>),
    /// A run-length-encoded value.
    Rle(RleReader),
}
277
278impl<'a> CompressedBlockDataReader<'a> {
279    fn decode(data: &mut &'a [u8], selector: u8) -> Result<Self, DecodeError> {
280        // The following table describes the meaning of each selector for
281        // Simple8bRle:
282        //
283        // ```
284        // Selector value:  0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 | 15 (RLE)
285        // Integers coded: 64 32 21 16 12 10  9  8  7  6  5  4  3  2  1 | up to 2^16
286        // ```
287
288        if data.len() < BLOCK_LENGTH {
289            return Err(DecodeError::ShortBuffer);
290        }
291        let (block, rest) = data.split_at(BLOCK_LENGTH);
292        *data = rest;
293
294        if selector == 15 {
295            return Ok(Self::Rle(RleReader::new(block)?));
296        }
297        let bits_per_int = *SIMPLE8B_SELECTOR_BIT_COUNTS
298            .get(usize::from(selector))
299            .ok_or_else(|| DecodeError::UnsupportedSelector(selector))?;
300
301        match bits_per_int {
302            // Selector 7 (0x07): 8 x 8-bit integers.
303            u8::BITS => Ok(Self::U8(AlignedReader::new(block))),
304            // Selector 11 (0x0b): 4 x 16-bit integers.
305            u16::BITS => Ok(Self::U16(AlignedReader::new(block))),
306            // Selector 13 (0x0d): 2 x 32-bit integers.
307            u32::BITS => Ok(Self::U32(AlignedReader::new(block))),
308            // Selector 14 (0x0e): 1 x 64-bit integers.
309            u64::BITS => Ok(Self::U64(AlignedReader::new(block))),
310            b => Ok(Self::Bits(BitsIterator::new(
311                block,
312                NonZeroU8::new(b.try_into().expect("bits per int overflow"))
313                    .expect("zero bits iterator"),
314            ))),
315        }
316    }
317
318    fn should_limit_last_block(&self) -> bool {
319        match self {
320            // Rle carries the length with it we don't need to limit.
321            Self::Rle(_) => false,
322            _ => true,
323        }
324    }
325}
326
/// Yields the values of the block by delegating to the reader held by the
/// active variant.
impl<'a> Iterator for CompressedBlockDataReader<'a> {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        // Static dispatch on the variant; every inner reader yields `u64`.
        match self {
            Self::U8(r) => r.next(),
            Self::U16(r) => r.next(),
            Self::U32(r) => r.next(),
            Self::U64(r) => r.next(),
            Self::Bits(r) => r.next(),
            Self::Rle(r) => r.next(),
        }
    }
}
341
/// A single decoded block that yields its values converted to `D`.
struct CompressedBlock<'a, D> {
    // Source of raw `u64` values for this block.
    reader: CompressedBlockDataReader<'a>,
    // When set, the maximum number of values still to yield from this block.
    take: Option<usize>,
    // Ties the block to the sample type it will produce.
    _marker: PhantomData<D>,
}
347
348impl<'a, D: DataPoint> Iterator for CompressedBlock<'a, D> {
349    type Item = Result<D, DecodeError>;
350
351    fn next(&mut self) -> Option<Self::Item> {
352        let Self { reader, take, _marker } = self;
353        if let Some(take) = take {
354            *take = take.checked_sub(1)?;
355        }
356        reader.next().map(D::from_u64)
357    }
358}
359
/// The encoding used by every time series in a buffer.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum SeriesType {
    /// Series encoded with Simple8b plus run-length encoding; stored as the
    /// (type, subtype) byte pair `(1, 0)`.
    Simple8bRle,
}
364
365impl SeriesType {
366    fn decode(data: &mut &[u8]) -> Result<Self, DecodeError> {
367        let series_type = data.read_u8()?;
368        let series_subtype = data.read_u8()?;
369        match (series_type, series_subtype) {
370            (1, 0) => Ok(SeriesType::Simple8bRle),
371            // TODO(https://fxbug.dev/460161247): Support remaining series
372            // types.
373            (t, st) => Err(DecodeError::UnsupportedSeriesType(t, st)),
374        }
375    }
376}
377
/// Supported decoding compression series types.
///
/// Each variant carries the encoding-specific header decoded from the front of
/// a series payload.
#[derive(Debug, Copy, Clone)]
enum SeriesData<'a> {
    /// Header for a Simple8b/RLE-encoded series.
    Simple8bRle(Simple8bRleInfo<'a>),
}
383
384impl<'a> SeriesData<'a> {
385    fn decode(data: &mut &'a [u8], series_type: SeriesType) -> Result<Self, DecodeError> {
386        match series_type {
387            SeriesType::Simple8bRle => Simple8bRleInfo::decode(data).map(SeriesData::Simple8bRle),
388        }
389    }
390}
391
/// Header of a Simple8b/RLE-encoded series.
#[derive(Copy, Clone, Debug)]
struct Simple8bRleInfo<'a> {
    // Packed 4-bit selectors, including any entries before the head index.
    selectors: &'a [u8],
    // Number of selectors (and therefore blocks) in use.
    num_selectors: usize,
    // Number of leading selectors to skip before the first one in use.
    index_to_head_selector: usize,
    // Number of values to yield from the final block when it is not RLE.
    last_block_num_values: u8,
}
399
400impl<'a> Simple8bRleInfo<'a> {
401    fn decode(data: &mut &'a [u8]) -> Result<Self, DecodeError> {
402        let num_selectors = usize::from(data.read_u16::<LittleEndian>()?);
403        let index_to_head_selector = usize::from(data.read_u16::<LittleEndian>()?);
404        let last_block_num_values = data.read_u8()?;
405
406        // Each selector is 4 bits. When `numSelectors + indexToHeadSelector` is
407        // odd, the last half-byte will be filled with 4 empty bits.
408        let selector_bytes = (num_selectors + index_to_head_selector + 1) / 2;
409        let (selectors, rest) = data.split_at(selector_bytes);
410        *data = rest;
411
412        let want_len = num_selectors * BLOCK_LENGTH;
413        if data.len() != want_len {
414            return Err(DecodeError::UnexpectedBlocksLength { want: want_len, got: data.len() });
415        }
416
417        Ok(Self { selectors, num_selectors, index_to_head_selector, last_block_num_values })
418    }
419}
420
/// Errors produced while decoding a serialized buffer.
#[derive(Error, Debug)]
pub enum DecodeError {
    /// The leading version byte is not one this decoder understands.
    #[error("unknown version {0}")]
    UnknownVersion(u8),
    /// The (type, subtype) byte pair names a series encoding that is not
    /// supported.
    #[error("unsupported series type {0} {1}")]
    UnsupportedSeriesType(u8, u8),
    /// A block selector has no entry in the selector bit-count table.
    #[error("unsupported selector {0}")]
    UnsupportedSelector(u8),
    /// Reading from the underlying bytes failed.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// The buffer ended before the data a length field promised.
    #[error("data buffer cut off")]
    ShortBuffer,
    /// The encoded blocks do not occupy exactly the number of bytes implied by
    /// the selector count.
    #[error("unexpected block length {got}, want: {want}")]
    UnexpectedBlocksLength { want: usize, got: usize },
    /// Any other failure, carried as an opaque error.
    #[error(transparent)]
    Other(anyhow::Error),
}
438
/// Lets `?` be used on conversions whose error type is `Infallible`.
impl From<Infallible> for DecodeError {
    fn from(v: Infallible) -> Self {
        // `Infallible` has no values, so this match has no arms and the
        // function can never actually be called.
        match v {}
    }
}
444
/// Iterator that slices a byte buffer into fixed-width unsigned integers.
///
/// Bits are consumed from the least-significant bit of each byte upwards, and
/// the first bits read become the low bits of the yielded value. A trailing
/// group of bits too short to form a whole integer is discarded.
#[derive(Copy, Clone)]
struct BitsIterator<'a> {
    // Bytes not yet fully consumed.
    data: &'a [u8],
    // Width in bits of every yielded integer.
    bits_per_int: NonZeroU8,
    // Offset (0..8) of the next unread bit inside the first byte of `data`.
    current_bit: u8,
}

impl<'a> BitsIterator<'a> {
    /// Creates an iterator over `data` yielding `bits_per_int`-bit integers.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_int` exceeds 64.
    fn new(data: &'a [u8], bits_per_int: NonZeroU8) -> Self {
        assert!(u32::from(bits_per_int.get()) <= u64::BITS);
        Self { data, bits_per_int, current_bit: 0 }
    }
}

impl<'a> Iterator for BitsIterator<'a> {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        let width = self.bits_per_int.get();

        // Integer under construction and the number of its bits already set.
        let mut assembled: u64 = 0;
        let mut filled: u8 = 0;

        while filled < width {
            // Running out of bytes mid-integer ends the iteration.
            let bytes = self.data;
            let (&byte, tail) = bytes.split_first()?;

            // Take as many bits as the integer still needs, bounded by what
            // is left in the current byte.
            let offset = self.current_bit;
            let chunk = (width - filled).min(8 - offset);

            // Align the wanted bits to bit 0 and keep only `chunk` of them.
            let bits = (byte >> offset) & (0xffu8 >> (8 - chunk));
            // Widen before shifting into the integer's next free position.
            assembled |= u64::from(bits) << filled;

            filled += chunk;
            self.current_bit = offset + chunk;

            // Move on to the next byte once this one is exhausted.
            if self.current_bit == 8 {
                self.current_bit = 0;
                self.data = tail;
            }
        }

        Some(assembled)
    }
}
502
503#[cfg(test)]
504mod tests {
505    use std::fmt::Debug;
506
507    use super::*;
508
509    use proptest::prop_assert_eq;
510    use proptest::strategy::Strategy;
511    use proptest::test_runner::TestCaseError;
512
513    use crate::experimental::clock::{Timed, Timestamp};
514    use crate::experimental::series::buffer::simple8b_rle::{
515        Simple8bRleBlock, Simple8bRleRingBuffer, simple8b_block_from_values,
516    };
517    use crate::experimental::series::interpolation::ConstantSample;
518    use crate::experimental::series::statistic::Last;
519    use crate::experimental::series::{
520        Capacity, SamplingProfile, TimeMatrix, TimeMatrixFold, TimeMatrixTick,
521    };
522
    // End-to-end check: fills a `TimeMatrix`, serializes it, and verifies the
    // decoder recovers the header fields, the per-interval sample counts and
    // the exact values of the first series.
    #[test]
    fn decode_simple8b_rle_time_series() {
        let profile = SamplingProfile::balanced();

        // Pick the interval with the smallest sampling period and fold exactly
        // as many samples as its capacity reports.
        let intervals = profile.clone().into_sampling_intervals();
        let granular_profile = intervals.iter().min_by_key(|x| x.max_sampling_period()).unwrap();
        let granularity = granular_profile.max_sampling_period();
        let samples = match granular_profile.capacity() {
            Capacity::MinSamples(n) => n.get(),
        };

        let start = Timestamp::from_nanos(0);
        let mut time = start;

        let mut series =
            TimeMatrix::<Last<u64>, ConstantSample>::new_at(time, profile, Default::default());

        // Fold the values 1, 2, 3, ... one sampling period apart, remembering
        // them for the final comparison.
        let mut value = 1;
        let mut values = Vec::new();
        for _ in 0..samples {
            series.fold(Timed::at(value, time)).expect("fold sample");
            values.push(value);
            time += granularity;
            value += 1
        }
        let end = time;

        let buffer = series.tick_and_get_buffers(end).expect("tick");
        let decoder = Decoder::from_serialized_buffer(&buffer).expect("decode");
        // Header fields must round-trip.
        let Decoder { semantic, created_timestamp, end_timestamp, series_type, data: _ } = &decoder;
        assert_eq!(semantic, &buffer.data_semantic);
        assert_eq!(created_timestamp, &start);
        assert_eq!(end_timestamp, &end);
        assert_eq!(series_type, &SeriesType::Simple8bRle);

        let duration = end - start;

        // zip_eq panics if the iterators are not the same size.
        let series = decoder.iter_series().zip_eq(intervals);
        for (series, interval) in series {
            let series = series.expect("error decoding series");
            assert_eq!(series.granularity, interval.max_sampling_period());
            // Now for checking the data.
            let data_points =
                series.data_points_64().collect::<Result<Vec<_>, _>>().expect("decode time series");
            // Each series is expected to hold one sample per sampling period
            // over the whole test duration.
            let expect_samples = usize::try_from(
                duration.into_seconds() / interval.max_sampling_period().into_seconds(),
            )
            .unwrap();
            assert_eq!(data_points.len(), expect_samples);
        }

        // Verify the data points from the first series.
        let first_series = decoder
            .iter_series()
            .next()
            .expect("has first series")
            .expect("can decode first series");
        let samples = first_series
            .data_points_64()
            .collect::<Result<Vec<_>, _>>()
            .expect("decode time series");
        assert_eq!(samples, values);
    }
587
    // Minimum sample capacity of the ring buffer under test; also the
    // exclusive upper bound on generated sample counts.
    const TEST_MIN_SAMPLES: usize = 500;

    // Round-trips `samples` through `Simple8bRleRingBuffer` serialization and
    // the decoder, failing the property if anything is lost or altered.
    fn decode_simple8b_rle_inner(samples: Vec<u64>) -> Result<(), TestCaseError> {
        let mut buffer = Simple8bRleRingBuffer::with_min_samples(TEST_MIN_SAMPLES);
        for i in samples.iter() {
            let evicted = buffer.push(*i);
            // Don't lose any data for the test.
            prop_assert_eq!(evicted, vec![]);
        }

        let mut serialized = Vec::new();
        buffer.serialize(&mut serialized).prop_context("serialize")?;
        // Shadow with a slice so the decoders can advance through it.
        let mut serialized = &serialized[..];

        // Decoding the header leaves `serialized` pointing at the blocks.
        let info = Simple8bRleInfo::decode(&mut serialized).prop_context("decode info")?;
        let deserialized =
            CompressedBlocksIterator::<'_, u64>::from_simple8b_rle_info(serialized, &info)
                .data_points()
                .collect::<Result<Vec<_>, _>>()
                .prop_context("data points")?;
        prop_assert_eq!(deserialized, samples);
        Ok(())
    }
611
612    fn u64_samples_strategy() -> impl Strategy<Value = Vec<u64>> {
613        (0..TEST_MIN_SAMPLES, 0..=u64::BITS).prop_flat_map(|(len, bits)| {
614            let max = if bits == u64::BITS { u64::MAX } else { (1 << bits) - 1 };
615            proptest::collection::vec(0..=max, len)
616        })
617    }
618
    // Converts any debuggable error into a proptest failure labelled with a
    // short context string.
    trait ResultExt<T> {
        fn prop_context(self, context: &str) -> Result<T, TestCaseError>;
    }

    impl<T, E: Debug> ResultExt<T> for Result<T, E> {
        fn prop_context(self, context: &str) -> Result<T, TestCaseError> {
            // The failure message is "<context>: <debug form of the error>".
            self.map_err(|e| TestCaseError::fail(format!("{context}: {e:?}")))
        }
    }
628
    // Packs a block of consecutive integers starting at `offset` with the
    // given selector and checks that `BitsIterator` unpacks the same values.
    fn bits_iterator_inner(
        mut offset: u64,
        simple8b_rle_selector: u8,
    ) -> Result<(), TestCaseError> {
        let bits = SIMPLE8B_SELECTOR_BIT_COUNTS[usize::from(simple8b_rle_selector)];
        // Mask that keeps every generated value within `bits` bits.
        let mask = if bits == u64::BITS { u64::MAX } else { (1u64 << bits) - 1 };

        // Endless stream offset, offset + 1, ... truncated to `bits` bits.
        let generator = std::iter::repeat_with(move || {
            let nxt = offset;
            offset += 1;
            nxt & mask
        });

        // The encoder gets a clone so the expected values below start from the
        // same offset.
        let Simple8bRleBlock { selector: _, data } =
            simple8b_block_from_values(simple8b_rle_selector, &mut generator.clone());
        let num_values = u64::BITS / bits;

        let expect = generator.take(num_values.try_into().unwrap()).collect::<Vec<_>>();
        let data = data.to_le_bytes();

        let values =
            BitsIterator::new(&data[..], NonZeroU8::new(bits.try_into().unwrap()).unwrap())
                .collect::<Vec<_>>();
        prop_assert_eq!(values, expect);
        Ok(())
    }
655
    // Property tests driving the helpers above with generated inputs.
    proptest::proptest! {
        #![proptest_config(proptest::test_runner::Config {
            // Add all failed seeds here.
            failure_persistence: proptest_support::failed_seeds!(
                "cc b37d7d02c5bf8547fef78e979a7e64b86ed03eb9b77e2e16770bd96b1deed5d7"
            ),
            ..proptest::test_runner::Config::default()
        })]

        // Arbitrary sample vectors must survive an encode/decode round trip.
        #[test]
        fn decode_simple8b_rle(x in u64_samples_strategy()) {
            decode_simple8b_rle_inner(x)?;
        }

        // Every selector in the bit-count table must unpack correctly for a
        // range of starting offsets.
        #[test]
        fn bits_iterator((offset, selector) in (0u64..100, 0..(SIMPLE8B_SELECTOR_BIT_COUNTS.len()) )) {
            bits_iterator_inner(offset, selector.try_into().unwrap())?;
        }

    }
676}