futures_util/stream/stream/
chain.rs

1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Stream for the [`chain`](super::StreamExt::chain) method.
9    #[derive(Debug)]
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Chain<St1, St2> {
12        #[pin]
13        first: Option<St1>,
14        #[pin]
15        second: St2,
16    }
17}
18
19// All interactions with `Pin<&mut Chain<..>>` happen through these methods
20impl<St1, St2> Chain<St1, St2>
21where
22    St1: Stream,
23    St2: Stream<Item = St1::Item>,
24{
25    pub(super) fn new(stream1: St1, stream2: St2) -> Self {
26        Self { first: Some(stream1), second: stream2 }
27    }
28}
29
30impl<St1, St2> FusedStream for Chain<St1, St2>
31where
32    St1: Stream,
33    St2: FusedStream<Item = St1::Item>,
34{
35    fn is_terminated(&self) -> bool {
36        self.first.is_none() && self.second.is_terminated()
37    }
38}
39
40impl<St1, St2> Stream for Chain<St1, St2>
41where
42    St1: Stream,
43    St2: Stream<Item = St1::Item>,
44{
45    type Item = St1::Item;
46
47    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48        let mut this = self.project();
49        if let Some(first) = this.first.as_mut().as_pin_mut() {
50            if let Some(item) = ready!(first.poll_next(cx)) {
51                return Poll::Ready(Some(item));
52            }
53
54            this.first.set(None);
55        }
56        this.second.poll_next(cx)
57    }
58
59    fn size_hint(&self) -> (usize, Option<usize>) {
60        if let Some(first) = &self.first {
61            let (first_lower, first_upper) = first.size_hint();
62            let (second_lower, second_upper) = self.second.size_hint();
63
64            let lower = first_lower.saturating_add(second_lower);
65
66            let upper = match (first_upper, second_upper) {
67                (Some(x), Some(y)) => x.checked_add(y),
68                _ => None,
69            };
70
71            (lower, upper)
72        } else {
73            self.second.size_hint()
74        }
75    }
76}