async_helpers/
maybe_stream.rs
1use futures::stream::{FusedStream, Stream};
6use futures::task::{Context, Poll};
7use std::fmt::Debug;
8use std::pin::Pin;
9
10#[derive(Debug)]
16pub struct MaybeStream<S: Stream>(Option<S>);
17
18impl<S: Stream + Unpin> MaybeStream<S> {
19 pub fn take(&mut self) -> Option<S> {
21 self.0.take()
22 }
23
24 pub fn inner_mut(&mut self) -> Option<&mut S> {
25 self.0.as_mut()
26 }
27
28 pub fn is_some(&self) -> bool {
29 self.0.is_some()
30 }
31
32 pub fn set(&mut self, stream: S) {
38 self.0 = Some(stream)
39 }
40
41 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
42 Pin::new(self.0.as_mut().unwrap()).poll_next(cx)
43 }
44}
45
46impl<S: Stream> Default for MaybeStream<S> {
47 fn default() -> Self {
48 Self(None)
49 }
50}
51
52impl<S: Stream + Unpin> From<Option<S>> for MaybeStream<S> {
53 fn from(src: Option<S>) -> Self {
54 Self(src)
55 }
56}
57
58impl<S: Stream + Unpin> Stream for MaybeStream<S> {
59 type Item = S::Item;
60
61 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 if self.0.is_none() {
63 return Poll::Pending;
64 }
65 self.get_mut().poll_next(cx)
66 }
67}
68
69impl<S: FusedStream + Stream + Unpin> FusedStream for MaybeStream<S> {
72 fn is_terminated(&self) -> bool {
73 if self.0.is_none() {
74 false
75 } else {
76 self.0.as_ref().unwrap().is_terminated()
77 }
78 }
79}
80
81impl<S: Stream + std::fmt::Display> std::fmt::Display for MaybeStream<S> {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 let display_str = if let Some(st) = &self.0 { format!("{}", st) } else { "".to_string() };
84 write!(f, "MaybeStream({})", display_str)
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;

    use fuchsia_async::TestExecutor;
    use futures::stream::StreamExt;

    /// Test stream that is ready on every poll and yields 1, 2, 3, ...
    struct CountStream {
        count: usize,
    }

    impl CountStream {
        /// Starts the counter at zero, so the first item yielded is 1.
        fn new() -> CountStream {
            Self { count: 0 }
        }
    }

    impl Stream for CountStream {
        type Item = usize;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            this.count += 1;
            Poll::Ready(Some(this.count))
        }
    }

    #[test]
    fn maybestream() {
        let mut exec = TestExecutor::new();
        let mut s = MaybeStream::default();

        // With no inner stream set, repeated polls stay pending.
        for _ in 0..2 {
            let mut next_fut = s.next();
            assert_eq!(Poll::Pending, exec.run_until_stalled(&mut next_fut));
        }

        s.set(CountStream::new());

        // Once a stream is set, its items come through in order.
        for expected in 1..=2 {
            let mut next_fut = s.next();
            assert_eq!(Poll::Ready(Some(expected)), exec.run_until_stalled(&mut next_fut));
        }
    }
}