async_ringbuf/traits/
consumer.rs

1use core::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll, Waker},
5};
6use futures::future::FusedFuture;
7use ringbuf::traits::Consumer;
8#[cfg(feature = "std")]
9use std::io;
10
11pub trait AsyncConsumer: Consumer {
12    fn register_waker(&self, waker: &Waker);
13
14    fn close(&mut self);
15    /// Whether the corresponding producer was closed.
16    fn is_closed(&self) -> bool {
17        !self.write_is_held()
18    }
19
20    /// Pop item from the ring buffer waiting asynchronously if the buffer is empty.
21    ///
22    /// Future returns:
23    /// + `Some(item)` - an item is taken.
24    /// + `None` - the buffer is empty and the corresponding producer was dropped.
25    fn pop(&mut self) -> PopFuture<'_, Self> {
26        PopFuture { owner: self, done: false }
27    }
28
29    /// Wait for the buffer to contain at least `count` items or to close.
30    ///
31    /// In debug mode panics if `count` is greater than buffer capacity.
32    ///
33    /// The method takes `&mut self` because only single [`WaitOccupiedFuture`] is allowed at a time.
34    fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
35        debug_assert!(count <= self.capacity().get());
36        WaitOccupiedFuture {
37            owner: self,
38            count,
39            done: false,
40        }
41    }
42
43    /// Pop item from the ring buffer waiting asynchronously if the buffer is empty.
44    ///
45    /// Future returns:
46    /// + `Ok` - the whole slice is filled with the items from the buffer.
47    /// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned.
48    fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
49    where
50        Self::Item: Copy,
51    {
52        PopSliceFuture {
53            owner: self,
54            slice: Some(slice),
55            count: 0,
56        }
57    }
58
59    #[cfg(feature = "alloc")]
60    fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
61        PopVecFuture {
62            owner: self,
63            vec: Some(vec),
64        }
65    }
66
67    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
68    where
69        Self: Unpin,
70    {
71        let mut waker_registered = false;
72        loop {
73            let closed = self.is_closed();
74            if let Some(item) = self.try_pop() {
75                break Poll::Ready(Some(item));
76            }
77            if closed {
78                break Poll::Ready(None);
79            }
80            if waker_registered {
81                break Poll::Pending;
82            }
83            self.register_waker(cx.waker());
84            waker_registered = true;
85        }
86    }
87
88    #[cfg(feature = "std")]
89    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
90    where
91        Self: AsyncConsumer<Item = u8> + Unpin,
92    {
93        let mut waker_registered = false;
94        loop {
95            let closed = self.is_closed();
96            let len = self.pop_slice(buf);
97            if len != 0 || closed {
98                break Poll::Ready(Ok(len));
99            }
100            if waker_registered {
101                break Poll::Pending;
102            }
103            self.register_waker(cx.waker());
104            waker_registered = true;
105        }
106    }
107}
108
109pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
110    owner: &'a mut A,
111    done: bool,
112}
113impl<'a, A: AsyncConsumer> Unpin for PopFuture<'a, A> {}
114impl<'a, A: AsyncConsumer> FusedFuture for PopFuture<'a, A> {
115    fn is_terminated(&self) -> bool {
116        self.done || self.owner.is_closed()
117    }
118}
119impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
120    type Output = Option<A::Item>;
121
122    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123        let mut waker_registered = false;
124        loop {
125            assert!(!self.done);
126            let closed = self.owner.is_closed();
127            if let Some(item) = self.owner.try_pop() {
128                self.done = true;
129                break Poll::Ready(Some(item));
130            }
131            if closed {
132                break Poll::Ready(None);
133            }
134            if waker_registered {
135                break Poll::Pending;
136            }
137            self.owner.register_waker(cx.waker());
138            waker_registered = true;
139        }
140    }
141}
142
143pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
144where
145    A::Item: Copy,
146{
147    owner: &'a mut A,
148    slice: Option<&'b mut [A::Item]>,
149    count: usize,
150}
151impl<'a, 'b, A: AsyncConsumer> Unpin for PopSliceFuture<'a, 'b, A> where A::Item: Copy {}
152impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopSliceFuture<'a, 'b, A>
153where
154    A::Item: Copy,
155{
156    fn is_terminated(&self) -> bool {
157        self.slice.is_none()
158    }
159}
160impl<'a, 'b, A: AsyncConsumer> Future for PopSliceFuture<'a, 'b, A>
161where
162    A::Item: Copy,
163{
164    type Output = Result<(), usize>;
165
166    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
167        let mut waker_registered = false;
168        loop {
169            let closed = self.owner.is_closed();
170            let mut slice = self.slice.take().unwrap();
171            let len = self.owner.pop_slice(slice);
172            slice = &mut slice[len..];
173            self.count += len;
174            if slice.is_empty() {
175                break Poll::Ready(Ok(()));
176            }
177            if closed {
178                break Poll::Ready(Err(self.count));
179            }
180            self.slice.replace(slice);
181            if waker_registered {
182                break Poll::Pending;
183            }
184            self.owner.register_waker(cx.waker());
185            waker_registered = true;
186        }
187    }
188}
189
190#[cfg(feature = "alloc")]
191pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
192    owner: &'a mut A,
193    vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
194}
195#[cfg(feature = "alloc")]
196impl<'a, 'b, A: AsyncConsumer> Unpin for PopVecFuture<'a, 'b, A> {}
197#[cfg(feature = "alloc")]
198impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopVecFuture<'a, 'b, A> {
199    fn is_terminated(&self) -> bool {
200        self.vec.is_none()
201    }
202}
203#[cfg(feature = "alloc")]
204impl<'a, 'b, A: AsyncConsumer> Future for PopVecFuture<'a, 'b, A> {
205    type Output = ();
206
207    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208        let mut waker_registered = false;
209        loop {
210            let closed = self.owner.is_closed();
211            let vec = self.vec.take().unwrap();
212
213            loop {
214                if vec.len() == vec.capacity() {
215                    vec.reserve(vec.capacity().max(16));
216                }
217                let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
218                if n == 0 {
219                    break;
220                }
221                unsafe { vec.set_len(vec.len() + n) };
222            }
223
224            if closed {
225                break Poll::Ready(());
226            }
227            self.vec.replace(vec);
228            if waker_registered {
229                break Poll::Pending;
230            }
231            self.owner.register_waker(cx.waker());
232            waker_registered = true;
233        }
234    }
235}
236
237pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
238    owner: &'a A,
239    count: usize,
240    done: bool,
241}
242impl<'a, A: AsyncConsumer> Unpin for WaitOccupiedFuture<'a, A> {}
243impl<'a, A: AsyncConsumer> FusedFuture for WaitOccupiedFuture<'a, A> {
244    fn is_terminated(&self) -> bool {
245        self.done
246    }
247}
248impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
249    type Output = ();
250
251    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
252        let mut waker_registered = false;
253        loop {
254            assert!(!self.done);
255            let closed = self.owner.is_closed();
256            if self.count <= self.owner.occupied_len() || closed {
257                break Poll::Ready(());
258            }
259            if waker_registered {
260                break Poll::Pending;
261            }
262            self.owner.register_waker(cx.waker());
263            waker_registered = true;
264        }
265    }
266}