async_helpers/
maybe_stream.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::stream::{FusedStream, Stream};
6use futures::task::{Context, Poll};
7use std::fmt::Debug;
8use std::pin::Pin;
9
10/// A optional `Stream` that returns the values of the wrapped stream until the wrapped stream is
11/// exhausted.
12///
13/// MaybeStream yields items of type S::Item.
14/// The `Stream` implementation will return `Poll::Pending` if no stream is set.
15#[derive(Debug)]
16pub struct MaybeStream<S: Stream>(Option<S>);
17
18impl<S: Stream + Unpin> MaybeStream<S> {
19    /// Resets the underlying stream back into a Pending state and returns the Stream, if set.
20    pub fn take(&mut self) -> Option<S> {
21        self.0.take()
22    }
23
24    pub fn inner_mut(&mut self) -> Option<&mut S> {
25        self.0.as_mut()
26    }
27
28    pub fn is_some(&self) -> bool {
29        self.0.is_some()
30    }
31
32    /// Set the current stream.
33    ///
34    /// This method will not call `poll` on the submitted stream. The caller must ensure
35    /// that `poll_next` is called in order to receive wake-up notifications for the given
36    /// stream.
37    pub fn set(&mut self, stream: S) {
38        self.0 = Some(stream)
39    }
40
41    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
42        Pin::new(self.0.as_mut().unwrap()).poll_next(cx)
43    }
44}
45
46impl<S: Stream> Default for MaybeStream<S> {
47    fn default() -> Self {
48        Self(None)
49    }
50}
51
52impl<S: Stream + Unpin> From<Option<S>> for MaybeStream<S> {
53    fn from(src: Option<S>) -> Self {
54        Self(src)
55    }
56}
57
58impl<S: Stream + Unpin> Stream for MaybeStream<S> {
59    type Item = S::Item;
60
61    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        if self.0.is_none() {
63            return Poll::Pending;
64        }
65        self.get_mut().poll_next(cx)
66    }
67}
68
69/// A MaybeStream with no inner stream is never done because a new stream can always be set with
70/// items.
71impl<S: FusedStream + Stream + Unpin> FusedStream for MaybeStream<S> {
72    fn is_terminated(&self) -> bool {
73        if self.0.is_none() {
74            false
75        } else {
76            self.0.as_ref().unwrap().is_terminated()
77        }
78    }
79}
80
81impl<S: Stream + std::fmt::Display> std::fmt::Display for MaybeStream<S> {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        let display_str = if let Some(st) = &self.0 { format!("{}", st) } else { "".to_string() };
84        write!(f, "MaybeStream({})", display_str)
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    use fuchsia_async::TestExecutor;
93    use futures::stream::StreamExt;
94
95    struct CountStream {
96        count: usize,
97    }
98
99    impl CountStream {
100        fn new() -> CountStream {
101            CountStream { count: 0 }
102        }
103    }
104
105    impl Stream for CountStream {
106        type Item = usize;
107
108        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
109            self.count += 1;
110            Poll::Ready(Some(self.count))
111        }
112    }
113
114    #[test]
115    fn maybestream() {
116        let mut exec = TestExecutor::new();
117
118        let mut s = MaybeStream::default();
119
120        let mut next_fut = s.next();
121        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut next_fut));
122        next_fut = s.next();
123        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut next_fut));
124
125        s.set(CountStream::new());
126
127        next_fut = s.next();
128        assert_eq!(Poll::Ready(Some(1)), exec.run_until_stalled(&mut next_fut));
129
130        next_fut = s.next();
131        assert_eq!(Poll::Ready(Some(2)), exec.run_until_stalled(&mut next_fut));
132    }
133}