use fidl::endpoints::RequestStream;
use fuchsia_async as fasync;
use fuchsia_sync::Mutex;
use futures::channel::oneshot::{self, Receiver};
use futures::{ready, Stream, StreamExt};
use pin_project_lite::pin_project;
use std::future::Future as _;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use zx::MonotonicDuration;
/// Wraps `request_stream` in a stream that yields the same items but finishes early once the
/// connection has been idle for `debounce_interval`.
///
/// Returns a pair of:
///
/// - the wrapping stream, which also exposes [`StreamAndControlHandle::control_handle`];
/// - a [`Receiver`] that is sent a value when the wrapping stream finishes: `Some(channel)`
///   with the channel recovered from the request stream if the wrapper stopped it after the
///   idle timer fired, or `None` if the underlying stream ended by itself.
pub fn until_stalled<RS>(
    request_stream: RS,
    debounce_interval: MonotonicDuration,
) -> (impl StreamAndControlHandle<RS, <RS as Stream>::Item>, Receiver<Option<zx::Channel>>)
where
    RS: RequestStream,
{
    let (unbind_tx, unbind_rx) = oneshot::channel();
    // The receiving half may already have been dropped by the caller; a failed send is
    // deliberately ignored.
    let notify_unbound = move |channel: Option<zx::Channel>| {
        let _ = unbind_tx.send(channel);
    };
    let stallable = StallableRequestStream::new(request_stream, debounce_interval, notify_unbound);
    (stallable, unbind_rx)
}
/// A [`Stream`] of `Item`s that can also hand out a [`WeakControlHandle`] for the request
/// stream type `RS`.
pub trait StreamAndControlHandle<RS, Item>
where
    Self: Stream<Item = Item>,
{
    /// Returns a [`WeakControlHandle`] for the request stream of type `RS`.
    fn control_handle(&self) -> WeakControlHandle<RS>;
}
pin_project! {
    /// A wrapper around a request stream `RS` that ends the stream once it has been idle for
    /// `debounce_interval`, reporting the outcome through `unbind_callback`.
    pub struct StallableRequestStream<RS, F> {
        // The wrapped stream. Shared (behind `Arc<Mutex<..>>`) so that `WeakControlHandle`s can
        // reach it; set to `None` once the stream has been taken apart for unbinding.
        stream: Arc<Mutex<Option<RS>>>,
        // How long the wrapped stream must stay pending before an unbind is attempted.
        debounce_interval: MonotonicDuration,
        // Invoked at most once, when the stream finishes; `None` after it has been called.
        unbind_callback: Option<F>,
        // The idle timer. Only present while the wrapped stream is pending.
        #[pin]
        timer: Option<fasync::Timer>,
    }
}
impl<RS, F> StallableRequestStream<RS, F> {
pub fn new(stream: RS, debounce_interval: MonotonicDuration, unbind_callback: F) -> Self {
Self {
stream: Arc::new(Mutex::new(Some(stream))),
debounce_interval,
unbind_callback: Some(unbind_callback),
timer: None,
}
}
}
impl<RS, F> StreamAndControlHandle<RS, RS::Item> for StallableRequestStream<RS, F>
where
    RS: RequestStream + Unpin,
    F: FnOnce(Option<zx::Channel>) + Unpin,
{
    /// Returns a handle holding only a weak reference to the shared stream slot, so the handle
    /// by itself does not keep the slot alive.
    fn control_handle(&self) -> WeakControlHandle<RS> {
        let weak_slot = Arc::downgrade(&self.stream);
        WeakControlHandle { stream: weak_slot }
    }
}
/// A handle that refers to a request stream of type `RS` through a [`Weak`] pointer, so holding
/// it does not keep the shared stream slot alive.
pub struct WeakControlHandle<RS> {
    // Weak reference to the shared slot that holds the stream (or `None` once it was removed).
    stream: Weak<Mutex<Option<RS>>>,
}
impl<RS> WeakControlHandle<RS>
where
    RS: RequestStream,
{
    /// Calls `user` with a control handle obtained from the request stream, if the stream is
    /// still available, and returns its result.
    ///
    /// Returns `None` without calling `user` when the shared slot has been dropped or when the
    /// slot currently holds no stream.
    ///
    /// The slot's lock is held while `user` runs.
    pub fn use_control_handle<User, R>(&self, user: User) -> Option<R>
    where
        User: FnOnce(RS::ControlHandle) -> R,
    {
        let slot = self.stream.upgrade()?;
        let guard = slot.lock();
        guard.as_ref().map(|stream| user(stream.control_handle()))
    }
}
impl<RS: RequestStream + Unpin, F: FnOnce(Option<zx::Channel>) + Unpin> Stream
    for StallableRequestStream<RS, F>
{
    type Item = <RS as Stream>::Item;

    /// Polls the wrapped stream, ending it early if it stays idle for the debounce interval.
    ///
    /// - If the wrapped stream is ready, the idle timer is cleared and the item is forwarded.
    ///   When that item is `None`, the unbind callback is called with `None` first.
    /// - If the wrapped stream is pending, the idle timer is started (if not already running)
    ///   and polled. Once it fires, the stream is taken apart and, if nothing else shares its
    ///   inner state, the unbind callback is called with `Some(channel)` and this stream
    ///   finishes with `None`. Otherwise the stream is put back and a new timer is started.
    ///
    /// # Panics
    ///
    /// Panics with "Stream already resolved" if polled after the stream has been unbound, and
    /// panics if the unbind callback would have to be invoked a second time.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll_result = self
            .stream
            .as_ref()
            .lock()
            .as_mut()
            .expect("Stream already resolved")
            .poll_next_unpin(cx);
        let mut this = self.project();
        match poll_result {
            Poll::Ready(message) => {
                // Any activity (or termination) cancels the idle timer.
                this.timer.set(None);
                if message.is_none() {
                    // The wrapped stream ended by itself: there is no channel to hand back.
                    this.unbind_callback.take().unwrap()(None);
                }
                Poll::Ready(message)
            }
            Poll::Pending => {
                let debounce_interval = *this.debounce_interval;
                loop {
                    if this.timer.is_none() {
                        this.timer.set(Some(fasync::Timer::new(debounce_interval)));
                    }
                    ready!(this.timer.as_mut().as_pin_mut().unwrap().poll(cx));
                    this.timer.set(None);

                    // Hold the lock for the whole unbind attempt. Releasing it between taking
                    // the stream out and putting it back would let a concurrent
                    // `WeakControlHandle::use_control_handle` observe an empty slot and report
                    // the stream as gone even though the unbind did not happen.
                    let mut slot = this.stream.lock();
                    let (inner, is_terminated) = slot.take().unwrap().into_inner();
                    match Arc::try_unwrap(inner) {
                        Ok(inner) => {
                            // The slot stays empty from here on. Release the lock before
                            // running caller-provided code.
                            drop(slot);
                            this.unbind_callback.take().unwrap()(Some(
                                inner.into_channel().into_zx_channel(),
                            ));
                            return Poll::Ready(None);
                        }
                        Err(inner) => {
                            // Something else still shares the inner state, so unbinding is not
                            // possible yet: restore the stream and wait another interval.
                            *slot = Some(RS::from_inner(inner, is_terminated));
                        }
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::Proxy;
    use fidl::AsHandleRef;
    use futures::{FutureExt, TryStreamExt};
    use std::pin::pin;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    /// With no request ever sent, the stream finishes once the idle interval has elapsed and
    /// the receiver yields a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// While a control handle obtained through the weak handle is kept alive, the stream does
    /// not finish; once it is dropped, the stream finishes after the next interval.
    #[fuchsia::test(allow_stalls = false)]
    async fn strong_control_handle_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let strong_control_handle: fio::DirectoryControlHandle =
            stream.control_handle().use_control_handle(|x| x).unwrap();
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        let mut stream = pin!(stream.fuse());
        // Neither the stream nor the receiver may complete while the handle is held.
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }
        drop(strong_control_handle);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 4).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// Holding only a weak handle does not stop the stream from finishing, and afterwards the
    /// weak handle no longer calls its closure.
    #[fuchsia::test(allow_stalls = false)]
    async fn weak_control_handle() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (_proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let weak_control_handle = stream.control_handle();
        let mut stream = pin!(stream);
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
        weak_control_handle.use_control_handle(|_| unreachable!());
    }

    /// One request is received and answered; the stream then finishes only after a further
    /// idle interval has elapsed.
    #[fuchsia::test(allow_stalls = false)]
    async fn one_message() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
        let _ = proxy.get_flags();
        let mut stream = pin!(stream);
        let mut message = pin!(stream.next());
        let message = TestExecutor::poll_until_stalled(&mut message).await;
        let Poll::Ready(Some(Ok(fio::DirectoryRequest::GetFlags { responder }))) = message else {
            panic!("Unexpected {message:?}");
        };
        responder.send(zx::Status::OK.into_raw(), fio::OpenFlags::empty()).unwrap();
        // Time passing while the stream is not being polled does not complete the receiver.
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        assert!(TestExecutor::poll_until_stalled(&mut stalled).await.is_pending());
        let mut message = pin!(stream.next());
        assert_matches!(TestExecutor::poll_until_stalled(&mut message).await, Poll::Pending);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
        TestExecutor::advance_to(initial + idle_duration * 3).await;
        assert_matches!(message.await, None);
        assert_matches!(stalled.await, Ok(Some(_)));
    }

    /// While a responder for a received request has not been used, the stream does not finish;
    /// after the reply is sent, it finishes following the next interval.
    #[fuchsia::test(allow_stalls = false)]
    async fn pending_reply_blocks_stalling() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, mut stalled) = until_stalled(stream, idle_duration);
        let mut stream = pin!(stream.fuse());
        let _ = proxy.get_flags();
        let message_with_pending_reply = stream.next().await.unwrap();
        let Ok(fio::DirectoryRequest::GetFlags { responder, .. }) = message_with_pending_reply
        else {
            panic!("Unexpected {message_with_pending_reply:?}");
        };
        TestExecutor::advance_to(initial + idle_duration * 2).await;
        // Neither the stream nor the receiver may complete while the reply is outstanding.
        futures::select! {
            _ = stream.next() => unreachable!(),
            _ = stalled => unreachable!(),
            default => {},
        }
        responder.send(zx::Status::OK.into_raw(), fio::OpenFlags::empty()).unwrap();
        assert_matches!(
            futures::join!(
                stream.next(),
                TestExecutor::advance_to(initial + idle_duration * 3).then(|()| stalled)
            ),
            (None, Ok(Some(_)))
        );
    }

    /// When the client end is closed, the stream ends by itself and the receiver yields `None`
    /// instead of a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn completed_stream() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const DURATION_NANOS: i64 = 1_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        let mut stalled = pin!(stalled);
        assert_matches!(TestExecutor::poll_until_stalled(&mut stalled).await, Poll::Pending);
        // Close the client end so that the stream completes.
        drop(proxy);
        {
            // Pinning inside this block means the stream itself (not merely a pinned reference
            // to it) is dropped when the block ends, as would happen when a task reading from
            // the stream exits after seeing the final `None`.
            let mut stream = pin!(stream);
            assert_matches!(stream.next().await, None);
        }
        assert_matches!(stalled.await, Ok(None));
    }

    /// Requests spaced at half the idle interval keep the stream going; after a full idle
    /// interval without requests, the returned channel is the peer of the client channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        use fidl_fuchsia_component_client_test::{ProtocolAMarker, ProtocolARequest};
        const DURATION_NANOS: i64 = 40_000_000;
        let idle_duration = MonotonicDuration::from_nanos(DURATION_NANOS);
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<ProtocolAMarker>();
        let (stream, stalled) = until_stalled(stream, idle_duration);
        // Serve requests on a separate task until the stream finishes.
        let task = fasync::Task::spawn(async move {
            let mut stream = pin!(stream);
            while let Some(request) = stream.try_next().await.unwrap() {
                match request {
                    ProtocolARequest::Foo { responder } => responder.send().unwrap(),
                }
            }
        });
        let stalled = fasync::Task::spawn(stalled).map(Arc::new).shared();
        let request_duration = MonotonicDuration::from_nanos(DURATION_NANOS / 2);
        const NUM_REQUESTS: usize = 5;
        let mut deadline = initial;
        for _ in 0..NUM_REQUESTS {
            proxy.foo().await.unwrap();
            deadline += request_duration;
            TestExecutor::advance_to(deadline).await;
            assert!(stalled.clone().now_or_never().is_none());
        }
        deadline += idle_duration;
        TestExecutor::advance_to(deadline).await;
        let server_end = stalled.await;
        task.await;
        // The channel handed back must be the other end of the client's channel.
        let client = proxy.into_channel().unwrap().into_zx_channel();
        assert_eq!(
            client.basic_info().unwrap().koid,
            (*server_end).as_ref().unwrap().as_ref().unwrap().basic_info().unwrap().related_koid
        );
    }
}