1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use futures::{
stream::{FusedStream, Stream, StreamExt},
task::Poll,
};
/// Wrapper that maps a wrapped Stream's `Poll::Ready(None)` to `Poll::Pending`, so the Stream is
/// never reported as terminated.
///
/// `Stream`s should indicate their termination by returning an item of `Poll::Ready(None)`.
/// Once a Stream has terminated it is generally not safe to poll the Stream any longer.
/// Support for safely polling already terminated Streams is provided by the `Fuse` wrapper.
/// The wrapper only polls Streams that have not yet terminated. If the underlying Stream has
/// already terminated, Fuse immediately returns `Poll::Ready(None)` without further polling its
/// wrapped Stream.
///
/// Some Streams never terminate, such as `FuturesOrdered`. To indicate that there is currently
/// no work scheduled, these Streams return `Poll::Ready(None)`. However, when these Streams are
/// used in combination with Fuse, the Streams would get polled once and immediately be declared
/// to have terminated due to their return value of `Poll::Ready(None)`. Instead, such Streams
/// should return `Poll::Pending`. `FusePending` provides this mapping.
///
/// Note: This wrapper should only be used if the underlying Stream defines its behavior when no
/// work is scheduled. Usually, such Streams are expected to never terminate.
///
/// The single public field holds the wrapped Stream.
pub struct FusePending<F>(pub F);
impl<F> std::ops::Deref for FusePending<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<F> std::ops::DerefMut for FusePending<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Forwards polling to the wrapped stream, but never reports termination.
impl<F> Stream for FusePending<F>
where
    F: Stream + std::marker::Unpin,
{
    type Item = F::Item;

    /// Polls the wrapped stream once and returns its result, except that
    /// `Poll::Ready(None)` is replaced by `Poll::Pending`.
    ///
    /// This method only hands `cx` to the wrapped stream; it does not itself arrange
    /// a wake-up when it substitutes `Poll::Pending`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        let polled = self.0.poll_next_unpin(cx);
        if matches!(polled, Poll::Ready(None)) {
            // "No item, stream finished" from the inner stream is reported as "not ready yet".
            Poll::Pending
        } else {
            polled
        }
    }
}
/// Marks the wrapper as a fused stream that never reaches the terminated state.
impl<F> FusedStream for FusePending<F>
where
    F: Stream + Unpin,
{
    /// Always returns `false`, regardless of the state of the wrapped stream.
    fn is_terminated(&self) -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
use {
super::*,
fuchsia_async as fasync,
futures::{channel::mpsc, future, select, stream::FuturesOrdered},
std::pin::pin,
};
#[fuchsia::test]
fn infinite_stream() {
let mut exec = fasync::TestExecutor::new();
let (sink, stream) = mpsc::unbounded();
let fut = do_correct_work(stream);
let mut fut = pin!(fut);
// Pass over explicit next() call.
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
sink.unbounded_send(42).expect("failed sending message");
assert_eq!(Poll::Ready(Ok(42)), exec.run_until_stalled(&mut fut));
}
#[fuchsia::test]
fn no_infinite_stream() {
let mut exec = fasync::TestExecutor::new();
let (sink, stream) = mpsc::unbounded();
let fut = do_broken_work(stream);
let mut fut = pin!(fut);
// Pass over explicit next() call.
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
sink.unbounded_send(42).expect("failed sending message");
assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
}
/// Test helper that queues every received value into a `FuturesOrdered` wrapped with
/// `fuse()`, and returns the first result the queue yields.
async fn do_broken_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
    let mut queue = FuturesOrdered::new().fuse();
    loop {
        select! {
            incoming = stream.next() => {
                if let Some(value) = incoming {
                    // The queue sits behind the fuse wrapper, hence `get_mut()`.
                    queue.get_mut().push(future::ok::<_, ()>(value));
                }
            },
            finished = queue.next() => {
                if let Some(result) = finished {
                    return result;
                }
            },
        }
    }
}
/// Test helper that queues every received value into a `FuturesOrdered` wrapped in
/// `FusePending`, and returns the first result the queue yields.
async fn do_correct_work(mut stream: mpsc::UnboundedReceiver<u8>) -> Result<u8, ()> {
    let mut queue = FusePending(FuturesOrdered::new());
    loop {
        select! {
            incoming = stream.next() => {
                if let Some(value) = incoming {
                    // `push` is reached on the inner queue through the wrapper's `DerefMut`.
                    queue.push(future::ok::<_, ()>(value));
                }
            },
            finished = queue.next() => {
                if let Some(result) = finished {
                    return result;
                }
            },
        }
    }
}
}