1use crate::autorepeater::Autorepeater;
6use crate::display_ownership::DisplayOwnership;
7use crate::focus_listener::FocusListener;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::lock::Mutex;
16use futures::{StreamExt, TryStreamExt};
17use itertools::Itertools;
18use metrics_registry::*;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::atomic::{AtomicU32, Ordering};
23use std::sync::{Arc, LazyLock};
24use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
25
/// Process-wide counter backing [`get_next_device_id`]. The first ID handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| {
    let first_id: u32 = 10;
    AtomicU32::new(first_id)
});

/// Returns the next device ID.
///
/// Each call atomically increments the shared counter and returns the value the counter held
/// before the increment.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
40
/// A heap-allocated, dynamically dispatched input device binding.
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// Shared, lock-protected map from a device ID to the bindings that were created for that device.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
46
47pub struct InputPipelineAssembly {
61 sender: UnboundedSender<input_device::InputEvent>,
64 receiver: UnboundedReceiver<input_device::InputEvent>,
68 tasks: Vec<fuchsia_async::Task<()>>,
72
73 metrics_logger: metrics::MetricsLogger,
75}
76
77impl InputPipelineAssembly {
78 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
81 let (sender, receiver) = mpsc::unbounded();
82 let tasks = vec![];
83 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
84 }
85
86 pub fn add_handler(self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
90 let (sender, mut receiver, mut tasks, metrics_logger) = self.into_components();
91 let metrics_logger_clone = metrics_logger.clone();
92 let (next_sender, next_receiver) = mpsc::unbounded();
93 let handler_name = handler.get_name();
94 tasks.push(fasync::Task::local(async move {
95 handler.clone().set_handler_healthy();
96 while let Some(event) = receiver.next().await {
97 let out_events = {
101 let _async_trace = fuchsia_trace::async_enter!(
102 fuchsia_trace::Id::random(),
103 c"input",
104 c"handle_input_event",
105 "name" => handler_name
106 );
107 handler.clone().handle_input_event(event).await
108 };
109 for out_event in out_events.into_iter() {
110 if let Err(e) = next_sender.unbounded_send(out_event) {
111 metrics_logger_clone.log_error(
112 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEvent,
113 std::format!(
114 "could not forward event output from handler: {:?}: {:?}",
115 handler_name,
116 e));
117 break;
119 }
120 }
121 }
122 handler.clone().set_handler_unhealthy(std::format!("Receive loop terminated for handler: {:?}", handler_name).as_str());
123 panic!("receive loop is not supposed to terminate for handler: {:?}", handler_name);
124 }));
125 receiver = next_receiver;
126 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
127 }
128
129 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
131 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
132 }
133
134 pub fn add_display_ownership(
140 self,
141 display_ownership_event: zx::Event,
142 input_handlers_node: &fuchsia_inspect::Node,
143 ) -> InputPipelineAssembly {
144 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
145 let (autorepeat_sender, receiver) = mpsc::unbounded();
146 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
147 let metrics_logger_clone = metrics_logger.clone();
148 tasks.push(fasync::Task::local(async move {
149 h.clone().set_handler_healthy();
150 h.clone().handle_input_events(autorepeat_receiver, autorepeat_sender)
151 .await
152 .map_err(|e| {
153 metrics_logger_clone.log_error(
154 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
155 std::format!(
156 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
157 }).unwrap();
158 h.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
159 }));
160 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
161 }
162
163 pub fn add_autorepeater(self, input_handlers_node: &fuchsia_inspect::Node) -> Self {
167 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
168 let (autorepeat_sender, receiver) = mpsc::unbounded();
169 let metrics_logger_clone = metrics_logger.clone();
170 let a = Autorepeater::new(autorepeat_receiver, input_handlers_node, metrics_logger.clone());
171 tasks.push(fasync::Task::local(async move {
172 a.clone().set_handler_healthy();
173 a.clone()
174 .run(autorepeat_sender)
175 .await
176 .map_err(|e| {
177 metrics_logger_clone.log_error(
178 InputPipelineErrorMetricDimensionEvent::InputPipelineAutorepeatRunningError,
179 std::format!("error while running autorepeater: {:?}", e),
180 );
181 })
182 .expect("autorepeater should never error out");
183 a.set_handler_unhealthy("Receive loop terminated for handler: Autorepeater");
184 }));
185 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
186 }
187
188 fn into_components(
194 self,
195 ) -> (
196 UnboundedSender<input_device::InputEvent>,
197 UnboundedReceiver<input_device::InputEvent>,
198 Vec<fuchsia_async::Task<()>>,
199 metrics::MetricsLogger,
200 ) {
201 (self.sender, self.receiver, self.tasks, self.metrics_logger)
202 }
203
204 pub fn add_focus_listener(self, focus_chain_publisher: FocusChainProviderPublisher) -> Self {
217 let (sender, receiver, mut tasks, metrics_logger) = self.into_components();
218 let metrics_logger_clone = metrics_logger.clone();
219 tasks.push(fasync::Task::local(async move {
220 if let Ok(mut focus_listener) =
221 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
222 log::warn!(
223 "could not create focus listener, focus will not be dispatched: {:?}",
224 e
225 )
226 })
227 {
228 let _result = focus_listener
231 .dispatch_focus_changes()
232 .await
233 .map(|_| {
234 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
235 })
236 .map_err(|e| {
237 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
238 });
239 }
240 }));
241 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
242 }
243}
244
245pub struct InputPipeline {
273 pipeline_sender: UnboundedSender<input_device::InputEvent>,
277
278 device_event_sender: UnboundedSender<input_device::InputEvent>,
281
282 device_event_receiver: UnboundedReceiver<input_device::InputEvent>,
284
285 input_device_types: Vec<input_device::InputDeviceType>,
287
288 input_device_bindings: InputDeviceBindingHashMap,
290
291 inspect_node: fuchsia_inspect::Node,
294
295 metrics_logger: metrics::MetricsLogger,
297}
298
299impl InputPipeline {
300 fn new_common(
303 input_device_types: Vec<input_device::InputDeviceType>,
304 assembly: InputPipelineAssembly,
305 inspect_node: fuchsia_inspect::Node,
306 ) -> Self {
307 let (pipeline_sender, receiver, tasks, metrics_logger) = assembly.into_components();
308
309 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
311 inspect_node.record_uint("handlers_registered", tasks.len() as u64);
312 inspect_node.record_uint("handlers_healthy", tasks.len() as u64);
313
314 InputPipeline::catch_unhandled(receiver);
317
318 InputPipeline::run(tasks);
320
321 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
322 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
323 InputPipeline {
324 pipeline_sender,
325 device_event_sender,
326 device_event_receiver,
327 input_device_types,
328 input_device_bindings,
329 inspect_node,
330 metrics_logger,
331 }
332 }
333
334 pub fn new_for_test(
342 input_device_types: Vec<input_device::InputDeviceType>,
343 assembly: InputPipelineAssembly,
344 ) -> Self {
345 let inspector = fuchsia_inspect::Inspector::default();
346 let root = inspector.root();
347 let test_node = root.create_child("input_pipeline");
348 Self::new_common(input_device_types, assembly, test_node)
349 }
350
351 pub fn new(
358 input_device_types: Vec<input_device::InputDeviceType>,
359 assembly: InputPipelineAssembly,
360 inspect_node: fuchsia_inspect::Node,
361 metrics_logger: metrics::MetricsLogger,
362 ) -> Result<Self, Error> {
363 let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
364 let input_device_types = input_pipeline.input_device_types.clone();
365 let input_event_sender = input_pipeline.device_event_sender.clone();
366 let input_device_bindings = input_pipeline.input_device_bindings.clone();
367 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
368 fasync::Task::local(async move {
369 match async {
372 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
373 input_device::INPUT_REPORT_PATH,
374 fuchsia_fs::PERM_READABLE,
375 )
376 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
377 let device_watcher =
378 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
379 Self::watch_for_devices(
380 device_watcher,
381 dir_proxy,
382 input_device_types,
383 input_event_sender,
384 input_device_bindings,
385 &devices_node,
386 false, metrics_logger.clone(),
388 )
389 .await
390 .context("failed to watch for devices")
391 }
392 .await
393 {
394 Ok(()) => {}
395 Err(err) => {
396 metrics_logger.log_warn(
400 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
401 std::format!(
402 "Input pipeline is unable to watch for new input devices: {:?}",
403 err
404 ));
405 }
406 }
407 })
408 .detach();
409
410 Ok(input_pipeline)
411 }
412
413 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
415 &self.input_device_bindings
416 }
417
418 pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
421 &self.device_event_sender
422 }
423
424 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
426 &self.input_device_types
427 }
428
429 pub async fn handle_input_events(mut self) {
431 let metrics_logger_clone = self.metrics_logger.clone();
432 while let Some(input_event) = self.device_event_receiver.next().await {
433 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
434 metrics_logger_clone.log_error(
435 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
436 std::format!("could not forward event from driver: {:?}", &e));
437 }
438 }
439
440 metrics_logger_clone.log_error(
441 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
442 "Input pipeline stopped handling input events.".to_string(),
443 );
444 }
445
446 async fn watch_for_devices(
462 mut device_watcher: Watcher,
463 dir_proxy: fio::DirectoryProxy,
464 device_types: Vec<input_device::InputDeviceType>,
465 input_event_sender: UnboundedSender<input_device::InputEvent>,
466 bindings: InputDeviceBindingHashMap,
467 input_devices_node: &fuchsia_inspect::Node,
468 break_on_idle: bool,
469 metrics_logger: metrics::MetricsLogger,
470 ) -> Result<(), Error> {
471 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
473 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
474 while let Some(msg) = device_watcher.try_next().await? {
475 if let Ok(filename) = msg.filename.into_os_string().into_string() {
476 if filename == "." {
477 continue;
478 }
479
480 let pathbuf = PathBuf::from(filename.clone());
481 match msg.event {
482 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
483 log::info!("found input device {}", filename);
484 devices_discovered.add(1);
485 let device_proxy =
486 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
487 add_device_bindings(
488 &device_types,
489 &filename,
490 device_proxy,
491 &input_event_sender,
492 &bindings,
493 get_next_device_id(),
494 input_devices_node,
495 Some(&devices_connected),
496 metrics_logger.clone(),
497 )
498 .await;
499 }
500 WatchEvent::IDLE => {
501 if break_on_idle {
502 break;
503 }
504 }
505 _ => (),
506 }
507 }
508 }
509 input_devices_node.record(devices_discovered);
511 input_devices_node.record(devices_connected);
512 Err(format_err!("Input pipeline stopped watching for new input devices."))
513 }
514
515 pub async fn handle_input_device_registry_request_stream(
530 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531 device_types: &Vec<input_device::InputDeviceType>,
532 input_event_sender: &UnboundedSender<input_device::InputEvent>,
533 bindings: &InputDeviceBindingHashMap,
534 input_devices_node: &fuchsia_inspect::Node,
535 metrics_logger: metrics::MetricsLogger,
536 ) -> Result<(), Error> {
537 while let Some(request) = stream
538 .try_next()
539 .await
540 .context("Error handling input device registry request stream")?
541 {
542 match request {
543 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
544 device,
545 ..
546 } => {
547 let device_proxy = device.into_proxy();
549
550 let device_id = get_next_device_id();
551
552 add_device_bindings(
553 device_types,
554 &format!("input-device-registry-{}", device_id),
555 device_proxy,
556 input_event_sender,
557 bindings,
558 device_id,
559 input_devices_node,
560 None,
561 metrics_logger.clone(),
562 )
563 .await;
564 }
565 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566 device,
567 responder,
568 .. } => {
569 let device_proxy = device.into_proxy();
571
572 let device_id = get_next_device_id();
573
574 add_device_bindings(
575 device_types,
576 &format!("input-device-registry-{}", device_id),
577 device_proxy,
578 input_event_sender,
579 bindings,
580 device_id,
581 input_devices_node,
582 None,
583 metrics_logger.clone(),
584 )
585 .await;
586
587 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
588 device_id: Some(device_id),
589 ..Default::default()
590 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
591 }
592 }
593 }
594
595 Ok(())
596 }
597
598 fn run(tasks: Vec<fuchsia_async::Task<()>>) {
600 fasync::Task::local(async move {
601 futures::future::join_all(tasks).await;
602 panic!("Runner task is not supposed to terminate.")
603 })
604 .detach();
605 }
606
607 fn catch_unhandled(mut receiver: UnboundedReceiver<input_device::InputEvent>) {
610 fasync::Task::local(async move {
611 while let Some(event) = receiver.next().await {
612 if event.handled == input_device::Handled::No {
613 log::warn!("unhandled input event: {:?}", &event);
614 }
615 }
616 panic!("unhandled event catcher is not supposed to terminate.");
617 })
618 .detach();
619 }
620}
621
622async fn add_device_bindings(
642 device_types: &Vec<input_device::InputDeviceType>,
643 filename: &String,
644 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
645 input_event_sender: &UnboundedSender<input_device::InputEvent>,
646 bindings: &InputDeviceBindingHashMap,
647 device_id: u32,
648 input_devices_node: &fuchsia_inspect::Node,
649 devices_connected: Option<&fuchsia_inspect::UintProperty>,
650 metrics_logger: metrics::MetricsLogger,
651) {
652 let mut matched_device_types = vec![];
653 if let Ok(descriptor) = device_proxy.get_descriptor().await {
654 for device_type in device_types {
655 if input_device::is_device_type(&descriptor, *device_type).await {
656 matched_device_types.push(device_type);
657 match devices_connected {
658 Some(dev_connected) => {
659 let _ = dev_connected.add(1);
660 }
661 None => (),
662 };
663 }
664 }
665 if matched_device_types.is_empty() {
666 log::info!(
667 "device {} did not match any supported device types: {:?}",
668 filename,
669 device_types
670 );
671 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
672 let mut health = fuchsia_inspect::health::Node::new(&device_node);
673 health.set_unhealthy("Unsupported device type.");
674 device_node.record(health);
675 input_devices_node.record(device_node);
676 return;
677 }
678 } else {
679 metrics_logger.clone().log_error(
680 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
681 std::format!("cannot bind device {} without a device descriptor", filename),
682 );
683 return;
684 }
685
686 log::info!(
687 "binding {} to device types: {}",
688 filename,
689 matched_device_types
690 .iter()
691 .fold(String::new(), |device_types_string, device_type| device_types_string
692 + &format!("{:?}, ", device_type))
693 );
694
695 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
696 for device_type in matched_device_types {
697 let proxy = device_proxy.clone();
718 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
719 match input_device::get_device_binding(
720 *device_type,
721 proxy,
722 device_id,
723 input_event_sender.clone(),
724 device_node,
725 metrics_logger.clone(),
726 )
727 .await
728 {
729 Ok(binding) => new_bindings.push(binding),
730 Err(e) => {
731 metrics_logger.log_error(
732 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
733 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
734 );
735 }
736 }
737 }
738
739 if !new_bindings.is_empty() {
740 let mut bindings = bindings.lock().await;
741 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748 use crate::input_device::InputDeviceBinding;
749 use crate::utils::Position;
750 use crate::{
751 fake_input_device_binding, mouse_binding, mouse_model_database,
752 observe_fake_events_input_handler,
753 };
754 use diagnostics_assertions::AnyProperty;
755 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
756 use fuchsia_async as fasync;
757 use futures::FutureExt;
758 use pretty_assertions::assert_eq;
759 use rand::Rng;
760 use std::collections::HashSet;
761 use vfs::{pseudo_directory, service as pseudo_fs_service};
762
763 const COUNTS_PER_MM: u32 = 12;
764
765 fn send_input_event(
770 sender: UnboundedSender<input_device::InputEvent>,
771 ) -> input_device::InputEvent {
772 let mut rng = rand::rng();
773 let offset =
774 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
775 let input_event = input_device::InputEvent {
776 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
777 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
778 millimeters: Position {
779 x: offset.x / COUNTS_PER_MM as f32,
780 y: offset.y / COUNTS_PER_MM as f32,
781 },
782 }),
783 None, None, mouse_binding::MousePhase::Move,
786 HashSet::new(),
787 HashSet::new(),
788 None, None, )),
791 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
792 mouse_binding::MouseDeviceDescriptor {
793 device_id: 1,
794 absolute_x_range: None,
795 absolute_y_range: None,
796 wheel_v_range: None,
797 wheel_h_range: None,
798 buttons: None,
799 counts_per_mm: COUNTS_PER_MM,
800 },
801 ),
802 event_time: zx::MonotonicInstant::get(),
803 handled: input_device::Handled::No,
804 trace_id: None,
805 };
806 match sender.unbounded_send(input_event.clone()) {
807 Err(_) => assert!(false),
808 _ => {}
809 }
810
811 input_event
812 }
813
814 fn handle_input_device_request(
819 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
820 ) {
821 match input_device_request {
822 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
823 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
824 device_information: None,
825 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
826 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
827 movement_x: None,
828 movement_y: None,
829 scroll_v: None,
830 scroll_h: None,
831 buttons: Some(vec![0]),
832 position_x: None,
833 position_y: None,
834 ..Default::default()
835 }),
836 ..Default::default()
837 }),
838 sensor: None,
839 touch: None,
840 keyboard: None,
841 consumer_control: None,
842 ..Default::default()
843 });
844 }
845 _ => {}
846 }
847 }
848
849 #[fasync::run_singlethreaded(test)]
851 async fn multiple_devices_single_handler() {
852 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
854 let first_device_binding =
855 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
856 let second_device_binding =
857 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
858
859 let (handler_event_sender, mut handler_event_receiver) =
861 futures::channel::mpsc::channel(100);
862 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
863 handler_event_sender,
864 );
865
866 let (sender, receiver, tasks, _) =
868 InputPipelineAssembly::new(metrics::MetricsLogger::default())
869 .add_handler(input_handler)
870 .into_components();
871 let inspector = fuchsia_inspect::Inspector::default();
872 let test_node = inspector.root().create_child("input_pipeline");
873 let input_pipeline = InputPipeline {
874 pipeline_sender: sender,
875 device_event_sender,
876 device_event_receiver,
877 input_device_types: vec![],
878 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
879 inspect_node: test_node,
880 metrics_logger: metrics::MetricsLogger::default(),
881 };
882 InputPipeline::catch_unhandled(receiver);
883 InputPipeline::run(tasks);
884
885 let first_device_event = send_input_event(first_device_binding.input_event_sender());
887 let second_device_event = send_input_event(second_device_binding.input_event_sender());
888
889 fasync::Task::local(async {
891 input_pipeline.handle_input_events().await;
892 })
893 .detach();
894
895 let first_handled_event = handler_event_receiver.next().await;
897 assert_eq!(first_handled_event, Some(first_device_event));
898
899 let second_handled_event = handler_event_receiver.next().await;
900 assert_eq!(second_handled_event, Some(second_device_event));
901 }
902
903 #[fasync::run_singlethreaded(test)]
905 async fn single_device_multiple_handlers() {
906 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
908 let input_device_binding =
909 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
910
911 let (first_handler_event_sender, mut first_handler_event_receiver) =
913 futures::channel::mpsc::channel(100);
914 let first_input_handler =
915 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
916 first_handler_event_sender,
917 );
918 let (second_handler_event_sender, mut second_handler_event_receiver) =
919 futures::channel::mpsc::channel(100);
920 let second_input_handler =
921 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
922 second_handler_event_sender,
923 );
924
925 let (sender, receiver, tasks, _) =
927 InputPipelineAssembly::new(metrics::MetricsLogger::default())
928 .add_handler(first_input_handler)
929 .add_handler(second_input_handler)
930 .into_components();
931 let inspector = fuchsia_inspect::Inspector::default();
932 let test_node = inspector.root().create_child("input_pipeline");
933 let input_pipeline = InputPipeline {
934 pipeline_sender: sender,
935 device_event_sender,
936 device_event_receiver,
937 input_device_types: vec![],
938 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
939 inspect_node: test_node,
940 metrics_logger: metrics::MetricsLogger::default(),
941 };
942 InputPipeline::catch_unhandled(receiver);
943 InputPipeline::run(tasks);
944
945 let input_event = send_input_event(input_device_binding.input_event_sender());
947
948 fasync::Task::local(async {
950 input_pipeline.handle_input_events().await;
951 })
952 .detach();
953
954 let first_handler_event = first_handler_event_receiver.next().await;
956 assert_eq!(first_handler_event, Some(input_event.clone()));
957 let second_handler_event = second_handler_event_receiver.next().await;
958 assert_eq!(second_handler_event, Some(input_event));
959 }
960
961 #[fasync::run_singlethreaded(test)]
964 async fn watch_devices_one_match_exists() {
965 let mut count: i8 = 0;
967 let dir = pseudo_directory! {
968 "file_name" => pseudo_fs_service::host(
969 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
970 async move {
971 while count < 3 {
972 if let Some(input_device_request) =
973 request_stream.try_next().await.unwrap()
974 {
975 handle_input_device_request(input_device_request);
976 count += 1;
977 }
978 }
979
980 }.boxed()
981 },
982 )
983 };
984
985 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
987 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
988 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
991
992 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
993 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
994 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
995
996 let inspector = fuchsia_inspect::Inspector::default();
997 let test_node = inspector.root().create_child("input_pipeline");
998 test_node.record_string(
999 "supported_input_devices",
1000 supported_device_types.clone().iter().join(", "),
1001 );
1002 let input_devices = test_node.create_child("input_devices");
1003 diagnostics_assertions::assert_data_tree!(inspector, root: {
1005 input_pipeline: {
1006 supported_input_devices: "Mouse",
1007 input_devices: {}
1008 }
1009 });
1010
1011 let _ = InputPipeline::watch_for_devices(
1012 device_watcher,
1013 dir_proxy_for_pipeline,
1014 supported_device_types,
1015 input_event_sender,
1016 bindings.clone(),
1017 &input_devices,
1018 true, metrics::MetricsLogger::default(),
1020 )
1021 .await;
1022
1023 let bindings_hashmap = bindings.lock().await;
1025 assert_eq!(bindings_hashmap.len(), 1);
1026 let bindings_vector = bindings_hashmap.get(&10);
1027 assert!(bindings_vector.is_some());
1028 assert_eq!(bindings_vector.unwrap().len(), 1);
1029 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1030 assert!(boxed_mouse_binding.is_some());
1031 assert_eq!(
1032 boxed_mouse_binding.unwrap().get_device_descriptor(),
1033 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1034 device_id: 10,
1035 absolute_x_range: None,
1036 absolute_y_range: None,
1037 wheel_v_range: None,
1038 wheel_h_range: None,
1039 buttons: Some(vec![0]),
1040 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1041 })
1042 );
1043
1044 diagnostics_assertions::assert_data_tree!(inspector, root: {
1046 input_pipeline: {
1047 supported_input_devices: "Mouse",
1048 input_devices: {
1049 devices_discovered: 1u64,
1050 devices_connected: 1u64,
1051 "file_name_Mouse": contains {
1052 reports_received_count: 0u64,
1053 reports_filtered_count: 0u64,
1054 events_generated: 0u64,
1055 last_received_timestamp_ns: 0u64,
1056 last_generated_timestamp_ns: 0u64,
1057 "fuchsia.inspect.Health": {
1058 status: "OK",
1059 start_timestamp_nanos: AnyProperty
1062 },
1063 }
1064 }
1065 }
1066 });
1067 }
1068
1069 #[fasync::run_singlethreaded(test)]
1072 async fn watch_devices_no_matches_exist() {
1073 let mut count: i8 = 0;
1075 let dir = pseudo_directory! {
1076 "file_name" => pseudo_fs_service::host(
1077 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1078 async move {
1079 while count < 1 {
1080 if let Some(input_device_request) =
1081 request_stream.try_next().await.unwrap()
1082 {
1083 handle_input_device_request(input_device_request);
1084 count += 1;
1085 }
1086 }
1087
1088 }.boxed()
1089 },
1090 )
1091 };
1092
1093 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1095 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1096 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1099
1100 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1101 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1102 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1103
1104 let inspector = fuchsia_inspect::Inspector::default();
1105 let test_node = inspector.root().create_child("input_pipeline");
1106 test_node.record_string(
1107 "supported_input_devices",
1108 supported_device_types.clone().iter().join(", "),
1109 );
1110 let input_devices = test_node.create_child("input_devices");
1111 diagnostics_assertions::assert_data_tree!(inspector, root: {
1113 input_pipeline: {
1114 supported_input_devices: "Keyboard",
1115 input_devices: {}
1116 }
1117 });
1118
1119 let _ = InputPipeline::watch_for_devices(
1120 device_watcher,
1121 dir_proxy_for_pipeline,
1122 supported_device_types,
1123 input_event_sender,
1124 bindings.clone(),
1125 &input_devices,
1126 true, metrics::MetricsLogger::default(),
1128 )
1129 .await;
1130
1131 let bindings = bindings.lock().await;
1133 assert_eq!(bindings.len(), 0);
1134
1135 diagnostics_assertions::assert_data_tree!(inspector, root: {
1137 input_pipeline: {
1138 supported_input_devices: "Keyboard",
1139 input_devices: {
1140 devices_discovered: 1u64,
1141 devices_connected: 0u64,
1142 "file_name_Unsupported": {
1143 "fuchsia.inspect.Health": {
1144 status: "UNHEALTHY",
1145 message: "Unsupported device type.",
1146 start_timestamp_nanos: AnyProperty
1149 },
1150 }
1151 }
1152 }
1153 });
1154 }
1155
1156 #[fasync::run_singlethreaded(test)]
1159 async fn handle_input_device_registry_request_stream() {
1160 let (input_device_registry_proxy, input_device_registry_request_stream) =
1161 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1162 let (input_device_client_end, mut input_device_request_stream) =
1163 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1164
1165 let device_types = vec![input_device::InputDeviceType::Mouse];
1166 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1167 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1168
1169 let mut count: i8 = 0;
1171 fasync::Task::local(async move {
1172 let _ = input_device_registry_proxy.register(input_device_client_end);
1174
1175 while count < 3 {
1176 if let Some(input_device_request) =
1177 input_device_request_stream.try_next().await.unwrap()
1178 {
1179 handle_input_device_request(input_device_request);
1180 count += 1;
1181 }
1182 }
1183
1184 input_device_registry_proxy.take_event_stream();
1186 })
1187 .detach();
1188
1189 let inspector = fuchsia_inspect::Inspector::default();
1190 let test_node = inspector.root().create_child("input_pipeline");
1191
1192 let bindings_clone = bindings.clone();
1194 let _ = InputPipeline::handle_input_device_registry_request_stream(
1195 input_device_registry_request_stream,
1196 &device_types,
1197 &input_event_sender,
1198 &bindings_clone,
1199 &test_node,
1200 metrics::MetricsLogger::default(),
1201 )
1202 .await;
1203
1204 let bindings = bindings.lock().await;
1206 assert_eq!(bindings.len(), 1);
1207 }
1208
1209 #[fasync::run_singlethreaded(test)]
1211 async fn check_inspect_node_has_correct_properties() {
1212 let device_types = vec![
1213 input_device::InputDeviceType::Touch,
1214 input_device::InputDeviceType::ConsumerControls,
1215 ];
1216 let inspector = fuchsia_inspect::Inspector::default();
1217 let test_node = inspector.root().create_child("input_pipeline");
1218 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1220 futures::channel::mpsc::channel(100);
1221 let fake_input_handler =
1222 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1223 fake_handler_event_sender,
1224 );
1225 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1226 .add_handler(fake_input_handler);
1227 let _test_input_pipeline = InputPipeline::new(
1228 device_types,
1229 assembly,
1230 test_node,
1231 metrics::MetricsLogger::default(),
1232 );
1233 diagnostics_assertions::assert_data_tree!(inspector, root: {
1234 input_pipeline: {
1235 supported_input_devices: "Touch, ConsumerControls",
1236 handlers_registered: 1u64,
1237 handlers_healthy: 1u64,
1238 input_devices: {}
1239 }
1240 });
1241 }
1242}