wlancfg_lib/util/
fuse_pending.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::stream::{FusedStream, Stream, StreamExt};
6use futures::task::Poll;
7
8/// `Stream`s should indicate their termination by returning an item of `Poll::Ready(None)`.
9/// Once a Stream terminated it is generally not safe to poll the Stream any longer.
10/// Support for safely polling already terminated Streams is provided by the `Fuse` wrapper.
11/// The wrapper only polls not yet terminated Streams. If the underlying Stream already terminated
12/// Fuse immediately returns `Poll::Ready(None)` without further polling its wrapped Stream.
13/// Some Streams never terminate, such as `FuturesOrdered`. To indicate that there is currently
14/// no work scheduled, these Streams return `Poll::Ready(None)`. However, when these Streams are
15/// used in combination with Fuse the Streams would get polled once and immediately declared to have
16/// terminated due to their return value oof `Poll::Ready(None)`. Instead, such Streams should
17/// return `Poll::Pending`. `FusePending` provides this mapping.
18/// Note: This wrapper should only be used if the underlying Stream defined its behavior if no
19/// work is scheduled. Usually, such Streams are expected to never terminate.
20pub struct FusePending<F>(pub F);
21
22impl<F> std::ops::Deref for FusePending<F> {
23    type Target = F;
24
25    fn deref(&self) -> &Self::Target {
26        &self.0
27    }
28}
29
30impl<F> std::ops::DerefMut for FusePending<F> {
31    fn deref_mut(&mut self) -> &mut Self::Target {
32        &mut self.0
33    }
34}
35
36impl<F: Stream + std::marker::Unpin> Stream for FusePending<F> {
37    type Item = F::Item;
38
39    fn poll_next(
40        mut self: std::pin::Pin<&mut Self>,
41        cx: &mut std::task::Context<'_>,
42    ) -> futures::task::Poll<Option<Self::Item>> {
43        match self.0.poll_next_unpin(cx) {
44            Poll::Ready(None) => Poll::Pending,
45            other => other,
46        }
47    }
48}
49
50impl<F: Stream + Unpin> FusedStream for FusePending<F> {
51    fn is_terminated(&self) -> bool {
52        false
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59    use fuchsia_async as fasync;
60    use futures::channel::mpsc;
61    use futures::stream::FuturesOrdered;
62    use futures::{future, select};
63    use std::pin::pin;
64
65    #[fuchsia::test]
66    fn infinite_stream() {
67        let mut exec = fasync::TestExecutor::new();
68
69        let (sink, stream) = mpsc::unbounded();
70
71        let fut = do_correct_work(stream);
72        let mut fut = pin!(fut);
73
74        // Pass over explicit next() call.
75        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
76
77        sink.unbounded_send(42).expect("failed sending message");
78
79        assert_eq!(Poll::Ready(Ok(42)), exec.run_until_stalled(&mut fut));
80    }
81
82    #[fuchsia::test]
83    fn no_infinite_stream() {
84        let mut exec = fasync::TestExecutor::new();
85
86        let (sink, stream) = mpsc::unbounded();
87        let fut = do_broken_work(stream);
88        let mut fut = pin!(fut);
89
90        // Pass over explicit next() call.
91        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
92
93        sink.unbounded_send(42).expect("failed sending message");
94
95        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
96    }
97
98    async fn do_broken_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
99        let mut queue = FuturesOrdered::new().fuse();
100
101        loop {
102            select! {
103                x = stream.next() => if let Some(x) = x {
104                    queue.get_mut().push_back(future::ok::<_, ()>(x));
105                },
106                x = queue.next() => if let Some(x) = x {
107                    return x;
108                }
109            }
110        }
111    }
112
113    async fn do_correct_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
114        let mut queue = FusePending(FuturesOrdered::new());
115
116        loop {
117            select! {
118                x = stream.next() => if let Some(x) = x {
119                    queue.push_back(future::ok::<_, ()>(x));
120                },
121                x = queue.next() => if let Some(x) = x {
122                    return x;
123                }
124            }
125        }
126    }
127}