use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::stream::{FusedStream, FuturesUnordered, Stream};
use pin_project::pin_project;
#[pin_project]
pub struct OneOrMany<F>(#[pin] Impl<F>);
#[pin_project(project=OneInnerProj)]
enum OneInner<F> {
Present(#[pin] F),
Absent,
AbsentNoneYielded,
}
impl<F> OneInner<F> {
fn take(&mut self) -> Option<F> {
let v = std::mem::replace(self, Self::Absent);
match v {
Self::Present(f) => Some(f),
Self::Absent => None,
Self::AbsentNoneYielded => {
*self = Self::AbsentNoneYielded;
None
}
}
}
}
#[pin_project(project=OneOrManyProj)]
enum Impl<F> {
One(#[pin] OneInner<F>),
Many(#[pin] FuturesUnordered<F>),
}
impl<F> Default for OneOrMany<F> {
fn default() -> Self {
Self(Impl::One(OneInner::Absent))
}
}
impl<F> OneOrMany<F> {
pub fn new(f: F) -> Self {
Self(Impl::One(OneInner::Present(f)))
}
pub fn push(&mut self, f: F) {
let Self(this) = self;
match this {
Impl::One(o) => match o.take() {
None => *o = OneInner::Present(f),
Some(first) => *this = Impl::Many([first, f].into_iter().collect()),
},
Impl::Many(unordered) => {
if unordered.is_empty() {
*this = Impl::One(OneInner::Present(f))
} else {
unordered.push(f);
}
}
}
}
pub fn is_empty(&self) -> bool {
let Self(this) = self;
match this {
Impl::One(OneInner::Absent | OneInner::AbsentNoneYielded) => true,
Impl::One(OneInner::Present(_)) => false,
Impl::Many(many) => many.is_empty(),
}
}
}
impl<F: Future> Stream for OneOrMany<F> {
type Item = F::Output;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.0.project() {
OneOrManyProj::One(mut p) => match p.as_mut().project() {
OneInnerProj::Absent | OneInnerProj::AbsentNoneYielded => {
p.set(OneInner::AbsentNoneYielded);
Poll::Ready(None)
}
OneInnerProj::Present(f) => match f.poll(cx) {
Poll::Ready(t) => {
let output = Poll::Ready(Some(t));
p.set(OneInner::Absent);
output
}
Poll::Pending => Poll::Pending,
},
},
OneOrManyProj::Many(unordered) => {
unordered.poll_next(cx)
}
}
}
}
impl<F: Future> FusedStream for OneOrMany<F> {
fn is_terminated(&self) -> bool {
let Self(this) = self;
match this {
Impl::One(OneInner::Present(_) | OneInner::Absent) => false,
Impl::One(OneInner::AbsentNoneYielded) => true,
Impl::Many(unordered) => unordered.is_terminated(),
}
}
}
impl<F> FromIterator<F> for OneOrMany<F> {
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
let mut iter = iter.into_iter();
Self(match iter.next() {
None => Impl::One(OneInner::Absent),
Some(first) => match iter.next() {
None => Impl::One(OneInner::Present(first)),
Some(second) => Impl::Many([first, second].into_iter().chain(iter).collect()),
},
})
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use crate::event::Event;
use assert_matches::assert_matches;
use futures::future::Ready;
use futures::{pin_mut, StreamExt as _};
use futures_test::task::{new_count_waker, noop_waker};
use super::*;
#[test]
fn one_or_many_one() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let event = Event::new();
let one_or_many = OneOrMany::new(event.wait());
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
assert_eq!(event.signal(), true);
assert_eq!(count, 1);
assert_eq!(one_or_many.poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(count, 1);
}
#[test]
fn one_or_many_one_poll_exhausted() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let one_or_many = OneOrMany::new(futures::future::ready(()));
pin_mut!(one_or_many);
assert_eq!(one_or_many.is_terminated(), false);
assert_eq!(one_or_many.is_empty(), false);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(count, 0);
assert_eq!(one_or_many.is_terminated(), true);
assert_eq!(one_or_many.is_empty(), true);
}
#[test]
fn one_or_many_push_one() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let mut one_or_many = OneOrMany::new(futures::future::ready(()));
one_or_many.push(futures::future::ready(()));
pin_mut!(one_or_many);
assert_eq!(one_or_many.is_terminated(), false);
assert_eq!(one_or_many.is_empty(), false);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_terminated(), true);
assert_eq!(count, 0);
assert_eq!(one_or_many.is_empty(), true);
}
#[test]
fn one_or_many_push_one_after_poll() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let event = Event::new();
let one_or_many = OneOrMany::new(event.wait());
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
assert_eq!(one_or_many.is_empty(), false);
let other_event = Event::new();
one_or_many.push(other_event.wait());
assert_eq!(count, 0);
assert_eq!(event.signal(), true);
assert_eq!(count, 1);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
assert_eq!(one_or_many.is_empty(), false);
}
#[test]
fn one_or_many_push_one_after_ready_before_poll() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let event = Event::new();
let one_or_many = OneOrMany::new(event.wait());
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
assert_eq!(count, 0);
assert_eq!(event.signal(), true);
let other_event = Event::new();
one_or_many.push(other_event.wait());
assert_eq!(count, 1);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert_eq!(one_or_many.poll_next(&mut context), Poll::Pending);
}
#[test]
fn one_or_many_one_exhausted_push() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let one_or_many = OneOrMany::new(futures::future::ready(1));
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_terminated(), true);
one_or_many.push(futures::future::ready(2));
assert_eq!(one_or_many.is_terminated(), false);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(2)));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_terminated(), true);
assert_eq!(count, 0);
}
#[test]
fn one_or_many_many_exhausted_push() {
let (waker, count) = new_count_waker();
let mut context = Context::from_waker(&waker);
let one_or_many: OneOrMany<_> = [1, 2].into_iter().map(futures::future::ready).collect();
pin_mut!(one_or_many);
let mut values = [(); 2].map(|()| {
let poll = one_or_many.as_mut().poll_next(&mut context);
assert_matches!(poll, Poll::Ready(Some(i)) => i)
});
values.sort();
assert_eq!(values, [1, 2]);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_terminated(), true);
one_or_many.push(futures::future::ready(3));
assert_eq!(one_or_many.is_terminated(), false);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(3)));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_terminated(), true);
assert_eq!(count, 0)
}
#[test]
fn one_or_many_collect_none() {
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let one_or_many: OneOrMany<Ready<()>> = std::iter::empty().collect();
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_empty(), true);
}
#[test]
fn one_or_many_collect_one() {
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let one_or_many: OneOrMany<_> = std::iter::once(futures::future::ready(1)).collect();
pin_mut!(one_or_many);
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert_eq!(one_or_many.is_empty(), true);
}
#[test]
fn one_or_many_collect_multiple() {
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let one_or_many: OneOrMany<_> =
(1..=5).into_iter().map(|i| futures::future::ready(i)).collect();
let fut = one_or_many.collect();
pin_mut!(fut);
let all: HashSet<_> = assert_matches!(fut.poll(&mut context), Poll::Ready(x) => x);
assert_eq!(all, HashSet::from_iter(1..=5));
}
#[test]
fn fused_stream() {
let waker = futures_test::task::panic_waker();
let mut context = Context::from_waker(&waker);
let one_or_many = OneOrMany::<_>::default();
pin_mut!(one_or_many);
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert!(one_or_many.is_terminated());
one_or_many.as_mut().push(futures::future::ready(()));
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert!(one_or_many.is_terminated());
one_or_many.as_mut().push(futures::future::ready(()));
assert!(!one_or_many.is_terminated());
one_or_many.as_mut().push(futures::future::ready(()));
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
assert!(!one_or_many.is_terminated());
assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
assert!(one_or_many.is_terminated());
}
}