1use core::{
2 future::Future,
3 iter::Peekable,
4 pin::Pin,
5 task::{Context, Poll, Waker},
6};
7use futures::future::FusedFuture;
8use ringbuf::traits::Producer;
9#[cfg(feature = "std")]
10use std::io;
11
12pub trait AsyncProducer: Producer {
13 fn register_waker(&self, waker: &Waker);
14
15 fn close(&mut self);
16 fn is_closed(&self) -> bool {
18 !self.read_is_held()
19 }
20
21 fn push(&mut self, item: Self::Item) -> PushFuture<'_, Self> {
27 PushFuture {
28 owner: self,
29 item: Some(item),
30 }
31 }
32
33 fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> PushIterFuture<'_, Self, I> {
39 PushIterFuture {
40 owner: self,
41 iter: Some(iter.peekable()),
42 }
43 }
44
45 fn wait_vacant(&mut self, count: usize) -> WaitVacantFuture<'_, Self> {
51 debug_assert!(count <= self.capacity().get());
52 WaitVacantFuture {
53 owner: self,
54 count,
55 done: false,
56 }
57 }
58
59 fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
65 where
66 Self::Item: Copy,
67 {
68 PushSliceFuture {
69 owner: self,
70 slice: Some(slice),
71 count: 0,
72 }
73 }
74
75 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
76 let mut waker_registered = false;
77 loop {
78 if self.is_closed() {
79 break Poll::Ready(false);
80 }
81 if !self.is_full() {
82 break Poll::Ready(true);
83 }
84 if waker_registered {
85 break Poll::Pending;
86 }
87 self.register_waker(cx.waker());
88 waker_registered = true;
89 }
90 }
91
92 #[cfg(feature = "std")]
93 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
94 where
95 Self: AsyncProducer<Item = u8> + Unpin,
96 {
97 let mut waker_registered = false;
98 loop {
99 if self.is_closed() {
100 break Poll::Ready(Ok(0));
101 }
102 let count = self.push_slice(buf);
103 if count > 0 {
104 break Poll::Ready(Ok(count));
105 }
106 if waker_registered {
107 break Poll::Pending;
108 }
109 self.register_waker(cx.waker());
110 waker_registered = true;
111 }
112 }
113}
114
115pub struct PushFuture<'a, A: AsyncProducer + ?Sized> {
116 owner: &'a mut A,
117 item: Option<A::Item>,
118}
119impl<'a, A: AsyncProducer> Unpin for PushFuture<'a, A> {}
120impl<'a, A: AsyncProducer> FusedFuture for PushFuture<'a, A> {
121 fn is_terminated(&self) -> bool {
122 self.item.is_none()
123 }
124}
125impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
126 type Output = Result<(), A::Item>;
127
128 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129 let mut waker_registered = false;
130 loop {
131 let item = self.item.take().unwrap();
132 if self.owner.is_closed() {
133 break Poll::Ready(Err(item));
134 }
135 let push_result = self.owner.try_push(item);
136 if push_result.is_ok() {
137 break Poll::Ready(Ok(()));
138 }
139 self.item.replace(push_result.unwrap_err());
140 if waker_registered {
141 break Poll::Pending;
142 }
143 self.owner.register_waker(cx.waker());
144 waker_registered = true;
145 }
146 }
147}
148
149pub struct PushSliceFuture<'a, 'b, A: AsyncProducer + ?Sized>
150where
151 A::Item: Copy,
152{
153 owner: &'a mut A,
154 slice: Option<&'b [A::Item]>,
155 count: usize,
156}
157impl<'a, 'b, A: AsyncProducer> Unpin for PushSliceFuture<'a, 'b, A> where A::Item: Copy {}
158impl<'a, 'b, A: AsyncProducer> FusedFuture for PushSliceFuture<'a, 'b, A>
159where
160 A::Item: Copy,
161{
162 fn is_terminated(&self) -> bool {
163 self.slice.is_none()
164 }
165}
166impl<'a, 'b, A: AsyncProducer> Future for PushSliceFuture<'a, 'b, A>
167where
168 A::Item: Copy,
169{
170 type Output = Result<(), usize>;
171
172 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
173 let mut waker_registered = false;
174 loop {
175 let mut slice = self.slice.take().unwrap();
176 if self.owner.is_closed() {
177 break Poll::Ready(Err(self.count));
178 }
179 let len = self.owner.push_slice(slice);
180 slice = &slice[len..];
181 self.count += len;
182 if slice.is_empty() {
183 break Poll::Ready(Ok(()));
184 }
185 self.slice.replace(slice);
186 if waker_registered {
187 break Poll::Pending;
188 }
189 self.owner.register_waker(cx.waker());
190 waker_registered = true;
191 }
192 }
193}
194
195pub struct PushIterFuture<'a, A: AsyncProducer + ?Sized, I: Iterator<Item = A::Item>> {
196 owner: &'a mut A,
197 iter: Option<Peekable<I>>,
198}
199impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Unpin for PushIterFuture<'a, A, I> {}
200impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> FusedFuture for PushIterFuture<'a, A, I> {
201 fn is_terminated(&self) -> bool {
202 self.iter.is_none() || self.owner.is_closed()
203 }
204}
205impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFuture<'a, A, I> {
206 type Output = bool;
207
208 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209 let mut waker_registered = false;
210 loop {
211 let mut iter = self.iter.take().unwrap();
212 if self.owner.is_closed() {
213 break Poll::Ready(false);
214 }
215 self.owner.push_iter(&mut iter);
216 if iter.peek().is_none() {
217 break Poll::Ready(true);
218 }
219 self.iter.replace(iter);
220 if waker_registered {
221 break Poll::Pending;
222 }
223 self.owner.register_waker(cx.waker());
224 waker_registered = true;
225 }
226 }
227}
228
229pub struct WaitVacantFuture<'a, A: AsyncProducer + ?Sized> {
230 owner: &'a A,
231 count: usize,
232 done: bool,
233}
234impl<'a, A: AsyncProducer> Unpin for WaitVacantFuture<'a, A> {}
235impl<'a, A: AsyncProducer> FusedFuture for WaitVacantFuture<'a, A> {
236 fn is_terminated(&self) -> bool {
237 self.done
238 }
239}
240impl<'a, A: AsyncProducer> Future for WaitVacantFuture<'a, A> {
241 type Output = ();
242
243 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
244 let mut waker_registered = false;
245 loop {
246 assert!(!self.done);
247 let closed = self.owner.is_closed();
248 if self.count <= self.owner.vacant_len() || closed {
249 break Poll::Ready(());
250 }
251 if waker_registered {
252 break Poll::Pending;
253 }
254 self.owner.register_waker(cx.waker());
255 waker_registered = true;
256 }
257 }
258}