futures_util/stream/try_stream/
try_flatten_unordered.rs

1use core::marker::PhantomData;
2use core::pin::Pin;
3
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream, TryStream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9
10use pin_project_lite::pin_project;
11
12use crate::future::Either;
13use crate::stream::stream::flatten_unordered::{
14    FlattenUnorderedWithFlowController, FlowController, FlowStep,
15};
16use crate::stream::IntoStream;
17use crate::TryStreamExt;
18
delegate_all!(
    /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
    ///
    /// Wraps a `FlattenUnorderedWithFlowController` whose base stream is adapted by
    /// `NestedTryStreamIntoEitherTryStream` and whose flow controller is
    /// `PropagateBaseStreamError`.
    TryFlattenUnordered<St>(
        FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
        + New[
            // The base stream is wrapped in the adapter first; the optional limit is
            // converted with `into()` and handed to the inner combinator's constructor.
            |stream: St, limit: impl Into<Option<usize>>|
                FlattenUnorderedWithFlowController::new(
                    NestedTryStreamIntoEitherTryStream::new(stream),
                    limit.into()
                )
        ]
    where
        // NOTE(review): each bound is spelled as its own predicate (`St::Ok: TryStream` and
        // `St::Ok: Unpin` instead of `TryStream + Unpin`) — presumably required by the
        // macro's `where`-clause parsing; confirm before merging them.
        St: TryStream,
        St::Ok: TryStream,
        St::Ok: Unpin,
        <St::Ok as TryStream>::Error: From<St::Error>
);
37
pin_project! {
    /// Emits either successful streams or single-item streams containing the underlying errors.
    /// This is an adapter that allows the logic of `FlattenUnordered` to be reused over a
    /// `TryStream`.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct NestedTryStreamIntoEitherTryStream<St>
        where
            St: TryStream,
            St::Ok: TryStream,
            St::Ok: Unpin,
            <St::Ok as TryStream>::Error: From<St::Error>
        {
            // The wrapped base stream; marked `#[pin]` so that it can be polled
            // through the projection returned by `project()`.
            #[pin]
            stream: St
        }
}
54
55impl<St> NestedTryStreamIntoEitherTryStream<St>
56where
57    St: TryStream,
58    St::Ok: TryStream + Unpin,
59    <St::Ok as TryStream>::Error: From<St::Error>,
60{
61    fn new(stream: St) -> Self {
62        Self { stream }
63    }
64
65    delegate_access_inner!(stream, St, ());
66}
67
/// A stream that hands out one stored item and is exhausted afterwards.
///
/// The item lives in an `Option`; taking it leaves `None` behind.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);

impl<T> Single<T> {
    /// Creates a `Single` that holds `val` as its one pending item.
    fn new(val: T) -> Self {
        Single(Some(val))
    }

    /// Removes and returns the pending item without going through `poll_next`.
    ///
    /// Yields `Some` only while the item has not been taken yet; afterwards `None`.
    fn next_immediate(&mut self) -> Option<T> {
        let Single(slot) = self;
        slot.take()
    }
}
83
// `Single` is declared `Unpin` for every `T`, even when `T` itself is not `Unpin`:
// its `poll_next` only moves the stored value out and never exposes it pinned.
impl<T> Unpin for Single<T> {}
85
86impl<T> Stream for Single<T> {
87    type Item = T;
88
89    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90        Poll::Ready(self.0.take())
91    }
92
93    fn size_hint(&self) -> (usize, Option<usize>) {
94        self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
95    }
96}
97
/// Flow controller that immediately propagates errors that occurred in the base stream.
///
/// Holds no data of its own; the stream type `St` is only recorded via `PhantomData`.
#[derive(Debug, Clone, Copy)]
pub struct PropagateBaseStreamError<St>(PhantomData<St>);
101
// Item yielded by the adapted base stream: an `Either` of an inner stream or a `Single`.
type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
// Item yielded when a `BaseStreamItem` is itself polled as a stream.
type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
104
105impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
106where
107    St: TryStream,
108    St::Ok: TryStream + Unpin,
109    <St::Ok as TryStream>::Error: From<St::Error>,
110{
111    fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
112        match item {
113            // A new successful inner stream received
114            st @ Either::Left(_) => FlowStep::Continue(st),
115            // An error encountered
116            Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
117        }
118    }
119}
120
// One-shot stream carrying a `Result` built from the `Ok`/`Error` types of the `TryStream` `St`.
type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
122
123impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
124where
125    St: TryStream,
126    St::Ok: TryStream + Unpin,
127    <St::Ok as TryStream>::Error: From<St::Error>,
128{
129    // Item is either an inner stream or a stream containing a single error.
130    // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
131    type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
132
133    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134        let item = ready!(self.project().stream.try_poll_next(cx));
135
136        let out = match item {
137            Some(res) => match res {
138                // Emit successful inner stream as is
139                Ok(stream) => Either::Left(stream.into_stream()),
140                // Wrap an error into a stream containing a single item
141                err @ Err(_) => {
142                    let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
143
144                    Either::Right(Single::new(res))
145                }
146            },
147            None => return Poll::Ready(None),
148        };
149
150        Poll::Ready(Some(out))
151    }
152}
153
154impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
155where
156    St: TryStream + FusedStream,
157    St::Ok: TryStream + Unpin,
158    <St::Ok as TryStream>::Error: From<St::Error>,
159{
160    fn is_terminated(&self) -> bool {
161        self.stream.is_terminated()
162    }
163}
164
// Forwarding impl of `Sink` from the underlying stream.
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
where
    St: TryStream + Sink<Item>,
    St::Ok: TryStream + Unpin,
    // `St` has an `Error` associated type from both `TryStream` and `Sink<Item>` here,
    // so the stream-side one is named explicitly.
    <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
    // The sink error type is taken from the wrapped stream's `Sink` impl.
    type Error = <St as Sink<Item>>::Error;

    // The `Sink` methods are generated by this macro for the `stream` field.
    delegate_sink!(stream, Item);
}