1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use futures::{
    stream::{FusedStream, Stream, StreamExt},
    task::Poll,
};

/// `Stream`s should indicate their termination by returning an item of `Poll::Ready(None)`.
/// Once a Stream terminated it is generally not safe to poll the Stream any longer.
/// Support for safely polling already terminated Streams is provided by the `Fuse` wrapper.
/// The wrapper only polls not yet terminated Streams. If the underlying Stream already terminated
/// Fuse immediately returns `Poll::Ready(None)` without further polling its wrapped Stream.
/// Some Streams never terminate, such as `FuturesOrdered`. To indicate that there is currently
/// no work scheduled, these Streams return `Poll::Ready(None)`. However, when these Streams are
/// used in combination with Fuse the Streams would get polled once and immediately declared to have
/// terminated due to their return value oof `Poll::Ready(None)`. Instead, such Streams should
/// return `Poll::Pending`. `FusePending` provides this mapping.
/// Note: This wrapper should only be used if the underlying Stream defined its behavior if no
/// work is scheduled. Usually, such Streams are expected to never terminate.
pub struct FusePending<F>(pub F);

impl<F> std::ops::Deref for FusePending<F> {
    type Target = F;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<F> std::ops::DerefMut for FusePending<F> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<F: Stream + std::marker::Unpin> Stream for FusePending<F> {
    type Item = F::Item;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        match self.0.poll_next_unpin(cx) {
            Poll::Ready(None) => Poll::Pending,
            other => other,
        }
    }
}

impl<F: Stream + Unpin> FusedStream for FusePending<F> {
    fn is_terminated(&self) -> bool {
        false
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        fuchsia_async as fasync,
        futures::{channel::mpsc, future, select, stream::FuturesOrdered},
        std::pin::pin,
    };

    #[fuchsia::test]
    fn infinite_stream() {
        let mut exec = fasync::TestExecutor::new();

        let (sink, stream) = mpsc::unbounded();

        let fut = do_correct_work(stream);
        let mut fut = pin!(fut);

        // Pass over explicit next() call.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));

        sink.unbounded_send(42).expect("failed sending message");

        assert_eq!(Poll::Ready(Ok(42)), exec.run_until_stalled(&mut fut));
    }

    #[fuchsia::test]
    fn no_infinite_stream() {
        let mut exec = fasync::TestExecutor::new();

        let (sink, stream) = mpsc::unbounded();
        let fut = do_broken_work(stream);
        let mut fut = pin!(fut);

        // Pass over explicit next() call.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));

        sink.unbounded_send(42).expect("failed sending message");

        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
    }

    async fn do_broken_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
        let mut queue = FuturesOrdered::new().fuse();

        loop {
            select! {
                x = stream.next() => if let Some(x) = x {
                    queue.get_mut().push(future::ok::<_, ()>(x));
                },
                x = queue.next() => if let Some(x) = x {
                    return x;
                }
            }
        }
    }

    async fn do_correct_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
        let mut queue = FusePending(FuturesOrdered::new());

        loop {
            select! {
                x = stream.next() => if let Some(x) = x {
                    queue.push(future::ok::<_, ()>(x));
                },
                x = queue.next() => if let Some(x) = x {
                    return x;
                }
            }
        }
    }
}