ringbuf/traits/
producer.rs

1use super::{
2    observer::{DelegateObserver, Observer},
3    utils::modulus,
4};
5#[cfg(feature = "std")]
6use crate::utils::slice_assume_init_mut;
7use crate::utils::write_slice;
8use core::mem::MaybeUninit;
9#[cfg(feature = "std")]
10use std::{
11    cmp,
12    io::{self, Read},
13};
14
15/// Producer part of ring buffer.
16pub trait Producer: Observer {
17    /// Set read index.
18    ///
19    /// # Safety
20    ///
21    /// Index must go only forward, never backward. It is recommended to use [`Self::advance_write_index`] instead.
22    ///
23    /// All slots with index less than `value` must be initialized until write index, all slots with index equal or greater - must be uninitialized.
24    unsafe fn set_write_index(&self, value: usize);
25
26    /// Moves `write` pointer by `count` places forward.
27    ///
28    /// # Safety
29    ///
30    /// First `count` items in free space must be initialized.
31    ///
32    /// Must not be called concurrently.
33    unsafe fn advance_write_index(&self, count: usize) {
34        self.set_write_index((self.write_index() + count) % modulus(self));
35    }
36
37    /// Provides a direct access to the ring buffer vacant memory.
38    ///
39    /// Returns a pair of slices of uninitialized memory, the second one may be empty.
40    fn vacant_slices(&self) -> (&[MaybeUninit<Self::Item>], &[MaybeUninit<Self::Item>]) {
41        unsafe { self.unsafe_slices(self.write_index(), self.read_index() + self.capacity().get()) }
42    }
43
44    /// Mutable version of [`Self::vacant_slices`].
45    ///
46    /// Vacant memory is uninitialized. Initialized items must be put starting from the beginning of first slice.
47    /// When first slice is fully filled then items must be put to the beginning of the second slice.
48    ///
49    /// *This method must be followed by [`Self::advance_write_index`] call with the number of items being put previously as argument.*
50    /// *No other mutating calls allowed before that.*
51    ///
52    /// *Vacant slices must not be used to store any data because their contents aren't synchronized properly.*
53    fn vacant_slices_mut(&mut self) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
54        unsafe { self.unsafe_slices_mut(self.write_index(), self.read_index() + self.capacity().get()) }
55    }
56
57    /// Appends an item to the ring buffer.
58    ///
59    /// If buffer is full returns an `Err` containing the item that hasn't been appended.
60    fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> {
61        if !self.is_full() {
62            unsafe {
63                self.vacant_slices_mut().0.get_unchecked_mut(0).write(elem);
64                self.advance_write_index(1)
65            };
66            Ok(())
67        } else {
68            Err(elem)
69        }
70    }
71
72    /// Appends items from an iterator to the ring buffer.
73    /// Elements that haven't been added to the ring buffer remain in the iterator.
74    ///
75    /// Returns count of items been appended to the ring buffer.
76    ///
77    /// *Inserted items are committed to the ring buffer all at once in the end,*
78    /// *e.g. when buffer is full or iterator has ended.*
79    fn push_iter<I: Iterator<Item = Self::Item>>(&mut self, mut iter: I) -> usize {
80        let (left, right) = self.vacant_slices_mut();
81        let mut count = 0;
82        for place in left.iter_mut().chain(right.iter_mut()) {
83            match iter.next() {
84                Some(elem) => unsafe { place.as_mut_ptr().write(elem) },
85                None => break,
86            }
87            count += 1;
88        }
89        unsafe { self.advance_write_index(count) };
90        count
91    }
92
93    /// Appends items from slice to the ring buffer.
94    ///
95    /// Returns count of items been appended to the ring buffer.
96    fn push_slice(&mut self, elems: &[Self::Item]) -> usize
97    where
98        Self::Item: Copy,
99    {
100        let (left, right) = self.vacant_slices_mut();
101        let count = if elems.len() < left.len() {
102            write_slice(&mut left[..elems.len()], elems);
103            elems.len()
104        } else {
105            let (left_elems, elems) = elems.split_at(left.len());
106            write_slice(left, left_elems);
107            left.len()
108                + if elems.len() < right.len() {
109                    write_slice(&mut right[..elems.len()], elems);
110                    elems.len()
111                } else {
112                    write_slice(right, &elems[..right.len()]);
113                    right.len()
114                }
115        };
116        unsafe { self.advance_write_index(count) };
117        count
118    }
119
120    #[cfg(feature = "std")]
121    /// Reads at most `count` bytes from `Read` instance and appends them to the ring buffer.
122    /// If `count` is `None` then as much as possible bytes will be read.
123    ///
124    /// Returns:
125    /// + `None`: ring buffer is empty or `count` is `0`. In this case `read` isn't called at all.
126    /// + `Some(Ok(n))`: `read` succeeded. `n` is number of bytes been read. `n == 0` means that `read` also returned `0`.
127    /// + `Some(Err(e))` `read` is failed and `e` is original error. In this case it is guaranteed that no items was read from the reader.
128    ///   To achieve this we read only one contiguous slice at once. So this call may read less than `vacant_len` items in the buffer even if the reader is ready to provide more.
129    fn read_from<S: Read>(&mut self, reader: &mut S, count: Option<usize>) -> Option<io::Result<usize>>
130    where
131        Self: Producer<Item = u8>,
132    {
133        let (left, _) = self.vacant_slices_mut();
134        let count = cmp::min(count.unwrap_or(left.len()), left.len());
135        if count == 0 {
136            return None;
137        }
138
139        let buf = &mut left[..count];
140        // Initialize memory before read. It's an overhead but there's no way to read to uninit buffer in stable Rust yet.
141        // TODO: Use `reader.read_buf` when it stabilized (see https://github.com/rust-lang/rust/issues/78485).
142        buf.fill(MaybeUninit::new(0));
143        let left_init = unsafe { slice_assume_init_mut(buf) };
144
145        let read_count = match reader.read(left_init) {
146            Ok(n) => n,
147            Err(e) => return Some(Err(e)),
148        };
149        assert!(read_count <= count);
150        unsafe { self.advance_write_index(read_count) };
151        Some(Ok(read_count))
152    }
153}
154
/// Trait used for delegating producer methods.
///
/// A type implementing this trait (with `Self::Base: Producer`) gets a blanket [`Producer`]
/// implementation that forwards every call to its base.
pub trait DelegateProducer: DelegateObserver
where
    Self::Base: Producer,
{
}
161
162impl<D: DelegateProducer> Producer for D
163where
164    D::Base: Producer,
165{
166    #[inline]
167    unsafe fn set_write_index(&self, value: usize) {
168        self.base().set_write_index(value)
169    }
170    #[inline]
171    unsafe fn advance_write_index(&self, count: usize) {
172        self.base().advance_write_index(count)
173    }
174
175    #[inline]
176    fn vacant_slices(&self) -> (&[core::mem::MaybeUninit<Self::Item>], &[core::mem::MaybeUninit<Self::Item>]) {
177        self.base().vacant_slices()
178    }
179
180    #[inline]
181    fn vacant_slices_mut(&mut self) -> (&mut [core::mem::MaybeUninit<Self::Item>], &mut [core::mem::MaybeUninit<Self::Item>]) {
182        self.base_mut().vacant_slices_mut()
183    }
184
185    #[inline]
186    fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> {
187        self.base_mut().try_push(elem)
188    }
189
190    #[inline]
191    fn push_iter<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> usize {
192        self.base_mut().push_iter(iter)
193    }
194
195    #[inline]
196    fn push_slice(&mut self, elems: &[Self::Item]) -> usize
197    where
198        Self::Item: Copy,
199    {
200        self.base_mut().push_slice(elems)
201    }
202}
203
/// Implements the standard writer traits for a byte producer type.
///
/// The macro takes a type name, optionally followed by its generic parameters (each with optional
/// bounds), and generates:
/// + `std::io::Write` (only when the `std` feature is enabled),
/// + `core::fmt::Write`.
///
/// Both impls require `Self: Producer<Item = u8>` and call `push_slice` with method syntax,
/// so the `Producer` trait has to be in scope where the macro is invoked.
macro_rules! impl_producer_traits {
    ($type:ident $(< $( $param:tt $( : $first_bound:tt $(+ $next_bound:tt )* )? ),+ >)?) => {

        #[cfg(feature = "std")]
        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? std::io::Write for $type $(< $( $param ),+ >)?
        where
            Self: $crate::traits::Producer<Item = u8>,
        {
            // Pushes as many bytes as fit. `WouldBlock` is reported only when there was something
            // to write and nothing could be pushed; writing an empty buffer succeeds with `Ok(0)`.
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                let n = self.push_slice(buf);
                if n == 0 && !buf.is_empty() {
                    Err(std::io::ErrorKind::WouldBlock.into())
                } else {
                    Ok(n)
                }
            }
            // Nothing is buffered by this impl itself, so flushing is a no-op.
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? core::fmt::Write for $type $(< $( $param ),+ >)?
        where
            Self: $crate::traits::Producer<Item = u8>,
        {
            // Succeeds only if the whole string was pushed. On failure the bytes that did fit
            // have already been pushed.
            fn write_str(&mut self, s: &str) -> core::fmt::Result {
                let n = self.push_slice(s.as_bytes());
                if n != s.len() {
                    Err(core::fmt::Error)
                } else {
                    Ok(())
                }
            }
        }
    };
}
240pub(crate) use impl_producer_traits;