1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
//! Definition of the `TryJoinAll` combinator, waiting for all of a list of
//! futures to finish with either success or error.
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::fmt;
use core::future::Future;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
use super::{assert_future, TryFuture, TryMaybeDone};
/// Turns a pinned mutable slice into an iterator that yields each element as
/// its own pinned mutable reference, in slice order.
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
    // SAFETY: `std` _could_ make this unsound if it were to decide that Pin's
    // invariants are not required to carry through slices. Barring that, this
    // is no different from an ordinary structural field pin projection: the
    // elements are only ever handed out re-pinned and are never moved.
    let unpinned: &mut [T] = unsafe { slice.get_unchecked_mut() };

    unpinned.iter_mut().map(|elem| {
        // SAFETY: `elem` lives inside the slice that was pinned on entry, so
        // it stays at a stable address for as long as the reference exists.
        unsafe { Pin::new_unchecked(elem) }
    })
}
/// Summary of a single polling pass over all elements of a `TryJoinAll`.
///
/// `poll` starts each pass with `AllDone` and downgrades the value as it
/// inspects the individual elements.
enum FinalState<E = ()> {
/// At least one element reported `Poll::Pending` during the pass and no
/// element reported an error.
Pending,
/// No element reported `Poll::Pending` or an error during the pass.
AllDone,
/// An element reported this error; the pass stops at the first one seen.
Error(E),
}
/// Future for the [`try_join_all`] function.
///
/// Its output is `Ok` with a `Vec` of every element's success value, or `Err`
/// with the first error observed while polling the elements.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryJoinAll<F>
where
F: TryFuture,
{
// One `TryMaybeDone` slot per input future, stored as a pinned boxed slice
// in the order the futures were supplied. `poll` swaps this for an empty
// slice once it has produced its final result.
elems: Pin<Box<[TryMaybeDone<F>]>>,
}
/// Debug formatting for [`TryJoinAll`]; available whenever the wrapped future
/// and both of its result types are themselves `Debug`.
impl<F> fmt::Debug for TryJoinAll<F>
where
    F: TryFuture + fmt::Debug,
    F::Ok: fmt::Debug,
    F::Error: fmt::Debug,
{
    /// Renders the value as a `TryJoinAll` struct with its single `elems`
    /// field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("TryJoinAll");
        repr.field("elems", &self.elems);
        repr.finish()
    }
}
/// Creates a future which represents either a collection of the results of the
/// futures given or an error.
///
/// The returned future will drive execution for all of its underlying futures,
/// collecting the results into a destination `Vec<T>` in the same order as they
/// were provided.
///
/// If any future returns an error then all other futures will be canceled and
/// an error will be returned immediately. If all futures complete successfully,
/// however, then the returned future will succeed with a `Vec` of all the
/// successful results.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::{self, try_join_all};
///
/// let futures = vec![
/// future::ok::<u32, u32>(1),
/// future::ok::<u32, u32>(2),
/// future::ok::<u32, u32>(3),
/// ];
///
/// assert_eq!(try_join_all(futures).await, Ok(vec![1, 2, 3]));
///
/// let futures = vec![
/// future::ok::<u32, u32>(1),
/// future::err::<u32, u32>(2),
/// future::ok::<u32, u32>(3),
/// ];
///
/// assert_eq!(try_join_all(futures).await, Err(2));
/// # });
/// ```
pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
where
I: IntoIterator,
I::Item: TryFuture,
{
let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
TryJoinAll { elems: elems.into() },
)
}
impl<F> Future for TryJoinAll<F>
where
F: TryFuture,
{
type Output = Result<Vec<F::Ok>, F::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = FinalState::AllDone;
for elem in iter_pin_mut(self.elems.as_mut()) {
match elem.try_poll(cx) {
Poll::Pending => state = FinalState::Pending,
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => {
state = FinalState::Error(e);
break;
}
}
}
match state {
FinalState::Pending => Poll::Pending,
FinalState::AllDone => {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let results =
iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(Ok(results))
}
FinalState::Error(e) => {
let _ = mem::replace(&mut self.elems, Box::pin([]));
Poll::Ready(Err(e))
}
}
}
}
/// Allows an iterator of fallible futures to be `collect`ed straight into a
/// [`TryJoinAll`].
impl<F> FromIterator<F> for TryJoinAll<F>
where
    F: TryFuture,
{
    /// Equivalent to calling [`try_join_all`] on `iter`.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = F>,
    {
        try_join_all(iter)
    }
}