use anyhow::{format_err, Context as _, Error};
use fidl_fuchsia_input_report::{
InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
};
use futures::{StreamExt, TryStreamExt};
use std::convert::TryFrom as _;
/// Server-side state for a single `InputReportsReader` connection.
///
/// Pairs a stream of incoming reader requests with a queue of `InputReport`s
/// that are waiting to be handed out in response to those requests.
pub(super) struct InputReportsReader {
/// Stream of incoming requests on the `InputReportsReader` protocol.
pub(super) request_stream: InputReportsReaderRequestStream,
/// Receiving half of the unbounded channel on which reports are queued for delivery.
pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
}
impl InputReportsReader {
pub(super) async fn into_future(self) -> Result<(), Error> {
let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
.context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
self.request_stream
.zip(reports.by_ref())
.map(|(request, reports)| match request {
Ok(request) => Ok((request, reports)),
Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
})
.try_for_each(|request_and_reports| async {
match request_and_reports {
(InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
responder
.send(Ok(&reports))
.map_err(anyhow::Error::from)
.context("while sending reports")
}
}
})
.await?;
match reports.is_done() {
true => Ok(()),
false => Err(format_err!("request_stream terminated with reports still pending")),
}
}
}
#[cfg(test)]
mod tests {
use super::{InputReport, InputReportsReader};
use anyhow::{Context as _, Error};
use fidl::endpoints;
use fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT};
use fuchsia_async as fasync;
use futures::future;
mod report_count {
use super::*;
use futures::pin_mut;
use futures::task::Poll;
#[fasync::run_until_stalled(test)]
async fn serves_single_report() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
let reports = reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), 1, "incorrect reports length");
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn serves_max_report_count_reports() -> Result<(), Error> {
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
for _ in 0..max_reports {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
let reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
let reports = reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), max_reports, "incorrect reports length");
Ok(())
}
// When more than `MAX_DEVICE_REPORT_COUNT` reports are queued, the first read receives
// exactly `MAX_DEVICE_REPORT_COUNT` of them and the remainder is served by the next read.
//
// The test drives the futures by hand with a `TestExecutor` so that it can interleave
// the two reads with progress of the reader future.
#[test]
fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
// Queue one report more than the reader puts into a single batch.
for _ in 0..max_reports + 1 {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
pin_mut!(reader_fut);
// First read: issue the request, then let the reader run until it can make no
// further progress. Its poll result is deliberately ignored here.
let reports_fut = proxy.read_input_reports();
let _ = executor.run_until_stalled(&mut reader_fut);
pin_mut!(reports_fut);
match executor.run_until_stalled(&mut reports_fut) {
Poll::Pending => panic!("read did not complete (1st query)"),
Poll::Ready(res) => {
let reports = res
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
}
}
// Second read: issue another request, drop the sender so that no further reports
// can be queued, and drive the reader again to serve the leftover report.
let reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); let _ = executor.run_until_stalled(&mut reader_fut);
pin_mut!(reports_fut);
match executor.run_until_stalled(&mut reports_fut) {
Poll::Pending => panic!("read did not complete (2nd query)"),
Poll::Ready(res) => {
let reports = res
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
}
}
Ok(())
}
}
mod future_resolution {
use super::*;
use assert_matches::assert_matches;
// The reader future resolves to `Ok(())` once every queued report has been sent and
// the report channel has been closed.
#[fasync::run_until_stalled(test)]
async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
    let (client, request_stream) =
        endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
    let (tx, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
    let reader = InputReportsReader { request_stream, report_receiver }.into_future();

    tx.unbounded_send(InputReport::default()).expect("sending empty InputReport");

    // The response itself is never inspected; the binding only keeps the pending
    // read alive for the duration of the test.
    let _pending_read = client.read_input_reports();
    drop(tx);

    let outcome = reader.await;
    assert_matches!(outcome, Ok(()));
    Ok(())
}
// The reader future resolves to an error when the client goes away while a report
// is still queued and the report channel is still open.
#[fasync::run_until_stalled(test)]
async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
) -> Result<(), Error> {
    let (client, request_stream) =
        endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
    // `tx` is intentionally kept alive until the end of the test.
    let (tx, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
    let reader = InputReportsReader { request_stream, report_receiver }.into_future();

    tx.unbounded_send(InputReport::default()).expect("sending empty InputReport");

    // No read is ever issued; the client end is simply released.
    drop(client);

    let outcome = reader.await;
    assert_matches!(outcome, Err(_));
    Ok(())
}
// The reader future resolves to an error when the bytes arriving on the request
// channel are not a valid FIDL message.
#[fasync::run_until_stalled(test)]
async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
    let (client_end, request_stream) =
        endpoints::create_request_stream::<InputReportsReaderMarker>();
    let (tx, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
    let reader = InputReportsReader { request_stream, report_receiver }.into_future();

    tx.unbounded_send(InputReport::default()).expect("sending empty InputReport");
    drop(tx);

    // Bypass the FIDL bindings and write raw bytes straight onto the channel. The
    // channel is a temporary of this statement and is released when it ends.
    client_end
        .into_channel()
        .write(b"not a valid FIDL message", &mut [])
        .expect("internal error writing to channel");

    let outcome = reader.await;
    assert_matches!(outcome, Err(_));
    Ok(())
}
// With no reports queued and the report channel already closed, the reader future
// resolves to `Ok(())` even though the client never issues a read.
#[fasync::run_until_stalled(test)]
async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
    // `_client` is held (not discarded) for the whole test.
    let (_client, request_stream) =
        endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
    let (tx, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
    let reader = InputReportsReader { request_stream, report_receiver }.into_future();

    drop(tx);

    let outcome = reader.await;
    assert_matches!(outcome, Ok(()));
    Ok(())
}
}
mod fidl_interactions {
use super::*;
use assert_matches::assert_matches;
use futures::pin_mut;
use futures::task::Poll;
// After the only queued report has been delivered and the report channel has been
// closed, a further read on the same proxy fails with `ClientChannelClosed`.
//
// The test drives the futures by hand with a `TestExecutor`.
#[test]
fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
let mut executor = fasync::TestExecutor::new();
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
let reports_fut = proxy.read_input_reports();
// Drop the sender so that no further reports can arrive, then run the reader and
// the first read together. Their results are not inspected by this test.
std::mem::drop(report_sender); let futures = future::join(reader_fut, reports_fut);
pin_mut!(futures);
std::mem::drop(executor.run_until_stalled(&mut futures));
// A second read is expected to observe a closed channel -- presumably because the
// finished reader future has released the request stream (TODO confirm).
assert_matches!(
executor.run_until_stalled(&mut proxy.read_input_reports()),
Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
);
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn preserves_query_order() -> Result<(), Error> {
let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
.context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) =
futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
for _ in 0..max_reports + 1 {
report_sender
.unbounded_send(InputReport::default())
.expect("sending empty InputReport");
}
let first_reports_fut = proxy.read_input_reports();
let second_reports_fut = proxy.read_input_reports();
std::mem::drop(proxy); std::mem::drop(report_sender); let (_, first_reports_result, second_reports_result) =
futures::join!(reader_fut, first_reports_fut, second_reports_fut);
let first_reports = first_reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
let second_reports = second_reports_result
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error");
assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
Ok(())
}
}
#[fasync::run_until_stalled(test)]
async fn preserves_report_order() -> Result<(), Error> {
let (proxy, request_stream) =
endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
report_sender
.unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
.expect("sending first InputReport");
report_sender
.unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
.expect("sending second InputReport");
let reports_fut = proxy.read_input_reports();
std::mem::drop(report_sender); assert_eq!(
future::join(reader_fut, reports_fut)
.await
.1
.expect("fidl error")
.map_err(zx::Status::from_raw)
.expect("service error")
.iter()
.map(|report| report.event_time)
.collect::<Vec<_>>(),
[Some(1), Some(2)]
);
Ok(())
}
}