async_ringbuf/traits/
producer.rs

1use core::{
2    future::Future,
3    iter::Peekable,
4    pin::Pin,
5    task::{Context, Poll, Waker},
6};
7use futures::future::FusedFuture;
8use ringbuf::traits::Producer;
9#[cfg(feature = "std")]
10use std::io;
11
12pub trait AsyncProducer: Producer {
13    fn register_waker(&self, waker: &Waker);
14
15    fn close(&mut self);
16    /// Whether the corresponding consumer was closed.
17    fn is_closed(&self) -> bool {
18        !self.read_is_held()
19    }
20
21    /// Push item to the ring buffer waiting asynchronously if the buffer is full.
22    ///
23    /// Future returns:
24    /// + `Ok` - item successfully pushed.
25    /// + `Err(item)` - the corresponding consumer was dropped, item is returned back.
26    fn push(&mut self, item: Self::Item) -> PushFuture<'_, Self> {
27        PushFuture {
28            owner: self,
29            item: Some(item),
30        }
31    }
32
33    /// Push items from iterator waiting asynchronously if the buffer is full.
34    ///
35    /// Future returns:
36    /// + `Ok` - iterator ended.
37    /// + `Err(iter)` - the corresponding consumer was dropped, remaining iterator is returned back.
38    fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> PushIterFuture<'_, Self, I> {
39        PushIterFuture {
40            owner: self,
41            iter: Some(iter.peekable()),
42        }
43    }
44
45    /// Wait for the buffer to have at least `count` free places for items or to close.
46    ///
47    /// In debug mode panics if `count` is greater than buffer capacity.
48    ///
49    /// The method takes `&mut self` because only single [`WaitVacantFuture`] is allowed at a time.
50    fn wait_vacant(&mut self, count: usize) -> WaitVacantFuture<'_, Self> {
51        debug_assert!(count <= self.capacity().get());
52        WaitVacantFuture {
53            owner: self,
54            count,
55            done: false,
56        }
57    }
58
59    /// Copy slice contents to the buffer waiting asynchronously if the buffer is full.
60    ///
61    /// Future returns:
62    /// + `Ok` - all slice contents are copied.
63    /// + `Err(count)` - the corresponding consumer was dropped, number of copied items returned.
64    fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
65    where
66        Self::Item: Copy,
67    {
68        PushSliceFuture {
69            owner: self,
70            slice: Some(slice),
71            count: 0,
72        }
73    }
74
75    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
76        let mut waker_registered = false;
77        loop {
78            if self.is_closed() {
79                break Poll::Ready(false);
80            }
81            if !self.is_full() {
82                break Poll::Ready(true);
83            }
84            if waker_registered {
85                break Poll::Pending;
86            }
87            self.register_waker(cx.waker());
88            waker_registered = true;
89        }
90    }
91
92    #[cfg(feature = "std")]
93    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
94    where
95        Self: AsyncProducer<Item = u8> + Unpin,
96    {
97        let mut waker_registered = false;
98        loop {
99            if self.is_closed() {
100                break Poll::Ready(Ok(0));
101            }
102            let count = self.push_slice(buf);
103            if count > 0 {
104                break Poll::Ready(Ok(count));
105            }
106            if waker_registered {
107                break Poll::Pending;
108            }
109            self.register_waker(cx.waker());
110            waker_registered = true;
111        }
112    }
113}
114
115pub struct PushFuture<'a, A: AsyncProducer + ?Sized> {
116    owner: &'a mut A,
117    item: Option<A::Item>,
118}
119impl<'a, A: AsyncProducer> Unpin for PushFuture<'a, A> {}
120impl<'a, A: AsyncProducer> FusedFuture for PushFuture<'a, A> {
121    fn is_terminated(&self) -> bool {
122        self.item.is_none()
123    }
124}
125impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
126    type Output = Result<(), A::Item>;
127
128    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129        let mut waker_registered = false;
130        loop {
131            let item = self.item.take().unwrap();
132            if self.owner.is_closed() {
133                break Poll::Ready(Err(item));
134            }
135            let push_result = self.owner.try_push(item);
136            if push_result.is_ok() {
137                break Poll::Ready(Ok(()));
138            }
139            self.item.replace(push_result.unwrap_err());
140            if waker_registered {
141                break Poll::Pending;
142            }
143            self.owner.register_waker(cx.waker());
144            waker_registered = true;
145        }
146    }
147}
148
149pub struct PushSliceFuture<'a, 'b, A: AsyncProducer + ?Sized>
150where
151    A::Item: Copy,
152{
153    owner: &'a mut A,
154    slice: Option<&'b [A::Item]>,
155    count: usize,
156}
157impl<'a, 'b, A: AsyncProducer> Unpin for PushSliceFuture<'a, 'b, A> where A::Item: Copy {}
158impl<'a, 'b, A: AsyncProducer> FusedFuture for PushSliceFuture<'a, 'b, A>
159where
160    A::Item: Copy,
161{
162    fn is_terminated(&self) -> bool {
163        self.slice.is_none()
164    }
165}
166impl<'a, 'b, A: AsyncProducer> Future for PushSliceFuture<'a, 'b, A>
167where
168    A::Item: Copy,
169{
170    type Output = Result<(), usize>;
171
172    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173        let mut waker_registered = false;
174        loop {
175            let mut slice = self.slice.take().unwrap();
176            if self.owner.is_closed() {
177                break Poll::Ready(Err(self.count));
178            }
179            let len = self.owner.push_slice(slice);
180            slice = &slice[len..];
181            self.count += len;
182            if slice.is_empty() {
183                break Poll::Ready(Ok(()));
184            }
185            self.slice.replace(slice);
186            if waker_registered {
187                break Poll::Pending;
188            }
189            self.owner.register_waker(cx.waker());
190            waker_registered = true;
191        }
192    }
193}
194
195pub struct PushIterFuture<'a, A: AsyncProducer + ?Sized, I: Iterator<Item = A::Item>> {
196    owner: &'a mut A,
197    iter: Option<Peekable<I>>,
198}
199impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Unpin for PushIterFuture<'a, A, I> {}
200impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> FusedFuture for PushIterFuture<'a, A, I> {
201    fn is_terminated(&self) -> bool {
202        self.iter.is_none() || self.owner.is_closed()
203    }
204}
205impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFuture<'a, A, I> {
206    type Output = bool;
207
208    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209        let mut waker_registered = false;
210        loop {
211            let mut iter = self.iter.take().unwrap();
212            if self.owner.is_closed() {
213                break Poll::Ready(false);
214            }
215            self.owner.push_iter(&mut iter);
216            if iter.peek().is_none() {
217                break Poll::Ready(true);
218            }
219            self.iter.replace(iter);
220            if waker_registered {
221                break Poll::Pending;
222            }
223            self.owner.register_waker(cx.waker());
224            waker_registered = true;
225        }
226    }
227}
228
229pub struct WaitVacantFuture<'a, A: AsyncProducer + ?Sized> {
230    owner: &'a A,
231    count: usize,
232    done: bool,
233}
234impl<'a, A: AsyncProducer> Unpin for WaitVacantFuture<'a, A> {}
235impl<'a, A: AsyncProducer> FusedFuture for WaitVacantFuture<'a, A> {
236    fn is_terminated(&self) -> bool {
237        self.done
238    }
239}
240impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {
241    type Output = ();
242
243    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
244        let mut waker_registered = false;
245        loop {
246            assert!(!self.done);
247            let closed = self.owner.is_closed();
248            if self.count <= self.owner.vacant_len() || closed {
249                break Poll::Ready(());
250            }
251            if waker_registered {
252                break Poll::Pending;
253            }
254            self.owner.register_waker(cx.waker());
255            waker_registered = true;
256        }
257    }
258}