1use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use futures::stream::{FusedStream, FuturesUnordered, Stream};
10use pin_project::pin_project;
11
12#[pin_project]
17pub struct OneOrMany<F>(#[pin] Impl<F>);
18
19#[pin_project(project=OneInnerProj)]
22enum OneInner<F> {
23 Present(#[pin] F),
24 Absent,
25 AbsentNoneYielded,
26}
27
28impl<F> OneInner<F> {
29 fn take(&mut self) -> Option<F> {
30 let v = std::mem::replace(self, Self::Absent);
31 match v {
32 Self::Present(f) => Some(f),
33 Self::Absent => None,
34 Self::AbsentNoneYielded => {
35 *self = Self::AbsentNoneYielded;
37 None
38 }
39 }
40 }
41}
42
43#[pin_project(project=OneOrManyProj)]
44enum Impl<F> {
45 One(#[pin] OneInner<F>),
46 Many(#[pin] FuturesUnordered<F>),
47}
48
49impl<F> Default for OneOrMany<F> {
50 fn default() -> Self {
51 Self(Impl::One(OneInner::Absent))
52 }
53}
54
55impl<F> OneOrMany<F> {
56 pub fn new(f: F) -> Self {
63 Self(Impl::One(OneInner::Present(f)))
64 }
65
66 pub fn push(&mut self, f: F) {
73 let Self(this) = self;
74 match this {
75 Impl::One(o) => match o.take() {
76 None => *o = OneInner::Present(f),
77 Some(first) => *this = Impl::Many([first, f].into_iter().collect()),
78 },
79 Impl::Many(unordered) => {
80 if unordered.is_empty() {
81 *this = Impl::One(OneInner::Present(f))
89 } else {
90 unordered.push(f);
91 }
92 }
93 }
94 }
95
96 pub fn is_empty(&self) -> bool {
98 let Self(this) = self;
99 match this {
100 Impl::One(OneInner::Absent | OneInner::AbsentNoneYielded) => true,
101 Impl::One(OneInner::Present(_)) => false,
102 Impl::Many(many) => many.is_empty(),
103 }
104 }
105}
106
107impl<F: Future> Stream for OneOrMany<F> {
108 type Item = F::Output;
109
110 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111 let this = self.project();
112 match this.0.project() {
113 OneOrManyProj::One(mut p) => match p.as_mut().project() {
114 OneInnerProj::Absent | OneInnerProj::AbsentNoneYielded => {
115 p.set(OneInner::AbsentNoneYielded);
116 Poll::Ready(None)
117 }
118 OneInnerProj::Present(f) => match f.poll(cx) {
119 Poll::Ready(t) => {
120 let output = Poll::Ready(Some(t));
121 p.set(OneInner::Absent);
122 output
123 }
124 Poll::Pending => Poll::Pending,
125 },
126 },
127 OneOrManyProj::Many(unordered) => {
128 unordered.poll_next(cx)
139 }
140 }
141 }
142}
143
144impl<F: Future> FusedStream for OneOrMany<F> {
145 fn is_terminated(&self) -> bool {
146 let Self(this) = self;
147 match this {
148 Impl::One(OneInner::Present(_) | OneInner::Absent) => false,
149 Impl::One(OneInner::AbsentNoneYielded) => true,
150 Impl::Many(unordered) => unordered.is_terminated(),
151 }
152 }
153}
154
155impl<F> FromIterator<F> for OneOrMany<F> {
156 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
157 let mut iter = iter.into_iter();
158
159 Self(match iter.next() {
160 None => Impl::One(OneInner::Absent),
161 Some(first) => match iter.next() {
162 None => Impl::One(OneInner::Present(first)),
163 Some(second) => Impl::Many([first, second].into_iter().chain(iter).collect()),
164 },
165 })
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use std::collections::HashSet;
172
173 use crate::event::Event;
174 use assert_matches::assert_matches;
175 use futures::future::Ready;
176 use futures::{StreamExt as _, pin_mut};
177 use futures_test::task::new_count_waker;
178 use std::task::Waker;
179
180 use super::*;
181
182 #[test]
183 fn one_or_many_one() {
184 let (waker, count) = new_count_waker();
185 let mut context = Context::from_waker(&waker);
186
187 let event = Event::new();
188 let one_or_many = OneOrMany::new(event.wait());
189 pin_mut!(one_or_many);
190
191 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
192 assert_eq!(event.signal(), true);
193 assert_eq!(count, 1);
194
195 assert_eq!(one_or_many.poll_next(&mut context), Poll::Ready(Some(())));
196 assert_eq!(count, 1);
197 }
198
199 #[test]
200 fn one_or_many_one_poll_exhausted() {
201 let (waker, count) = new_count_waker();
202 let mut context = Context::from_waker(&waker);
203
204 let one_or_many = OneOrMany::new(futures::future::ready(()));
205 pin_mut!(one_or_many);
206 assert_eq!(one_or_many.is_terminated(), false);
207 assert_eq!(one_or_many.is_empty(), false);
208
209 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
210 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
211 assert_eq!(count, 0);
212 assert_eq!(one_or_many.is_terminated(), true);
213 assert_eq!(one_or_many.is_empty(), true);
214 }
215
216 #[test]
217 fn one_or_many_push_one() {
218 let (waker, count) = new_count_waker();
219 let mut context = Context::from_waker(&waker);
220
221 let mut one_or_many = OneOrMany::new(futures::future::ready(()));
222 one_or_many.push(futures::future::ready(()));
223 pin_mut!(one_or_many);
224 assert_eq!(one_or_many.is_terminated(), false);
225 assert_eq!(one_or_many.is_empty(), false);
226
227 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
228 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
229 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
230 assert_eq!(one_or_many.is_terminated(), true);
231 assert_eq!(count, 0);
232 assert_eq!(one_or_many.is_empty(), true);
233 }
234
235 #[test]
236 fn one_or_many_push_one_after_poll() {
237 let (waker, count) = new_count_waker();
238 let mut context = Context::from_waker(&waker);
239
240 let event = Event::new();
241 let one_or_many = OneOrMany::new(event.wait());
242 pin_mut!(one_or_many);
243 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
244 assert_eq!(one_or_many.is_empty(), false);
245
246 let other_event = Event::new();
247 one_or_many.push(other_event.wait());
248
249 assert_eq!(count, 0);
250 assert_eq!(event.signal(), true);
251 assert_eq!(count, 1);
252
253 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
254 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
255 assert_eq!(one_or_many.is_empty(), false);
256 }
257
258 #[test]
259 fn one_or_many_push_one_after_ready_before_poll() {
260 let (waker, count) = new_count_waker();
261 let mut context = Context::from_waker(&waker);
262
263 let event = Event::new();
264 let one_or_many = OneOrMany::new(event.wait());
265 pin_mut!(one_or_many);
266 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
267
268 assert_eq!(count, 0);
269 assert_eq!(event.signal(), true);
270
271 let other_event = Event::new();
272 one_or_many.push(other_event.wait());
273 assert_eq!(count, 1);
274
275 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
276 assert_eq!(one_or_many.poll_next(&mut context), Poll::Pending);
277 }
278
279 #[test]
280 fn one_or_many_one_exhausted_push() {
281 let (waker, count) = new_count_waker();
282 let mut context = Context::from_waker(&waker);
283
284 let one_or_many = OneOrMany::new(futures::future::ready(1));
285 pin_mut!(one_or_many);
286 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
287 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
288 assert_eq!(one_or_many.is_terminated(), true);
289
290 one_or_many.push(futures::future::ready(2));
291 assert_eq!(one_or_many.is_terminated(), false);
292 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(2)));
293 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
294 assert_eq!(one_or_many.is_terminated(), true);
295 assert_eq!(count, 0);
296 }
297
298 #[test]
299 fn one_or_many_many_exhausted_push() {
300 let (waker, count) = new_count_waker();
301 let mut context = Context::from_waker(&waker);
302
303 let one_or_many: OneOrMany<_> = [1, 2].into_iter().map(futures::future::ready).collect();
304 pin_mut!(one_or_many);
305
306 let mut values = [(); 2].map(|()| {
307 let poll = one_or_many.as_mut().poll_next(&mut context);
308 assert_matches!(poll, Poll::Ready(Some(i)) => i)
309 });
310 values.sort();
311 assert_eq!(values, [1, 2]);
312 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
313 assert_eq!(one_or_many.is_terminated(), true);
314
315 one_or_many.push(futures::future::ready(3));
316 assert_eq!(one_or_many.is_terminated(), false);
317
318 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(3)));
319 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
320 assert_eq!(one_or_many.is_terminated(), true);
321 assert_eq!(count, 0)
322 }
323
324 #[test]
325 fn one_or_many_collect_none() {
326 let waker = Waker::noop();
327 let mut context = Context::from_waker(&waker);
328
329 let one_or_many: OneOrMany<Ready<()>> = std::iter::empty().collect();
330 pin_mut!(one_or_many);
331
332 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
333 assert_eq!(one_or_many.is_empty(), true);
334 }
335
336 #[test]
337 fn one_or_many_collect_one() {
338 let waker = Waker::noop();
339 let mut context = Context::from_waker(&waker);
340
341 let one_or_many: OneOrMany<_> = std::iter::once(futures::future::ready(1)).collect();
342 pin_mut!(one_or_many);
343
344 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
345 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
346 assert_eq!(one_or_many.is_empty(), true);
347 }
348
349 #[test]
350 fn one_or_many_collect_multiple() {
351 let waker = Waker::noop();
352 let mut context = Context::from_waker(&waker);
353
354 let one_or_many: OneOrMany<_> =
355 (1..=5).into_iter().map(|i| futures::future::ready(i)).collect();
356
357 let fut = one_or_many.collect();
358 pin_mut!(fut);
359 let all: HashSet<_> = assert_matches!(fut.poll(&mut context), Poll::Ready(x) => x);
360 assert_eq!(all, HashSet::from_iter(1..=5));
361 }
362
363 #[test]
364 fn fused_stream() {
365 let waker = futures_test::task::panic_waker();
366 let mut context = Context::from_waker(&waker);
367
368 let one_or_many = OneOrMany::<_>::default();
369 pin_mut!(one_or_many);
370 assert!(!one_or_many.is_terminated());
371 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
372 assert!(one_or_many.is_terminated());
373
374 one_or_many.as_mut().push(futures::future::ready(()));
375 assert!(!one_or_many.is_terminated());
376 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
377 assert!(!one_or_many.is_terminated());
378 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
379 assert!(one_or_many.is_terminated());
380
381 one_or_many.as_mut().push(futures::future::ready(()));
384 assert!(!one_or_many.is_terminated());
385 one_or_many.as_mut().push(futures::future::ready(()));
386 assert!(!one_or_many.is_terminated());
387 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
388 assert!(!one_or_many.is_terminated());
389 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
390 assert!(!one_or_many.is_terminated());
391 assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
392 assert!(one_or_many.is_terminated());
393 }
394}