use crate::input_reports_reader::InputReportsReader;
use anyhow::{Context as _, Error};
use async_utils::event::Event as AsyncEvent;
use fidl::endpoints::ServerEnd;
use fidl::Error as FidlError;
use fidl_fuchsia_input_report::{
DeviceDescriptor, FeatureReport, InputDeviceRequest, InputDeviceRequestStream, InputReport,
InputReportsReaderMarker,
};
use fuchsia_async as fasync;
use futures::channel::mpsc;
use futures::{future, pin_mut, StreamExt, TryFutureExt};
/// Identifier type for an input device; this is the type of `InputDevice::device_id`.
pub type DeviceId = u32;
pub(crate) struct InputDevice {
report_sender: futures::channel::mpsc::UnboundedSender<InputReport>,
_input_device_task: fasync::Task<()>,
pub device_id: DeviceId,
}
impl InputDevice {
/// Builds an `InputDevice` and spawns the local task that serves it.
///
/// * `request_stream` - the `InputDevice` requests to serve.
/// * `descriptor` - the descriptor sent in reply to `GetDescriptor` requests.
/// * `got_input_reports_reader` - event signalled when a
///   `GetInputReportsReader` request is handled.
///
/// The returned value has `device_id` set to `0`.
pub(super) fn new(
    request_stream: InputDeviceRequestStream,
    descriptor: DeviceDescriptor,
    got_input_reports_reader: AsyncEvent,
) -> Self {
    // `tx` stays with the struct; `rx` moves into the serving future.
    let (tx, rx) = mpsc::unbounded::<InputReport>();
    let serving_fut =
        Self::serve_reports(request_stream, descriptor, rx, got_input_reports_reader);
    let task = fasync::Task::local(serving_fut);
    Self { report_sender: tx, _input_device_task: task, device_id: 0 }
}
/// Queues `input_report` on the channel feeding the serving task.
///
/// # Errors
///
/// Returns an error (with the context "failed to send input report to reader")
/// if the channel rejects the report.
pub(super) fn send_input_report(&self, input_report: InputReport) -> Result<(), Error> {
    let enqueue_result = self.report_sender.unbounded_send(input_report);
    enqueue_result.context("failed to send input report to reader")
}
/// Test-only: consumes the device, drops its report sender, and then waits
/// for the serving task to run to completion.
#[cfg(test)]
pub(super) async fn flush(self) {
    let Self { report_sender: sender, _input_device_task: serving_task, .. } = self;
    // Release the sender first so the receiving side can observe that no
    // further reports will be queued.
    drop(sender);
    serving_task.await
}
/// Serves `InputDevice` requests from `request_stream`, and hands
/// `report_receiver` to an `InputReportsReader` bound to the first reader
/// server end that a client supplies.
///
/// The future completes when the `InputReportsReader` future completes
/// successfully. The end of the device request stream alone never completes it.
///
/// # Panics
///
/// * If the request stream ends before any `GetInputReportsReader` request.
/// * If a second `GetInputReportsReader` request arrives.
/// * If the `InputReportsReader` future resolves to an error.
/// * Whenever `handle_device_request` panics.
async fn serve_reports(
    request_stream: InputDeviceRequestStream,
    descriptor: DeviceDescriptor,
    report_receiver: mpsc::UnboundedReceiver<InputReport>,
    got_input_reports_reader: AsyncEvent,
) {
    // Requests that `handle_device_request` answers in place map to `None` and
    // are filtered out, so this stream only yields reader server ends.
    let mut reader_server_ends = request_stream.filter_map(|request| {
        let maybe_server_end =
            Self::handle_device_request(request, &descriptor, got_input_reports_reader.clone());
        future::ready(maybe_server_end)
    });

    // Wait for the first reader request; without one there is nothing to serve.
    let first_server_end = reader_server_ends
        .next()
        .await
        .unwrap_or_else(|| panic!("stream ended without a call to GetInputReportsReader"));
    let reports_reader_fut =
        InputReportsReader { request_stream: first_server_end.into_stream(), report_receiver }
            .into_future();
    pin_mut!(reports_reader_fut);

    // Keep draining device requests. A second reader request is a fatal error.
    let device_server_fut = async {
        if reader_server_ends.next().await.is_some() {
            panic!("InputDevice does not support multiple GetInputReportsReader calls")
        }
        Ok(())
    };
    pin_mut!(device_server_fut);

    // On success the device side is chained into a never-ready future, so only
    // the reader side (or an error) can finish the `select`.
    let (outcome, _unfinished) = future::select(
        device_server_fut.and_then(|_: ()| future::pending()),
        reports_reader_fut,
    )
    .await
    .factor_first();
    outcome.unwrap_or_else(|e| panic!("processing FIDL requests: {e}"))
}
/// Processes a single item read from the `InputDevice` request stream.
///
/// * `GetInputReportsReader`: signals `got_input_reports_reader` and returns
///   `Some` with the reader's server end, for the caller to serve.
/// * `GetDescriptor`: replies with `descriptor` and returns `None`.
/// * `GetFeatureReport`: replies with a default `FeatureReport` and returns
///   `None`.
///
/// # Panics
///
/// * If `request` is an `Err` (the request could not be read).
/// * If sending a `GetDescriptor` or `GetFeatureReport` response fails.
/// * If the request is any other `InputDeviceRequest` variant.
fn handle_device_request(
    request: Result<InputDeviceRequest, FidlError>,
    descriptor: &DeviceDescriptor,
    got_input_reports_reader: AsyncEvent,
) -> Option<ServerEnd<InputReportsReaderMarker>> {
    match request {
        Ok(InputDeviceRequest::GetInputReportsReader { reader: reader_server_end, .. }) => {
            // The value returned by `signal()` is deliberately ignored.
            let _ = got_input_reports_reader.signal();
            Some(reader_server_end)
        }
        Ok(InputDeviceRequest::GetDescriptor { responder }) => {
            match responder.send(descriptor) {
                Ok(()) => None,
                Err(e) => panic!("failed to send GetDescriptor response: {e}"),
            }
        }
        Ok(InputDeviceRequest::GetFeatureReport { responder }) => {
            match responder.send(Ok(&FeatureReport::default())) {
                Ok(()) => None,
                Err(e) => panic!("failed to send GetFeatureReport response: {e}"),
            }
        }
        Err(e) => {
            // Fail fast. The item came from the `InputDevice` request stream,
            // so the message names that protocol.
            panic!("failed to read `InputDevice` request: {e:?}");
        }
        _ => {
            panic!(
                "InputDevice::handle_device_request does not support this request: {:?}",
                &request
            );
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints;
use fidl_fuchsia_input_report::{DeviceDescriptor, InputDeviceMarker};
use fuchsia_async as fasync;
mod responds_to_get_feature_report_request {
use super::*;
// Checks that a `GetFeatureReport` request is answered with a default
// `FeatureReport`.
#[fasync::run_until_stalled(test)]
async fn single_request_before_call_to_get_feature_report() -> Result<(), Error> {
    let (device_proxy, device_requests) =
        endpoints::create_proxy_and_stream::<InputDeviceMarker>();
    let device = Box::new(InputDevice::new(
        device_requests,
        DeviceDescriptor::default(),
        AsyncEvent::new(),
    ));
    let server_fut = device.flush();
    let feature_report_fut = device_proxy.get_feature_report();

    // Also request a reader: `serve_reports` panics if the request stream ends
    // without a `GetInputReportsReader` call.
    let (_reader_proxy, reader_server_end) =
        endpoints::create_proxy::<InputReportsReaderMarker>();
    let _ = device_proxy.get_input_reports_reader(reader_server_end);

    // Release the client end of the device channel before driving the server.
    drop(device_proxy);

    let (_, feature_report_result) = future::join(server_fut, feature_report_fut).await;
    let feature_report = feature_report_result.context("fidl error")?;
    assert_eq!(feature_report, Ok(FeatureReport::default()));
    Ok(())
}
}
mod responds_to_get_descriptor_request {
use super::utils::{make_input_device_proxy_and_struct, make_touchscreen_descriptor};
use super::*;
use assert_matches::assert_matches;
use futures::task::Poll;
// Checks that a `GetDescriptor` request sent before the reader request is
// answered with the descriptor the device was created with.
#[fasync::run_until_stalled(test)]
async fn single_request_before_call_to_get_input_reports_reader() -> Result<(), Error> {
    let (device_proxy, device_requests) =
        endpoints::create_proxy_and_stream::<InputDeviceMarker>();
    let device = Box::new(InputDevice::new(
        device_requests,
        make_touchscreen_descriptor(),
        AsyncEvent::new(),
    ));
    let server_fut = device.flush();
    let descriptor_fut = device_proxy.get_descriptor();

    // Also request a reader: `serve_reports` panics if the request stream ends
    // without a `GetInputReportsReader` call.
    let (_reader_proxy, reader_server_end) =
        endpoints::create_proxy::<InputReportsReaderMarker>();
    let _ = device_proxy.get_input_reports_reader(reader_server_end);

    // Release the client end of the device channel before driving the server.
    drop(device_proxy);

    let (_, descriptor_result) = future::join(server_fut, descriptor_fut).await;
    let received_descriptor = descriptor_result.context("fidl error")?;
    assert_eq!(received_descriptor, make_touchscreen_descriptor());
    Ok(())
}
// Checks that a second `GetDescriptor` request is still answered when no
// reader has been requested yet.
#[test]
fn multiple_requests_before_call_to_get_input_reports_reader() -> Result<(), Error> {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device_requests) =
        endpoints::create_proxy_and_stream::<InputDeviceMarker>();
    let server_fut = Box::new(InputDevice::new(
        device_requests,
        make_touchscreen_descriptor(),
        AsyncEvent::new(),
    ))
    .flush();
    pin_mut!(server_fut);

    // First request: let the server handle it, then poll the response once and
    // discard whatever came back.
    let mut first_descriptor_fut = device_proxy.get_descriptor();
    assert_matches!(executor.run_until_stalled(&mut server_fut), Poll::Pending);
    drop(executor.run_until_stalled(&mut first_descriptor_fut));

    // Second request: this one must produce a successful response.
    let mut second_descriptor_fut = device_proxy.get_descriptor();
    let _ = executor.run_until_stalled(&mut server_fut);
    assert_matches!(executor.run_until_stalled(&mut second_descriptor_fut), Poll::Ready(Ok(_)));
    Ok(())
}
// Checks that `GetDescriptor` is still answered after a reader has been
// requested while a queued report is unread, and that the "got reader" event
// has been signalled.
#[test]
fn after_call_to_get_input_reports_reader_with_report_pending() -> Result<(), Error> {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device, reader_requested_event) = make_input_device_proxy_and_struct();

    // Queue one report; nothing in this test reads it.
    let queued_report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(queued_report).context("internal error queuing input event")?;

    let server_fut = device.flush();
    pin_mut!(server_fut);

    // Request a reader and let the server process that request.
    let (_reader_proxy, reader_server_end) =
        endpoints::create_proxy::<InputReportsReaderMarker>();
    device_proxy
        .get_input_reports_reader(reader_server_end)
        .context("sending get_input_reports_reader request")?;
    assert_matches!(executor.run_until_stalled(&mut server_fut), Poll::Pending);

    // The device protocol must still respond to `GetDescriptor`.
    let mut descriptor_fut = device_proxy.get_descriptor();
    assert_matches!(executor.run_until_stalled(&mut server_fut), Poll::Pending);
    assert_matches!(executor.run_until_stalled(&mut descriptor_fut), Poll::Ready(_));

    // The event handed to the device must have been signalled by now.
    let mut reader_requested_fut = reader_requested_event.wait();
    assert_matches!(executor.run_until_stalled(&mut reader_requested_fut), Poll::Ready(_));
    Ok(())
}
}
mod future_resolution {
use super::utils::{make_input_device_proxy_and_struct, make_input_reports_reader_proxy};
use super::*;
use futures::task::Poll;
mod resolves_after_all_reports_are_sent_to_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
// One report queued, one read issued, device channel closed: the flushed
// device future is expected to resolve.
#[test]
fn if_device_request_channel_was_closed() {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    let reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    let report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(report).expect("queuing input report");

    // Keep the read future alive for the remainder of the test.
    let _pending_read = reader_proxy.read_input_reports();
    let device_fut = device.flush();
    pin_mut!(device_fut);

    // Close the client end of the device channel.
    drop(device_proxy);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Ready(()));
}
// One report queued, one read issued, device channel left open: the flushed
// device future is still expected to resolve.
#[test]
fn even_if_device_request_channel_is_open() {
    let mut executor = fasync::TestExecutor::new();
    // `device_proxy` stays in scope until the end, keeping the channel open.
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    let reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    let report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(report).expect("queuing input report");

    // Keep the read future alive for the remainder of the test.
    let _pending_read = reader_proxy.read_input_reports();
    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Ready(()));
}
// No report queued, one read issued, device channel left open: the flushed
// device future is still expected to resolve.
#[test]
fn even_if_reports_was_empty_and_device_request_channel_is_open() {
    let mut executor = fasync::TestExecutor::new();
    // `device_proxy` stays in scope until the end, keeping the channel open.
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    let reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    // Keep the read future alive for the remainder of the test.
    let _pending_read = reader_proxy.read_input_reports();
    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Ready(()));
}
}
mod panics_if_peer_closed_device_channel_without_calling_get_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
// Closing the device channel without ever requesting a reader must make the
// serving task panic, even when a report is queued.
//
// `expected` pins the panic to the one raised by `serve_reports`. With a bare
// `#[should_panic]`, a failure of the `assert_matches!` below (or any other
// unrelated panic) would also make this test pass.
#[test]
#[should_panic(expected = "stream ended without a call to GetInputReportsReader")]
fn if_reports_were_available() {
    let mut executor = fasync::TestExecutor::new();
    let (input_device_proxy, input_device, _got_input_reports_reader) =
        make_input_device_proxy_and_struct();
    input_device
        .send_input_report(InputReport {
            event_time: None,
            touch: None,
            ..Default::default()
        })
        .expect("queuing input report");
    let input_device_fut = input_device.flush();
    pin_mut!(input_device_fut);
    // Close the device channel before any `GetInputReportsReader` call.
    std::mem::drop(input_device_proxy);
    assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending);
}
// Closing the device channel without ever requesting a reader must make the
// serving task panic, even when no report is queued.
//
// `expected` pins the panic to the one raised by `serve_reports`. With a bare
// `#[should_panic]`, a failure of the `assert_matches!` below (or any other
// unrelated panic) would also make this test pass.
#[test]
#[should_panic(expected = "stream ended without a call to GetInputReportsReader")]
fn even_if_no_reports_were_available() {
    let mut executor = fasync::TestExecutor::new();
    let (input_device_proxy, input_device, _got_input_reports_reader) =
        make_input_device_proxy_and_struct();
    let input_device_fut = input_device.flush();
    pin_mut!(input_device_fut);
    // Close the device channel before any `GetInputReportsReader` call.
    std::mem::drop(input_device_proxy);
    assert_matches!(executor.run_until_stalled(&mut input_device_fut), Poll::Pending);
}
}
mod is_pending_if_peer_has_device_channel_open_and_has_not_called_get_input_reports_reader {
use super::*;
use assert_matches::assert_matches;
// Device channel open, no reader requested, one report queued: the flushed
// device future must stay pending.
#[test]
fn if_reports_were_available() {
    let mut executor = fasync::TestExecutor::new();
    // `_device_proxy` is kept alive so the device channel stays open.
    let (_device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();

    let report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(report).expect("queuing input report");

    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
// Device channel open, no reader requested, no report queued: the flushed
// device future must stay pending.
#[test]
fn even_if_no_reports_were_available() {
    let mut executor = fasync::TestExecutor::new();
    // `_device_proxy` is kept alive so the device channel stays open.
    let (_device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();

    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
// Device channel open, only `GetDescriptor` called (no reader requested): the
// flushed device future must stay pending.
#[test]
fn even_if_get_device_descriptor_has_been_called() {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();

    let device_fut = device.flush();
    pin_mut!(device_fut);

    // Keep the response future alive for the remainder of the test.
    let _descriptor_fut = device_proxy.get_descriptor();
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
}
mod is_pending_if_peer_has_not_read_any_reports_when_a_report_is_available {
use super::*;
use assert_matches::assert_matches;
// Reader requested but never read from, one report queued, device channel
// open: the flushed device future must stay pending.
#[test]
fn if_device_request_channel_is_open() {
    let mut executor = fasync::TestExecutor::new();
    // `device_proxy` stays in scope until the end, keeping the channel open.
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    // The reader proxy is kept alive, but no read is ever issued on it.
    let _reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    let report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(report).expect("queuing input report");

    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
// Reader requested but never read from, one report queued, device channel
// closed: the flushed device future must stay pending.
#[test]
fn even_if_device_channel_is_closed() {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    // The reader proxy is kept alive, but no read is ever issued on it.
    let _reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    let report = InputReport { event_time: None, touch: None, ..Default::default() };
    device.send_input_report(report).expect("queuing input report");

    let device_fut = device.flush();
    // Close the client end of the device channel.
    drop(device_proxy);
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
}
mod is_pending_if_peer_did_not_read_all_reports {
use super::*;
use assert_matches::assert_matches;
use fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT;
// `MAX_DEVICE_REPORT_COUNT + 1` reports queued, a single read issued, device
// channel open: the flushed device future must stay pending.
#[test]
fn if_device_request_channel_is_open() {
    let mut executor = fasync::TestExecutor::new();
    // `device_proxy` stays in scope until the end, keeping the channel open.
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    let reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    // The inclusive range queues one report more than `MAX_DEVICE_REPORT_COUNT`.
    for _ in 0..=MAX_DEVICE_REPORT_COUNT {
        let report = InputReport { event_time: None, touch: None, ..Default::default() };
        device.send_input_report(report).expect("queuing input report");
    }

    // Issue exactly one read and keep its future alive.
    let _pending_read = reader_proxy.read_input_reports();
    let device_fut = device.flush();
    pin_mut!(device_fut);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
// `MAX_DEVICE_REPORT_COUNT + 1` reports queued, a single read issued, device
// channel closed: the flushed device future must stay pending.
#[test]
fn even_if_device_request_channel_is_closed() {
    let mut executor = fasync::TestExecutor::new();
    let (device_proxy, device, _reader_requested_event) = make_input_device_proxy_and_struct();
    let reader_proxy = make_input_reports_reader_proxy(&device_proxy);

    // The inclusive range queues one report more than `MAX_DEVICE_REPORT_COUNT`.
    for _ in 0..=MAX_DEVICE_REPORT_COUNT {
        let report = InputReport { event_time: None, touch: None, ..Default::default() };
        device.send_input_report(report).expect("queuing input report");
    }

    // Issue exactly one read and keep its future alive.
    let _pending_read = reader_proxy.read_input_reports();
    let device_fut = device.flush();
    pin_mut!(device_fut);

    // Close the client end of the device channel.
    drop(device_proxy);
    assert_matches!(executor.run_until_stalled(&mut device_fut), Poll::Pending);
}
}
}
mod utils {
use {
super::*,
fidl_fuchsia_input_report::{
Axis, ContactInputDescriptor, InputDeviceProxy, InputReportsReaderProxy, Range,
TouchDescriptor, TouchInputDescriptor, TouchType, Unit, UnitType,
},
};
/// Builds a touchscreen `DeviceDescriptor` with ten identical contact
/// descriptors, `max_contacts` of 10, and no buttons.
///
/// Every axis (x, y, width, height) spans `-1000..=1000` with unit
/// `UnitType::Other` and exponent 0.
pub(super) fn make_touchscreen_descriptor() -> DeviceDescriptor {
    // All four axes share the same range and unit, so build them in one place.
    let make_axis = || Axis {
        range: Range { min: -1000, max: 1000 },
        unit: Unit { type_: UnitType::Other, exponent: 0 },
    };
    let contact = ContactInputDescriptor {
        position_x: Some(make_axis()),
        position_y: Some(make_axis()),
        contact_width: Some(make_axis()),
        contact_height: Some(make_axis()),
        ..Default::default()
    };
    let touch_input = TouchInputDescriptor {
        contacts: Some(std::iter::repeat(contact).take(10).collect()),
        max_contacts: Some(10),
        touch_type: Some(TouchType::Touchscreen),
        buttons: Some(vec![]),
        ..Default::default()
    };
    let touch = TouchDescriptor { input: Some(touch_input), ..Default::default() };
    DeviceDescriptor { touch: Some(touch), ..Default::default() }
}
/// Creates an `InputDevice` with a default descriptor.
///
/// Returns the client proxy for the device, the boxed device itself, and the
/// event that was handed to the device (a clone of it is passed to
/// `InputDevice::new`).
pub(super) fn make_input_device_proxy_and_struct(
) -> (InputDeviceProxy, Box<InputDevice>, AsyncEvent) {
    let (proxy, request_stream) = endpoints::create_proxy_and_stream::<InputDeviceMarker>();
    let reader_requested_event = AsyncEvent::new();
    let device = InputDevice::new(
        request_stream,
        DeviceDescriptor::default(),
        reader_requested_event.clone(),
    );
    (proxy, Box::new(device), reader_requested_event)
}
/// Sends a `GetInputReportsReader` request over `input_device_proxy` and
/// returns the client proxy for the new reader.
///
/// # Panics
///
/// Panics if sending the request fails.
pub(super) fn make_input_reports_reader_proxy(
    input_device_proxy: &InputDeviceProxy,
) -> InputReportsReaderProxy {
    let (reader_proxy, reader_server_end) =
        endpoints::create_proxy::<InputReportsReaderMarker>();
    let send_result = input_device_proxy.get_input_reports_reader(reader_server_end);
    send_result.expect("sending get_input_reports_reader request");
    reader_proxy
}
}
}