async_test_helpers/
lib.rsuse async_utils::PollExt;
use fuchsia_async as fasync;
use futures::future::Either;
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use futures::Future;
use std::pin::pin;
pub fn run_while<BackgroundFut, ResultFut, Out>(
exec: &mut fasync::TestExecutor,
background_fut: BackgroundFut,
result_fut: ResultFut,
) -> (Out, BackgroundFut)
where
BackgroundFut: Future + Unpin,
ResultFut: Future<Output = Out>,
{
let result_fut = pin!(result_fut);
let mut select_fut = futures::future::select(background_fut, result_fut);
loop {
match exec.run_until_stalled(&mut select_fut) {
Poll::Ready(Either::Right(r)) => return r,
Poll::Ready(Either::Left(_)) => panic!("Background future finished"),
Poll::Pending => {}
}
}
}
#[track_caller]
pub fn expect_stream_item<S: Stream + Unpin>(
exec: &mut fasync::TestExecutor,
stream: &mut S,
) -> S::Item {
exec.run_until_stalled(&mut stream.next()).expect("stream item").expect("not terminated")
}
#[track_caller]
pub fn expect_stream_pending<S: Stream + Unpin>(exec: &mut fasync::TestExecutor, stream: &mut S) {
let _ = exec.run_until_stalled(&mut stream.next()).expect_pending("stream pending");
}
#[track_caller]
pub fn expect_stream_terminated<S: Stream + Unpin>(
exec: &mut fasync::TestExecutor,
stream: &mut S,
) {
let result = exec.run_until_stalled(&mut stream.next()).expect("stream item");
assert!(result.is_none());
}