ringbuf/traits/
consumer.rs

1use super::{
2    observer::{DelegateObserver, Observer},
3    utils::modulus,
4};
5use crate::utils::{move_uninit_slice, slice_as_uninit_mut, slice_assume_init_mut, slice_assume_init_ref};
6use core::{iter::Chain, mem::MaybeUninit, ptr, slice};
7#[cfg(feature = "std")]
8use std::io::{self, Write};
9
10/// Consumer part of ring buffer.
11pub trait Consumer: Observer {
12    /// Set read index.
13    ///
14    /// # Safety
15    ///
16    /// Index must go only forward, never backward. It is recommended to use [`Self::advance_read_index`] instead.
17    ///
18    /// All slots with index less than `value` must be uninitialized until write index, all slots with index equal or greater - must be initialized.
19    unsafe fn set_read_index(&self, value: usize);
20
21    /// Moves `read` pointer by `count` places forward.
22    ///
23    /// # Safety
24    ///
25    /// First `count` items in occupied memory must be moved out or dropped.
26    ///
27    /// Must not be called concurrently.
28    unsafe fn advance_read_index(&self, count: usize) {
29        self.set_read_index((self.read_index() + count) % modulus(self));
30    }
31
32    /// Provides a direct access to the ring buffer occupied memory.
33    /// The difference from [`Self::as_slices`] is that this method provides slices of [`MaybeUninit`], so items may be moved out of slices.  
34    ///
35    /// Returns a pair of slices of stored items, the second one may be empty.
36    /// Elements with lower indices in slice are older. First slice contains older items that second one.
37    ///
38    /// # Safety
39    ///
40    /// All items are initialized. Elements must be removed starting from the beginning of first slice.
41    /// When all items are removed from the first slice then items must be removed from the beginning of the second slice.
42    ///
43    /// *This method must be followed by [`Self::advance_read_index`] call with the number of items being removed previously as argument.*
44    /// *No other mutating calls allowed before that.*
45    fn occupied_slices(&self) -> (&[MaybeUninit<Self::Item>], &[MaybeUninit<Self::Item>]) {
46        unsafe { self.unsafe_slices(self.read_index(), self.write_index()) }
47    }
48
49    /// Provides a direct mutable access to the ring buffer occupied memory.
50    ///
51    /// Same as [`Self::occupied_slices`].
52    ///
53    /// # Safety
54    ///
55    /// When some item is replaced with uninitialized value then it must not be read anymore.
56    unsafe fn occupied_slices_mut(&mut self) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
57        self.unsafe_slices_mut(self.read_index(), self.write_index())
58    }
59
60    /// Returns a pair of slices which contain, in order, the contents of the ring buffer.
61    #[inline]
62    fn as_slices(&self) -> (&[Self::Item], &[Self::Item]) {
63        unsafe {
64            let (left, right) = self.occupied_slices();
65            (slice_assume_init_ref(left), slice_assume_init_ref(right))
66        }
67    }
68
69    /// Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.
70    #[inline]
71    fn as_mut_slices(&mut self) -> (&mut [Self::Item], &mut [Self::Item]) {
72        unsafe {
73            let (left, right) = self.occupied_slices_mut();
74            (slice_assume_init_mut(left), slice_assume_init_mut(right))
75        }
76    }
77
78    /// Returns a reference to the eldest item in the ring buffer, if exists.
79    #[inline]
80    fn first(&self) -> Option<&Self::Item> {
81        self.as_slices().0.first()
82    }
83    /// Returns a mutable reference to the eldest item in the ring buffer, if exists.
84    #[inline]
85    fn first_mut(&mut self) -> Option<&mut Self::Item> {
86        self.as_mut_slices().0.first_mut()
87    }
88    /// Returns a reference to the most recent item in the ring buffer, if exists.
89    ///
90    /// *Returned item may not be actually the most recent if there is a concurrent producer activity.*
91    fn last(&self) -> Option<&Self::Item> {
92        let (first, second) = self.as_slices();
93        if second.is_empty() {
94            first.last()
95        } else {
96            second.last()
97        }
98    }
99    /// Returns a mutable reference to the most recent item in the ring buffer, if exists.
100    ///
101    /// *Returned item may not be actually the most recent if there is a concurrent producer activity.*
102    fn last_mut(&mut self) -> Option<&mut Self::Item> {
103        let (first, second) = self.as_mut_slices();
104        if second.is_empty() {
105            first.last_mut()
106        } else {
107            second.last_mut()
108        }
109    }
110
111    /// Removes the eldest item from the ring buffer and returns it.
112    ///
113    /// Returns `None` if the ring buffer is empty.
114    fn try_pop(&mut self) -> Option<Self::Item> {
115        if !self.is_empty() {
116            let elem = unsafe { self.occupied_slices().0.get_unchecked(0).assume_init_read() };
117            unsafe { self.advance_read_index(1) };
118            Some(elem)
119        } else {
120            None
121        }
122    }
123
124    /// Returns the reference to the eldest item without removing it from the buffer.
125    ///
126    /// Returns `None` if the ring buffer is empty.
127    fn try_peek(&self) -> Option<&Self::Item> {
128        if !self.is_empty() {
129            Some(unsafe { self.occupied_slices().0.get_unchecked(0).assume_init_ref() })
130        } else {
131            None
132        }
133    }
134
135    /// Copies items from the ring buffer to an uninit slice without removing them from the ring buffer.
136    ///
137    /// Returns a number of items being copied.
138    fn peek_slice_uninit(&self, elems: &mut [MaybeUninit<Self::Item>]) -> usize {
139        let (left, right) = self.occupied_slices();
140        let count = if elems.len() < left.len() {
141            move_uninit_slice(elems, unsafe { left.get_unchecked(..elems.len()) });
142            elems.len()
143        } else {
144            let (left_elems, elems) = elems.split_at_mut(left.len());
145            move_uninit_slice(left_elems, left);
146            left.len()
147                + if elems.len() < right.len() {
148                    move_uninit_slice(elems, unsafe { right.get_unchecked(..elems.len()) });
149                    elems.len()
150                } else {
151                    move_uninit_slice(unsafe { elems.get_unchecked_mut(..right.len()) }, right);
152                    right.len()
153                }
154        };
155        count
156    }
157
158    /// Copies items from the ring buffer to a slice without removing them from the ring buffer.
159    ///
160    /// Returns a number of items being copied.
161    fn peek_slice(&self, elems: &mut [Self::Item]) -> usize
162    where
163        Self::Item: Copy,
164    {
165        self.peek_slice_uninit(unsafe { slice_as_uninit_mut(elems) })
166    }
167
168    /// Removes items from the ring buffer and writes them into an uninit slice.
169    ///
170    /// Returns count of items been removed.
171    fn pop_slice_uninit(&mut self, elems: &mut [MaybeUninit<Self::Item>]) -> usize {
172        let count = self.peek_slice_uninit(elems);
173        unsafe { self.advance_read_index(count) };
174        count
175    }
176
177    /// Removes items from the ring buffer and writes them into a slice.
178    ///
179    /// Returns count of items been removed.
180    fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize
181    where
182        Self::Item: Copy,
183    {
184        self.pop_slice_uninit(unsafe { slice_as_uninit_mut(elems) })
185    }
186
187    /// Returns an iterator that removes items one by one from the ring buffer.
188    fn pop_iter(&mut self) -> PopIter<Self> {
189        PopIter::new(self)
190    }
191
192    /// Returns a front-to-back iterator containing references to items in the ring buffer.
193    ///
194    /// This iterator does not remove items out of the ring buffer.
195    fn iter(&self) -> Iter<'_, Self> {
196        let (left, right) = self.as_slices();
197        left.iter().chain(right.iter())
198    }
199
200    /// Returns a front-to-back iterator that returns mutable references to items in the ring buffer.
201    ///
202    /// This iterator does not remove items out of the ring buffer.
203    fn iter_mut(&mut self) -> IterMut<'_, Self> {
204        let (left, right) = self.as_mut_slices();
205        left.iter_mut().chain(right.iter_mut())
206    }
207
208    /// Removes at most `count` and at least `min(count, Self::len())` items from the buffer and safely drops them.
209    ///
210    /// If there is no concurring producer activity then exactly `min(count, Self::len())` items are removed.
211    ///
212    /// Returns the number of deleted items.
213    ///
214    /// ```
215    /// # extern crate ringbuf;
216    /// # use ringbuf::{LocalRb, storage::Array, traits::*};
217    /// # fn main() {
218    /// let mut rb = LocalRb::<Array<i32, 8>>::default();
219    ///
220    /// assert_eq!(rb.push_iter(0..8), 8);
221    ///
222    /// assert_eq!(rb.skip(4), 4);
223    /// assert_eq!(rb.skip(8), 4);
224    /// assert_eq!(rb.skip(4), 0);
225    /// # }
226    /// ```
227    fn skip(&mut self, count: usize) -> usize {
228        unsafe {
229            let (left, right) = self.occupied_slices_mut();
230            for elem in left.iter_mut().chain(right.iter_mut()).take(count) {
231                ptr::drop_in_place(elem.as_mut_ptr());
232            }
233            let actual_count = usize::min(count, left.len() + right.len());
234            self.advance_read_index(actual_count);
235            actual_count
236        }
237    }
238
239    /// Removes all items from the buffer and safely drops them.
240    ///
241    /// Returns the number of deleted items.
242    fn clear(&mut self) -> usize {
243        unsafe {
244            let (left, right) = self.occupied_slices_mut();
245            for elem in left.iter_mut().chain(right.iter_mut()) {
246                ptr::drop_in_place(elem.as_mut_ptr());
247            }
248            let count = left.len() + right.len();
249            self.advance_read_index(count);
250            count
251        }
252    }
253
254    #[cfg(feature = "std")]
255    /// Removes at most first `count` bytes from the ring buffer and writes them into a [`Write`] instance.
256    /// If `count` is `None` then as much as possible bytes will be written.
257    ///
258    /// Returns:
259    ///
260    /// + `None`: ring buffer is full or `count` is `0`. In this case `write` isn't called at all.
261    /// + `Some(Ok(n))`: `write` succeeded. `n` is number of bytes been written. `n == 0` means that `write` also returned `0`.
262    /// + `Some(Err(e))`: `write` is failed and `e` is original error. In this case it is guaranteed that no items was written to the writer.
263    ///    To achieve this we write only one contiguous slice at once. So this call may write less than `occupied_len` items even if the writer is ready to get more.
264    fn write_into<S: Write>(&mut self, writer: &mut S, count: Option<usize>) -> Option<io::Result<usize>>
265    where
266        Self: Consumer<Item = u8>,
267    {
268        let (left, _) = self.occupied_slices();
269        let count = usize::min(count.unwrap_or(left.len()), left.len());
270        if count == 0 {
271            return None;
272        }
273        let left_init = unsafe { slice_assume_init_ref(&left[..count]) };
274
275        let write_count = match writer.write(left_init) {
276            Ok(n) => n,
277            Err(e) => return Some(Err(e)),
278        };
279        assert!(write_count <= count);
280        unsafe { self.advance_read_index(write_count) };
281        Some(Ok(write_count))
282    }
283}
284
285/// Owning ring buffer iterator.
286pub struct IntoIter<C: Consumer + ?Sized> {
287    inner: C,
288}
289
290impl<C: Consumer> IntoIter<C> {
291    pub fn new(inner: C) -> Self {
292        Self { inner }
293    }
294    pub fn into_inner(self) -> C {
295        self.inner
296    }
297}
298
299impl<C: Consumer> Iterator for IntoIter<C> {
300    type Item = C::Item;
301
302    #[inline]
303    fn next(&mut self) -> Option<Self::Item> {
304        self.inner.try_pop()
305    }
306    #[inline]
307    fn size_hint(&self) -> (usize, Option<usize>) {
308        (self.inner.occupied_len(), None)
309    }
310}
311
312/// An iterator that removes items from the ring buffer.
313///
314/// Producer will see removed items only when iterator is dropped or [`PopIter::commit`] is called.
315pub struct PopIter<'a, C: Consumer + ?Sized> {
316    inner: &'a C,
317    iter: Chain<slice::Iter<'a, MaybeUninit<C::Item>>, slice::Iter<'a, MaybeUninit<C::Item>>>,
318    count: usize,
319    len: usize,
320}
321
322impl<'a, C: Consumer + ?Sized> Drop for PopIter<'a, C> {
323    fn drop(&mut self) {
324        self.commit();
325    }
326}
327
328impl<'a, C: Consumer + ?Sized> PopIter<'a, C> {
329    /// Create an iterator.
330    pub fn new(inner: &'a mut C) -> Self {
331        let (len, iter) = {
332            let (left, right) = inner.occupied_slices();
333            (left.len() + right.len(), left.iter().chain(right))
334        };
335        Self {
336            inner,
337            iter,
338            count: 0,
339            len,
340        }
341    }
342
343    /// Send information about removed items to the ring buffer.
344    pub fn commit(&mut self) {
345        unsafe { self.inner.advance_read_index(self.count) };
346        self.count = 0;
347    }
348}
349
350impl<'a, C: Consumer> Iterator for PopIter<'a, C> {
351    type Item = C::Item;
352
353    #[inline]
354    fn next(&mut self) -> Option<Self::Item> {
355        self.iter.next().map(|item| {
356            self.count += 1;
357            unsafe { item.assume_init_read() }
358        })
359    }
360    #[inline]
361    fn size_hint(&self) -> (usize, Option<usize>) {
362        let remain = self.len - self.count;
363        (remain, Some(remain))
364    }
365}
366
367impl<'a, C: Consumer> ExactSizeIterator for PopIter<'a, C> {}
368
369/// Iterator over ring buffer contents.
370///
371/// *Please do not rely on actual type, it may change in future.*
372#[allow(type_alias_bounds)]
373pub type Iter<'a, C: Consumer> = Chain<slice::Iter<'a, C::Item>, slice::Iter<'a, C::Item>>;
374
375/// Mutable iterator over ring buffer contents.
376///
377/// *Please do not rely on actual type, it may change in future.*
378#[allow(type_alias_bounds)]
379pub type IterMut<'a, C: Consumer> = Chain<slice::IterMut<'a, C::Item>, slice::IterMut<'a, C::Item>>;
380
381/// Trait used for delegating producer methods.
382pub trait DelegateConsumer: DelegateObserver
383where
384    Self::Base: Consumer,
385{
386}
387impl<D: DelegateConsumer> Consumer for D
388where
389    D::Base: Consumer,
390{
391    #[inline]
392    unsafe fn set_read_index(&self, value: usize) {
393        self.base().set_read_index(value)
394    }
395    #[inline]
396    unsafe fn advance_read_index(&self, count: usize) {
397        self.base().advance_read_index(count)
398    }
399
400    #[inline]
401    fn occupied_slices(&self) -> (&[core::mem::MaybeUninit<Self::Item>], &[core::mem::MaybeUninit<Self::Item>]) {
402        self.base().occupied_slices()
403    }
404
405    #[inline]
406    unsafe fn occupied_slices_mut(&mut self) -> (&mut [core::mem::MaybeUninit<Self::Item>], &mut [core::mem::MaybeUninit<Self::Item>]) {
407        self.base_mut().occupied_slices_mut()
408    }
409
410    #[inline]
411    fn as_slices(&self) -> (&[Self::Item], &[Self::Item]) {
412        self.base().as_slices()
413    }
414
415    #[inline]
416    fn as_mut_slices(&mut self) -> (&mut [Self::Item], &mut [Self::Item]) {
417        self.base_mut().as_mut_slices()
418    }
419
420    #[inline]
421    fn try_pop(&mut self) -> Option<Self::Item> {
422        self.base_mut().try_pop()
423    }
424
425    #[inline]
426    fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize
427    where
428        Self::Item: Copy,
429    {
430        self.base_mut().pop_slice(elems)
431    }
432
433    #[inline]
434    fn iter(&self) -> Iter<'_, Self> {
435        self.base().iter()
436    }
437
438    #[inline]
439    fn iter_mut(&mut self) -> IterMut<'_, Self> {
440        self.base_mut().iter_mut()
441    }
442
443    #[inline]
444    fn skip(&mut self, count: usize) -> usize {
445        self.base_mut().skip(count)
446    }
447
448    #[inline]
449    fn clear(&mut self) -> usize {
450        self.base_mut().clear()
451    }
452}
453
// Implements `IntoIterator` and, with the `std` feature, `std::io::Read` for a consumer type.
//
// Accepts a type name optionally followed by a generic parameter list with `+`-separated bounds,
// e.g. `impl_consumer_traits!(Foo<S: Storage>)`.
macro_rules! impl_consumer_traits {
    ($type:ident $(< $( $param:tt $( : $first_bound:tt $(+ $next_bound:tt )* )? ),+ >)?) => {
        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? core::iter::IntoIterator for $type $(< $( $param ),+ >)? where Self: Sized {
            type Item = <Self as $crate::traits::Observer>::Item;
            type IntoIter = $crate::traits::consumer::IntoIter<Self>;
            fn into_iter(self) -> Self::IntoIter {
                $crate::traits::consumer::IntoIter::new(self)
            }
        }

        #[cfg(feature = "std")]
        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? std::io::Read for $type $(< $( $param ),+ >)?
        where
            Self: $crate::traits::Consumer<Item = u8>,
        {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                // A zero-length destination is not a "would block" condition:
                // follow the `Read` convention and report zero bytes read.
                if buf.is_empty() {
                    return Ok(0);
                }
                let n = self.pop_slice(buf);
                if n == 0 {
                    // Nothing was available to pop into a non-empty buffer.
                    Err(std::io::ErrorKind::WouldBlock.into())
                } else {
                    Ok(n)
                }
            }
        }
    };
}
480pub(crate) use impl_consumer_traits;