async_utils/stream/
one_or_many.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use futures::stream::{FusedStream, FuturesUnordered, Stream};
10use pin_project::pin_project;
11
/// A collection of multiple futures that optimizes for the single-future case.
///
/// Instances of `OneOrMany` can be created with `Default`, [`OneOrMany::new`],
/// or as the result of `.collect()`ing from an iterator.
///
/// `OneOrMany` implements [`Stream`], yielding the output of each held future
/// as it completes, and [`FusedStream`].
#[pin_project]
pub struct OneOrMany<F>(#[pin] Impl<F>);
18
/// Maintains internal state for the [`Impl::One`] case, keeping track of when
/// `None` is already yielded to provide a correct `FusedStream` implementation.
#[pin_project(project=OneInnerProj)]
enum OneInner<F> {
    /// A future is held.
    Present(#[pin] F),
    /// No future is held, but `None` has not been yielded for this empty
    /// state yet.
    Absent,
    /// No future is held and `poll_next` has already yielded `None`.
    AbsentNoneYielded,
}
27
28impl<F> OneInner<F> {
29    fn take(&mut self) -> Option<F> {
30        let v = std::mem::replace(self, Self::Absent);
31        match v {
32            Self::Present(f) => Some(f),
33            Self::Absent => None,
34            Self::AbsentNoneYielded => {
35                // Restore back the NoneYieldedState.
36                *self = Self::AbsentNoneYielded;
37                None
38            }
39        }
40    }
41}
42
/// Backing storage for [`OneOrMany`].
#[pin_project(project=OneOrManyProj)]
enum Impl<F> {
    /// Holds at most one future directly, tracked by [`OneInner`].
    One(#[pin] OneInner<F>),
    /// Holds the futures in a [`FuturesUnordered`].
    Many(#[pin] FuturesUnordered<F>),
}
48
49impl<F> Default for OneOrMany<F> {
50    fn default() -> Self {
51        Self(Impl::One(OneInner::Absent))
52    }
53}
54
55impl<F> OneOrMany<F> {
56    /// Constructs a `OneOrMany` with a single future.
57    ///
58    /// Constructs a new `OneOrMany` with exactly one future. If no additional
59    /// futures are added via [`push`](OneOrMany::push), this is behaviorally
60    /// identical to constructing a stream by providing the future to
61    /// [`futures::stream::once`].
62    pub fn new(f: F) -> Self {
63        Self(Impl::One(OneInner::Present(f)))
64    }
65
66    /// Appends a new future to the set of pending futures.
67    ///
68    /// Like [`FuturesUnordered::push`], this doesn't call
69    /// [`poll`](Future::poll) on the provided future. The caller must ensure
70    /// that [`poll_next`](Stream::poll_next) is called in order to receive
71    /// wake-up notifications for the provided future.
72    pub fn push(&mut self, f: F) {
73        let Self(this) = self;
74        match this {
75            Impl::One(o) => match o.take() {
76                None => *o = OneInner::Present(f),
77                Some(first) => *this = Impl::Many([first, f].into_iter().collect()),
78            },
79            Impl::Many(unordered) => {
80                if unordered.is_empty() {
81                    // Opportunistically switch back to `One`, but only if there
82                    // are no more futures. This is expensive in the short term
83                    // but more performant on average assuming most of the time
84                    // this `OneOrMany` is holding zero or one futures.
85                    // Otherwise the cost of allocating and deallocating a
86                    // `FuturesUnordered` would outweigh the gains of less
87                    // indirection.
88                    *this = Impl::One(OneInner::Present(f))
89                } else {
90                    unordered.push(f);
91                }
92            }
93        }
94    }
95
96    /// Returns true if and only if there are no futures held.
97    pub fn is_empty(&self) -> bool {
98        let Self(this) = self;
99        match this {
100            Impl::One(OneInner::Absent | OneInner::AbsentNoneYielded) => true,
101            Impl::One(OneInner::Present(_)) => false,
102            Impl::Many(many) => many.is_empty(),
103        }
104    }
105}
106
107impl<F: Future> Stream for OneOrMany<F> {
108    type Item = F::Output;
109
110    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111        let this = self.project();
112        match this.0.project() {
113            OneOrManyProj::One(mut p) => match p.as_mut().project() {
114                OneInnerProj::Absent | OneInnerProj::AbsentNoneYielded => {
115                    p.set(OneInner::AbsentNoneYielded);
116                    Poll::Ready(None)
117                }
118                OneInnerProj::Present(f) => match f.poll(cx) {
119                    Poll::Ready(t) => {
120                        let output = Poll::Ready(Some(t));
121                        p.set(OneInner::Absent);
122                        output
123                    }
124                    Poll::Pending => Poll::Pending,
125                },
126            },
127            OneOrManyProj::Many(unordered) => {
128                // Instead of returning the value directly, we could check
129                // whether `unordered` contains a single element and, if so,
130                // return `this` to the `Impl::One` case. We avoid doing that
131                // because it's unfriendly to an expected common pattern, where
132                // the OneOrMany holds two futures and a new one is added every
133                // time one of them completes (e.g. if the futures are
134                // constructed from streams). Instead we implement a little bit
135                // of hysteresis here and in the `Impl::Many` case in `push`. by
136                // requiring `unordered` to be completely empty before reverting
137                // to `Impl::One`.
138                unordered.poll_next(cx)
139            }
140        }
141    }
142}
143
144impl<F: Future> FusedStream for OneOrMany<F> {
145    fn is_terminated(&self) -> bool {
146        let Self(this) = self;
147        match this {
148            Impl::One(OneInner::Present(_) | OneInner::Absent) => false,
149            Impl::One(OneInner::AbsentNoneYielded) => true,
150            Impl::Many(unordered) => unordered.is_terminated(),
151        }
152    }
153}
154
155impl<F> FromIterator<F> for OneOrMany<F> {
156    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
157        let mut iter = iter.into_iter();
158
159        Self(match iter.next() {
160            None => Impl::One(OneInner::Absent),
161            Some(first) => match iter.next() {
162                None => Impl::One(OneInner::Present(first)),
163                Some(second) => Impl::Many([first, second].into_iter().chain(iter).collect()),
164            },
165        })
166    }
167}
168
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use crate::event::Event;
    use assert_matches::assert_matches;
    use futures::future::Ready;
    use futures::{StreamExt as _, pin_mut};
    use futures_test::task::new_count_waker;
    use std::task::Waker;

    use super::*;

    /// A single pending future wakes the task once when its event is signaled
    /// and then yields its output.
    #[test]
    fn one_or_many_one() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(event.signal(), true);
        assert_eq!(count, 1);

        assert_eq!(one_or_many.poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(count, 1);
    }

    /// After the only future completes, the next poll yields `None` and the
    /// stream reports itself as terminated and empty.
    #[test]
    fn one_or_many_one_poll_exhausted() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::new(futures::future::ready(()));
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert!(!one_or_many.is_empty());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert_eq!(count, 0);
        assert!(one_or_many.is_terminated());
        assert!(one_or_many.is_empty());
    }

    /// Pushing a second future before polling yields both outputs, then `None`.
    #[test]
    fn one_or_many_push_one() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let mut one_or_many = OneOrMany::new(futures::future::ready(()));
        one_or_many.push(futures::future::ready(()));
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert!(!one_or_many.is_empty());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
        assert!(one_or_many.is_empty());
    }

    /// A future that was already polled keeps its wake-up registration when a
    /// second future is pushed afterwards.
    #[test]
    fn one_or_many_push_one_after_poll() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert!(!one_or_many.is_empty());

        let other_event = Event::new();
        one_or_many.push(other_event.wait());

        assert_eq!(count, 0);
        assert_eq!(event.signal(), true);
        assert_eq!(count, 1);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert!(!one_or_many.is_empty());
    }

    /// Pushing after the first future became ready, but before it is polled
    /// again, still yields the first future's output.
    #[test]
    fn one_or_many_push_one_after_ready_before_poll() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);

        assert_eq!(count, 0);
        assert_eq!(event.signal(), true);

        let other_event = Event::new();
        one_or_many.push(other_event.wait());
        assert_eq!(count, 1);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.poll_next(&mut context), Poll::Pending);
    }

    /// Pushing into an exhausted single-future stream un-terminates it.
    #[test]
    fn one_or_many_one_exhausted_push() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::new(futures::future::ready(1));
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.push(futures::future::ready(2));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(2)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
    }

    /// Pushing into an exhausted multi-future stream un-terminates it.
    #[test]
    fn one_or_many_many_exhausted_push() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = [1, 2].into_iter().map(futures::future::ready).collect();
        pin_mut!(one_or_many);

        let mut values = [(); 2].map(|()| {
            let poll = one_or_many.as_mut().poll_next(&mut context);
            assert_matches!(poll, Poll::Ready(Some(i)) => i)
        });
        // Completion order is not asserted, so compare the sorted outputs.
        values.sort();
        assert_eq!(values, [1, 2]);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.push(futures::future::ready(3));
        assert!(!one_or_many.is_terminated());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(3)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
    }

    /// Collecting an empty iterator produces an empty stream.
    #[test]
    fn one_or_many_collect_none() {
        let waker = Waker::noop();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<Ready<()>> = std::iter::empty().collect();
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_empty());
    }

    /// Collecting a single future yields its output and then `None`.
    #[test]
    fn one_or_many_collect_one() {
        let waker = Waker::noop();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = std::iter::once(futures::future::ready(1)).collect();
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_empty());
    }

    /// Collecting several futures yields every output.
    #[test]
    fn one_or_many_collect_multiple() {
        let waker = Waker::noop();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = (1..=5).map(futures::future::ready).collect();

        let fut = one_or_many.collect();
        pin_mut!(fut);
        let all: HashSet<_> = assert_matches!(fut.poll(&mut context), Poll::Ready(x) => x);
        assert_eq!(all, HashSet::from_iter(1..=5));
    }

    /// `is_terminated` only becomes true after `None` is yielded, and pushing
    /// a future resets it.
    #[test]
    fn fused_stream() {
        let waker = futures_test::task::panic_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::<_>::default();
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        // Do it again with two futures to test the FuturesUnordered passthrough
        // case.
        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
    }
}