futures_util/stream/try_stream/
try_unfold.rs

1use super::assert_stream;
2use core::fmt;
3use core::pin::Pin;
4use futures_core::future::TryFuture;
5use futures_core::ready;
6use futures_core::stream::Stream;
7use futures_core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
11///
12/// This function is the dual for the `TryStream::try_fold()` adapter: while
13/// `TryStream::try_fold()` reduces a `TryStream` to one single value,
14/// `try_unfold()` creates a `TryStream` from a seed value.
15///
16/// `try_unfold()` will call the provided closure with the provided seed, then
17/// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
18/// yield the value `a`, and use `b` as the next internal state.
19///
20/// If the closure returns `None` instead of `Some(TryFuture)`, then the
21/// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
22/// future calls to `poll()`.
23///
24/// In case of error generated by the returned `TryFuture`, the error will be
25/// returned by the `TryStream`. The `TryStream` will then yield
26/// `Poll::Ready(None)` in future calls to `poll()`.
27///
28/// This function can typically be used when wanting to go from the "world of
29/// futures" to the "world of streams": the provided closure can build a
30/// `TryFuture` using other library functions working on futures, and
31/// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
32///
33/// # Example
34///
35/// ```
36/// # #[derive(Debug, PartialEq)]
37/// # struct SomeError;
38/// # futures::executor::block_on(async {
39/// use futures::stream::{self, TryStreamExt};
40///
41/// let stream = stream::try_unfold(0, |state| async move {
42///     if state < 0 {
43///         return Err(SomeError);
44///     }
45///
46///     if state <= 2 {
47///         let next_state = state + 1;
48///         let yielded = state * 2;
49///         Ok(Some((yielded, next_state)))
50///     } else {
51///         Ok(None)
52///     }
53/// });
54///
55/// let result: Result<Vec<i32>, _> = stream.try_collect().await;
56/// assert_eq!(result, Ok(vec![0, 2, 4]));
57/// # });
58/// ```
59pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
60where
61    F: FnMut(T) -> Fut,
62    Fut: TryFuture<Ok = Option<(Item, T)>>,
63{
64    assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None })
65}
66
pin_project! {
    /// Stream for the [`try_unfold`] function.
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        // Step closure: turns the current state into the future that
        // computes the next item and the next state.
        f: F,
        // `Some` holds the state to pass to `f` on the next poll; `None`
        // while a future is in flight or once the stream has stopped.
        state: Option<T>,
        // Future for the step currently in progress, if any. It is polled
        // in place, hence the structural pinning.
        #[pin]
        fut: Option<Fut>,
    }
}
77
78impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
79where
80    T: fmt::Debug,
81    Fut: fmt::Debug,
82{
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("TryUnfold").field("state", &self.state).field("fut", &self.fut).finish()
85    }
86}
87
88impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
89where
90    F: FnMut(T) -> Fut,
91    Fut: TryFuture<Ok = Option<(Item, T)>>,
92{
93    type Item = Result<Item, Fut::Error>;
94
95    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96        let mut this = self.project();
97
98        if let Some(state) = this.state.take() {
99            this.fut.set(Some((this.f)(state)));
100        }
101
102        match this.fut.as_mut().as_pin_mut() {
103            None => {
104                // The future previously errored
105                Poll::Ready(None)
106            }
107            Some(future) => {
108                let step = ready!(future.try_poll(cx));
109                this.fut.set(None);
110
111                match step {
112                    Ok(Some((item, next_state))) => {
113                        *this.state = Some(next_state);
114                        Poll::Ready(Some(Ok(item)))
115                    }
116                    Ok(None) => Poll::Ready(None),
117                    Err(e) => Poll::Ready(Some(Err(e))),
118                }
119            }
120        }
121    }
122}