wlancfg_lib/util/
fuse_pending.rs1use futures::stream::{FusedStream, Stream, StreamExt};
6use futures::task::Poll;
7
8pub struct FusePending<F>(pub F);
21
22impl<F> std::ops::Deref for FusePending<F> {
23 type Target = F;
24
25 fn deref(&self) -> &Self::Target {
26 &self.0
27 }
28}
29
30impl<F> std::ops::DerefMut for FusePending<F> {
31 fn deref_mut(&mut self) -> &mut Self::Target {
32 &mut self.0
33 }
34}
35
36impl<F: Stream + std::marker::Unpin> Stream for FusePending<F> {
37 type Item = F::Item;
38
39 fn poll_next(
40 mut self: std::pin::Pin<&mut Self>,
41 cx: &mut std::task::Context<'_>,
42 ) -> futures::task::Poll<Option<Self::Item>> {
43 match self.0.poll_next_unpin(cx) {
44 Poll::Ready(None) => Poll::Pending,
45 other => other,
46 }
47 }
48}
49
50impl<F: Stream + Unpin> FusedStream for FusePending<F> {
51 fn is_terminated(&self) -> bool {
52 false
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use super::*;
59 use fuchsia_async as fasync;
60 use futures::channel::mpsc;
61 use futures::stream::FuturesOrdered;
62 use futures::{future, select};
63 use std::pin::pin;
64
65 #[fuchsia::test]
66 fn infinite_stream() {
67 let mut exec = fasync::TestExecutor::new();
68
69 let (sink, stream) = mpsc::unbounded();
70
71 let fut = do_correct_work(stream);
72 let mut fut = pin!(fut);
73
74 assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
76
77 sink.unbounded_send(42).expect("failed sending message");
78
79 assert_eq!(Poll::Ready(Ok(42)), exec.run_until_stalled(&mut fut));
80 }
81
82 #[fuchsia::test]
83 fn no_infinite_stream() {
84 let mut exec = fasync::TestExecutor::new();
85
86 let (sink, stream) = mpsc::unbounded();
87 let fut = do_broken_work(stream);
88 let mut fut = pin!(fut);
89
90 assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
92
93 sink.unbounded_send(42).expect("failed sending message");
94
95 assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
96 }
97
98 async fn do_broken_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
99 let mut queue = FuturesOrdered::new().fuse();
100
101 loop {
102 select! {
103 x = stream.next() => if let Some(x) = x {
104 queue.get_mut().push_back(future::ok::<_, ()>(x));
105 },
106 x = queue.next() => if let Some(x) = x {
107 return x;
108 }
109 }
110 }
111 }
112
113 async fn do_correct_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
114 let mut queue = FusePending(FuturesOrdered::new());
115
116 loop {
117 select! {
118 x = stream.next() => if let Some(x) = x {
119 queue.push_back(future::ok::<_, ()>(x));
120 },
121 x = queue.next() => if let Some(x) = x {
122 return x;
123 }
124 }
125 }
126 }
127}