futures_util/stream/stream/
cycle.rs

1use core::pin::Pin;
2use core::usize;
3use futures_core::ready;
4use futures_core::stream::{FusedStream, Stream};
5use futures_core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
pin_project! {
    /// Stream for the [`cycle`](super::StreamExt::cycle) method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cycle<St> {
        // Untouched copy of the stream handed to `new`; it is never polled,
        // only cloned to produce a fresh `stream` for the next pass.
        orig: St,
        // The stream currently being polled. It is structurally pinned and is
        // overwritten with a clone of `orig` once it yields `None`.
        #[pin]
        stream: St,
    }
}
18
19impl<St> Cycle<St>
20where
21    St: Clone + Stream,
22{
23    pub(super) fn new(stream: St) -> Self {
24        Self { orig: stream.clone(), stream }
25    }
26}
27
28impl<St> Stream for Cycle<St>
29where
30    St: Clone + Stream,
31{
32    type Item = St::Item;
33
34    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35        let mut this = self.project();
36
37        match ready!(this.stream.as_mut().poll_next(cx)) {
38            None => {
39                this.stream.set(this.orig.clone());
40                this.stream.poll_next(cx)
41            }
42            item => Poll::Ready(item),
43        }
44    }
45
46    fn size_hint(&self) -> (usize, Option<usize>) {
47        // the cycle stream is either empty or infinite
48        match self.orig.size_hint() {
49            size @ (0, Some(0)) => size,
50            (0, _) => (0, None),
51            _ => (usize::max_value(), None),
52        }
53    }
54}
55
56impl<St> FusedStream for Cycle<St>
57where
58    St: Clone + Stream,
59{
60    fn is_terminated(&self) -> bool {
61        // the cycle stream is either empty or infinite
62        if let (0, Some(0)) = self.size_hint() {
63            true
64        } else {
65            false
66        }
67    }
68}