1use core::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll, Waker},
5};
6use futures::future::FusedFuture;
7use ringbuf::traits::Consumer;
8#[cfg(feature = "std")]
9use std::io;
10
11pub trait AsyncConsumer: Consumer {
12 fn register_waker(&self, waker: &Waker);
13
14 fn close(&mut self);
15 fn is_closed(&self) -> bool {
17 !self.write_is_held()
18 }
19
20 fn pop(&mut self) -> PopFuture<'_, Self> {
26 PopFuture { owner: self, done: false }
27 }
28
29 fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
35 debug_assert!(count <= self.capacity().get());
36 WaitOccupiedFuture {
37 owner: self,
38 count,
39 done: false,
40 }
41 }
42
43 fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
49 where
50 Self::Item: Copy,
51 {
52 PopSliceFuture {
53 owner: self,
54 slice: Some(slice),
55 count: 0,
56 }
57 }
58
59 #[cfg(feature = "alloc")]
60 fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
61 PopVecFuture {
62 owner: self,
63 vec: Some(vec),
64 }
65 }
66
67 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
68 where
69 Self: Unpin,
70 {
71 let mut waker_registered = false;
72 loop {
73 let closed = self.is_closed();
74 if let Some(item) = self.try_pop() {
75 break Poll::Ready(Some(item));
76 }
77 if closed {
78 break Poll::Ready(None);
79 }
80 if waker_registered {
81 break Poll::Pending;
82 }
83 self.register_waker(cx.waker());
84 waker_registered = true;
85 }
86 }
87
88 #[cfg(feature = "std")]
89 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
90 where
91 Self: AsyncConsumer<Item = u8> + Unpin,
92 {
93 let mut waker_registered = false;
94 loop {
95 let closed = self.is_closed();
96 let len = self.pop_slice(buf);
97 if len != 0 || closed {
98 break Poll::Ready(Ok(len));
99 }
100 if waker_registered {
101 break Poll::Pending;
102 }
103 self.register_waker(cx.waker());
104 waker_registered = true;
105 }
106 }
107}
108
109pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
110 owner: &'a mut A,
111 done: bool,
112}
113impl<'a, A: AsyncConsumer> Unpin for PopFuture<'a, A> {}
114impl<'a, A: AsyncConsumer> FusedFuture for PopFuture<'a, A> {
115 fn is_terminated(&self) -> bool {
116 self.done || self.owner.is_closed()
117 }
118}
119impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
120 type Output = Option<A::Item>;
121
122 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123 let mut waker_registered = false;
124 loop {
125 assert!(!self.done);
126 let closed = self.owner.is_closed();
127 if let Some(item) = self.owner.try_pop() {
128 self.done = true;
129 break Poll::Ready(Some(item));
130 }
131 if closed {
132 break Poll::Ready(None);
133 }
134 if waker_registered {
135 break Poll::Pending;
136 }
137 self.owner.register_waker(cx.waker());
138 waker_registered = true;
139 }
140 }
141}
142
143pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
144where
145 A::Item: Copy,
146{
147 owner: &'a mut A,
148 slice: Option<&'b mut [A::Item]>,
149 count: usize,
150}
151impl<'a, 'b, A: AsyncConsumer> Unpin for PopSliceFuture<'a, 'b, A> where A::Item: Copy {}
152impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopSliceFuture<'a, 'b, A>
153where
154 A::Item: Copy,
155{
156 fn is_terminated(&self) -> bool {
157 self.slice.is_none()
158 }
159}
160impl<'a, 'b, A: AsyncConsumer> Future for PopSliceFuture<'a, 'b, A>
161where
162 A::Item: Copy,
163{
164 type Output = Result<(), usize>;
165
166 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
167 let mut waker_registered = false;
168 loop {
169 let closed = self.owner.is_closed();
170 let mut slice = self.slice.take().unwrap();
171 let len = self.owner.pop_slice(slice);
172 slice = &mut slice[len..];
173 self.count += len;
174 if slice.is_empty() {
175 break Poll::Ready(Ok(()));
176 }
177 if closed {
178 break Poll::Ready(Err(self.count));
179 }
180 self.slice.replace(slice);
181 if waker_registered {
182 break Poll::Pending;
183 }
184 self.owner.register_waker(cx.waker());
185 waker_registered = true;
186 }
187 }
188}
189
190#[cfg(feature = "alloc")]
191pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
192 owner: &'a mut A,
193 vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
194}
195#[cfg(feature = "alloc")]
196impl<'a, 'b, A: AsyncConsumer> Unpin for PopVecFuture<'a, 'b, A> {}
197#[cfg(feature = "alloc")]
198impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopVecFuture<'a, 'b, A> {
199 fn is_terminated(&self) -> bool {
200 self.vec.is_none()
201 }
202}
203#[cfg(feature = "alloc")]
204impl<'a, 'b, A: AsyncConsumer> Future for PopVecFuture<'a, 'b, A> {
205 type Output = ();
206
207 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208 let mut waker_registered = false;
209 loop {
210 let closed = self.owner.is_closed();
211 let vec = self.vec.take().unwrap();
212
213 loop {
214 if vec.len() == vec.capacity() {
215 vec.reserve(vec.capacity().max(16));
216 }
217 let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
218 if n == 0 {
219 break;
220 }
221 unsafe { vec.set_len(vec.len() + n) };
222 }
223
224 if closed {
225 break Poll::Ready(());
226 }
227 self.vec.replace(vec);
228 if waker_registered {
229 break Poll::Pending;
230 }
231 self.owner.register_waker(cx.waker());
232 waker_registered = true;
233 }
234 }
235}
236
237pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
238 owner: &'a A,
239 count: usize,
240 done: bool,
241}
242impl<'a, A: AsyncConsumer> Unpin for WaitOccupiedFuture<'a, A> {}
243impl<'a, A: AsyncConsumer> FusedFuture for WaitOccupiedFuture<'a, A> {
244 fn is_terminated(&self) -> bool {
245 self.done
246 }
247}
248impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
249 type Output = ();
250
251 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
252 let mut waker_registered = false;
253 loop {
254 assert!(!self.done);
255 let closed = self.owner.is_closed();
256 if self.count <= self.owner.occupied_len() || closed {
257 break Poll::Ready(());
258 }
259 if waker_registered {
260 break Poll::Pending;
261 }
262 self.owner.register_waker(cx.waker());
263 waker_registered = true;
264 }
265 }
266}