use super::{
    observer::{DelegateObserver, Observer},
    utils::modulus,
};
#[cfg(feature = "std")]
use crate::utils::slice_assume_init_mut;
use crate::utils::write_slice;
use core::mem::MaybeUninit;
#[cfg(feature = "std")]
use std::{
    cmp,
    io::{self, Read},
};

/// Producer part of ring buffer.
pub trait Producer: Observer {
    /// Set write index.
    ///
    /// # Safety
    ///
    /// Index must go only forward, never backward. It is recommended to use [`Self::advance_write_index`] instead.
    ///
    /// All slots with an index less than `value` must be initialized; all slots with an index equal to or greater than `value` must be uninitialized.
    unsafe fn set_write_index(&self, value: usize);

    /// Moves `write` pointer by `count` places forward.
    ///
    /// # Safety
    ///
    /// The first `count` items in the free space must be initialized.
    ///
    /// Must not be called concurrently.
    unsafe fn advance_write_index(&self, count: usize) {
        self.set_write_index((self.write_index() + count) % modulus(self));
    }

    /// Provides direct access to the ring buffer vacant memory.
    ///
    /// Returns a pair of slices of uninitialized memory; the second one may be empty.
    fn vacant_slices(&self) -> (&[MaybeUninit<Self::Item>], &[MaybeUninit<Self::Item>]) {
        unsafe { self.unsafe_slices(self.write_index(), self.read_index() + self.capacity().get()) }
    }

    /// Mutable version of [`Self::vacant_slices`].
    ///
    /// Vacant memory is uninitialized. Initialized items must be put starting from the beginning of the first slice.
    /// When the first slice is fully filled, items must be put at the beginning of the second slice.
    ///
    /// *This method must be followed by a [`Self::advance_write_index`] call with the number of items put as its argument.*
    /// *No other mutating calls are allowed before that.*
    ///
    /// *Vacant slices must not be used to store any data because their contents aren't synchronized properly.*
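    ///
    /// A minimal sketch of the intended call sequence (the helper below is hypothetical, assuming a byte producer obtained elsewhere):
    ///
    /// ```ignore
    /// fn fill_from<P: Producer<Item = u8>>(prod: &mut P, src: &[u8]) {
    ///     let (left, right) = prod.vacant_slices_mut();
    ///     let mut count = 0;
    ///     // Fill the first slice first, then continue into the second one.
    ///     for (place, &byte) in left.iter_mut().chain(right.iter_mut()).zip(src) {
    ///         place.write(byte);
    ///         count += 1;
    ///     }
    ///     // Safety: exactly the first `count` vacant slots have just been initialized.
    ///     unsafe { prod.advance_write_index(count) };
    /// }
    /// ```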
    fn vacant_slices_mut(&mut self) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
        unsafe { self.unsafe_slices_mut(self.write_index(), self.read_index() + self.capacity().get()) }
    }

    /// Appends an item to the ring buffer.
    ///
    /// If the buffer is full, returns an `Err` containing the item that hasn't been appended.
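    ///
    /// A minimal usage sketch (hypothetical helper; the producer half is assumed to be obtained from a ring buffer elsewhere):
    ///
    /// ```ignore
    /// fn push_or_report<P: Producer<Item = i32>>(prod: &mut P, value: i32) {
    ///     match prod.try_push(value) {
    ///         Ok(()) => { /* item stored */ }
    ///         Err(rejected) => { /* buffer full, `rejected` is the original `value` */ }
    ///     }
    /// }
    /// ```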
    fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> {
        if !self.is_full() {
            unsafe {
                self.vacant_slices_mut().0.get_unchecked_mut(0).write(elem);
                self.advance_write_index(1)
            };
            Ok(())
        } else {
            Err(elem)
        }
    }

    /// Appends items from an iterator to the ring buffer.
    /// Elements that haven't been added to the ring buffer remain in the iterator.
    ///
    /// Returns the number of items appended to the ring buffer.
    ///
    /// *Inserted items are committed to the ring buffer all at once at the end,*
    /// *i.e. when the buffer is full or the iterator has ended.*
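    ///
    /// A minimal usage sketch (hypothetical helper; passing `&mut iter` keeps unconsumed elements accessible afterwards):
    ///
    /// ```ignore
    /// fn push_range<P: Producer<Item = u32>>(prod: &mut P) {
    ///     let mut iter = 0..100;
    ///     let pushed = prod.push_iter(&mut iter);
    ///     // `pushed` items were appended; the rest (if any) remain in `iter`.
    ///     assert!(pushed <= 100);
    /// }
    /// ```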
    fn push_iter<I: Iterator<Item = Self::Item>>(&mut self, mut iter: I) -> usize {
        let (left, right) = self.vacant_slices_mut();
        let mut count = 0;
        for place in left.iter_mut().chain(right.iter_mut()) {
            match iter.next() {
                Some(elem) => unsafe { place.as_mut_ptr().write(elem) },
                None => break,
            }
            count += 1;
        }
        unsafe { self.advance_write_index(count) };
        count
    }

    /// Appends items from a slice to the ring buffer.
    ///
    /// Returns the number of items appended to the ring buffer.
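    ///
    /// A minimal usage sketch (hypothetical helper; only as many items as there is vacant space for are copied):
    ///
    /// ```ignore
    /// fn push_bytes<P: Producer<Item = u8>>(prod: &mut P, bytes: &[u8]) {
    ///     let n = prod.push_slice(bytes);
    ///     if n < bytes.len() {
    ///         // The buffer ran out of vacant space; `bytes[n..]` was not copied.
    ///     }
    /// }
    /// ```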
    fn push_slice(&mut self, elems: &[Self::Item]) -> usize
    where
        Self::Item: Copy,
    {
        let (left, right) = self.vacant_slices_mut();
        let count = if elems.len() < left.len() {
            write_slice(&mut left[..elems.len()], elems);
            elems.len()
        } else {
            let (left_elems, elems) = elems.split_at(left.len());
            write_slice(left, left_elems);
            left.len()
                + if elems.len() < right.len() {
                    write_slice(&mut right[..elems.len()], elems);
                    elems.len()
                } else {
                    write_slice(right, &elems[..right.len()]);
                    right.len()
                }
        };
        unsafe { self.advance_write_index(count) };
        count
    }

    #[cfg(feature = "std")]
    /// Reads at most `count` bytes from a `Read` instance and appends them to the ring buffer.
    /// If `count` is `None` then as many bytes as possible will be read.
    ///
    /// Returns:
    /// + `None`: the ring buffer is full or `count` is `0`. In this case `read` isn't called at all.
    /// + `Some(Ok(n))`: `read` succeeded. `n` is the number of bytes read. `n == 0` means that `read` also returned `0`.
    /// + `Some(Err(e))`: `read` failed and `e` is the original error. In this case it is guaranteed that no items were read from the reader.
    ///   To achieve this, only one contiguous slice is read at a time, so this call may read fewer than `vacant_len` items even if the reader is ready to provide more.
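    ///
    /// A minimal sketch of draining a reader into the buffer (hypothetical helper, `std` only):
    ///
    /// ```ignore
    /// use std::io::Read;
    ///
    /// fn drain_into<P: Producer<Item = u8>, R: Read>(prod: &mut P, reader: &mut R) -> std::io::Result<usize> {
    ///     let mut total = 0;
    ///     // Stop when the buffer is full (`None`) or the reader reports EOF (`Ok(0)`).
    ///     while let Some(res) = prod.read_from(reader, None) {
    ///         match res? {
    ///             0 => break,
    ///             n => total += n,
    ///         }
    ///     }
    ///     Ok(total)
    /// }
    /// ```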
    fn read_from<S: Read>(&mut self, reader: &mut S, count: Option<usize>) -> Option<io::Result<usize>>
    where
        Self: Producer<Item = u8>,
    {
        let (left, _) = self.vacant_slices_mut();
        let count = cmp::min(count.unwrap_or(left.len()), left.len());
        if count == 0 {
            return None;
        }

        let buf = &mut left[..count];
        // Initialize memory before reading. It's an overhead, but there's no way to read into an uninitialized buffer in stable Rust yet.
        // TODO: Use `reader.read_buf` when it is stabilized (see https://github.com/rust-lang/rust/issues/78485).
        buf.fill(MaybeUninit::new(0));
        let left_init = unsafe { slice_assume_init_mut(buf) };

        let read_count = match reader.read(left_init) {
            Ok(n) => n,
            Err(e) => return Some(Err(e)),
        };
        assert!(read_count <= count);
        unsafe { self.advance_write_index(read_count) };
        Some(Ok(read_count))
    }
}

/// Trait used for delegating producer methods.
pub trait DelegateProducer: DelegateObserver
where
    Self::Base: Producer,
{
}

impl<D: DelegateProducer> Producer for D
where
    D::Base: Producer,
{
    #[inline]
    unsafe fn set_write_index(&self, value: usize) {
        self.base().set_write_index(value)
    }
    #[inline]
    unsafe fn advance_write_index(&self, count: usize) {
        self.base().advance_write_index(count)
    }

    #[inline]
    fn vacant_slices(&self) -> (&[core::mem::MaybeUninit<Self::Item>], &[core::mem::MaybeUninit<Self::Item>]) {
        self.base().vacant_slices()
    }

    #[inline]
    fn vacant_slices_mut(&mut self) -> (&mut [core::mem::MaybeUninit<Self::Item>], &mut [core::mem::MaybeUninit<Self::Item>]) {
        self.base_mut().vacant_slices_mut()
    }

    #[inline]
    fn try_push(&mut self, elem: Self::Item) -> Result<(), Self::Item> {
        self.base_mut().try_push(elem)
    }

    #[inline]
    fn push_iter<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> usize {
        self.base_mut().push_iter(iter)
    }

    #[inline]
    fn push_slice(&mut self, elems: &[Self::Item]) -> usize
    where
        Self::Item: Copy,
    {
        self.base_mut().push_slice(elems)
    }
}

macro_rules! impl_producer_traits {
    ($type:ident $(< $( $param:tt $( : $first_bound:tt $(+ $next_bound:tt )* )? ),+ >)?) => {

        #[cfg(feature = "std")]
        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? std::io::Write for $type $(< $( $param ),+ >)?
        where
            Self: $crate::traits::Producer<Item = u8>,
        {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                let n = self.push_slice(buf);
                if n == 0 {
                    Err(std::io::ErrorKind::WouldBlock.into())
                } else {
                    Ok(n)
                }
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? core::fmt::Write for $type $(< $( $param ),+ >)?
        where
            Self: $crate::traits::Producer<Item = u8>,
        {
            fn write_str(&mut self, s: &str) -> core::fmt::Result {
                let n = self.push_slice(s.as_bytes());
                if n != s.len() {
                    Err(core::fmt::Error::default())
                } else {
                    Ok(())
                }
            }
        }
    };
}
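// A usage sketch (the type name below is hypothetical, for illustration only): invoking
// `impl_producer_traits!(MyProducer<R: SomeBound>);` on a type whose `Producer::Item` is `u8`
// generates the `std::io::Write` and `core::fmt::Write` impls defined above for that type.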
pub(crate) use impl_producer_traits;