1use crate::autorepeater::Autorepeater;
6use crate::display_ownership::DisplayOwnership;
7use crate::focus_listener::FocusListener;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{format_err, Context, Error};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::health::Reporter;
13use fuchsia_inspect::NumericProperty;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::lock::Mutex;
16use futures::{StreamExt, TryStreamExt};
17use itertools::Itertools;
18use metrics_registry::*;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::atomic::{AtomicU32, Ordering};
23use std::sync::{Arc, LazyLock};
24use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
25
/// Process-wide counter backing [`get_next_device_id`]. The first ID handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns the current value of the global device ID counter and advances the counter by one.
///
/// The increment is a sequentially consistent atomic `fetch_add`, so concurrent callers never
/// observe the same ID.
fn get_next_device_id() -> u32 {
    let id_counter: &AtomicU32 = &NEXT_DEVICE_ID;
    id_counter.fetch_add(1, Ordering::SeqCst)
}
40
/// An owned, dynamically dispatched input device binding.
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// A shared map, guarded by an async mutex, from a device ID to the bindings that were
/// registered under that ID.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
46
/// A chain of input pipeline stages under construction.
///
/// Each `add_*` method consumes the assembly and returns a new one; stages that process events
/// are connected to the previous stage through an unbounded channel.
pub struct InputPipelineAssembly {
    /// Sending end of the channel that feeds the first stage of the chain.
    sender: UnboundedSender<input_device::InputEvent>,
    /// Receiving end of the channel that carries the output of the last stage added so far.
    receiver: UnboundedReceiver<input_device::InputEvent>,
    /// The tasks created by the `add_*` methods, in the order they were added.
    tasks: Vec<fuchsia_async::Task<()>>,

    /// Logger used to report errors encountered by the stages.
    metrics_logger: metrics::MetricsLogger,
}
76
77impl InputPipelineAssembly {
78 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
81 let (sender, receiver) = mpsc::unbounded();
82 let tasks = vec![];
83 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
84 }
85
86 pub fn add_handler(self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
90 let (sender, mut receiver, mut tasks, metrics_logger) = self.into_components();
91 let metrics_logger_clone = metrics_logger.clone();
92 let (next_sender, next_receiver) = mpsc::unbounded();
93 let handler_name = handler.get_name();
94 tasks.push(fasync::Task::local(async move {
95 handler.clone().set_handler_healthy();
96 while let Some(event) = receiver.next().await {
97 let out_events = {
101 let _async_trace = fuchsia_trace::async_enter!(
102 fuchsia_trace::Id::random(),
103 c"input",
104 c"handle_input_event",
105 "name" => handler_name
106 );
107 handler.clone().handle_input_event(event).await
108 };
109 for out_event in out_events.into_iter() {
110 if let Err(e) = next_sender.unbounded_send(out_event) {
111 metrics_logger_clone.log_error(
112 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEvent,
113 std::format!(
114 "could not forward event output from handler: {:?}: {:?}",
115 handler_name,
116 e));
117 break;
119 }
120 }
121 }
122 handler.clone().set_handler_unhealthy(std::format!("Receive loop terminated for handler: {:?}", handler_name).as_str());
123 panic!("receive loop is not supposed to terminate for handler: {:?}", handler_name);
124 }));
125 receiver = next_receiver;
126 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
127 }
128
129 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
131 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
132 }
133
134 pub fn add_display_ownership(
140 self,
141 display_ownership_event: zx::Event,
142 input_handlers_node: &fuchsia_inspect::Node,
143 ) -> InputPipelineAssembly {
144 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
145 let (autorepeat_sender, receiver) = mpsc::unbounded();
146 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
147 let metrics_logger_clone = metrics_logger.clone();
148 tasks.push(fasync::Task::local(async move {
149 h.clone().set_handler_healthy();
150 h.clone().handle_input_events(autorepeat_receiver, autorepeat_sender)
151 .await
152 .map_err(|e| {
153 metrics_logger_clone.log_error(
154 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
155 std::format!(
156 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
157 }).unwrap();
158 h.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
159 }));
160 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
161 }
162
163 pub fn add_autorepeater(self, input_handlers_node: &fuchsia_inspect::Node) -> Self {
167 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
168 let (autorepeat_sender, receiver) = mpsc::unbounded();
169 let metrics_logger_clone = metrics_logger.clone();
170 let a = Autorepeater::new(autorepeat_receiver, input_handlers_node, metrics_logger.clone());
171 tasks.push(fasync::Task::local(async move {
172 a.clone().set_handler_healthy();
173 a.clone()
174 .run(autorepeat_sender)
175 .await
176 .map_err(|e| {
177 metrics_logger_clone.log_error(
178 InputPipelineErrorMetricDimensionEvent::InputPipelineAutorepeatRunningError,
179 std::format!("error while running autorepeater: {:?}", e),
180 );
181 })
182 .expect("autorepeater should never error out");
183 a.set_handler_unhealthy("Receive loop terminated for handler: Autorepeater");
184 }));
185 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
186 }
187
188 fn into_components(
194 self,
195 ) -> (
196 UnboundedSender<input_device::InputEvent>,
197 UnboundedReceiver<input_device::InputEvent>,
198 Vec<fuchsia_async::Task<()>>,
199 metrics::MetricsLogger,
200 ) {
201 (self.sender, self.receiver, self.tasks, self.metrics_logger)
202 }
203
204 pub fn add_focus_listener(self, focus_chain_publisher: FocusChainProviderPublisher) -> Self {
217 let (sender, receiver, mut tasks, metrics_logger) = self.into_components();
218 let metrics_logger_clone = metrics_logger.clone();
219 tasks.push(fasync::Task::local(async move {
220 if let Ok(mut focus_listener) =
221 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
222 log::warn!(
223 "could not create focus listener, focus will not be dispatched: {:?}",
224 e
225 )
226 })
227 {
228 let _result = focus_listener
231 .dispatch_focus_changes()
232 .await
233 .map(|_| {
234 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
235 })
236 .map_err(|e| {
237 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
238 });
239 }
240 }));
241 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
242 }
243}
244
/// A running input pipeline: device events sent on `device_event_sender` are forwarded by
/// [`InputPipeline::handle_input_events`] into the chain of stages built by an
/// [`InputPipelineAssembly`].
pub struct InputPipeline {
    /// Sending end of the assembly's channel; events sent here enter the first stage.
    pipeline_sender: UnboundedSender<input_device::InputEvent>,

    /// Sending end of the device event channel, exposed through
    /// [`InputPipeline::input_event_sender`] and cloned for device bindings.
    device_event_sender: UnboundedSender<input_device::InputEvent>,

    /// Receiving end of the device event channel, drained by
    /// [`InputPipeline::handle_input_events`].
    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,

    /// The device types this pipeline was constructed with.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The device bindings created so far, keyed by device ID.
    input_device_bindings: InputDeviceBindingHashMap,

    /// Inspect node under which this pipeline records its properties.
    inspect_node: fuchsia_inspect::Node,

    /// Logger used to report errors encountered by the pipeline.
    metrics_logger: metrics::MetricsLogger,
}
298
299impl InputPipeline {
300 fn new_common(
303 input_device_types: Vec<input_device::InputDeviceType>,
304 assembly: InputPipelineAssembly,
305 inspect_node: fuchsia_inspect::Node,
306 ) -> Self {
307 let (pipeline_sender, receiver, tasks, metrics_logger) = assembly.into_components();
308
309 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
311 inspect_node.record_uint("handlers_registered", tasks.len() as u64);
312 inspect_node.record_uint("handlers_healthy", tasks.len() as u64);
313
314 InputPipeline::catch_unhandled(receiver);
317
318 InputPipeline::run(tasks);
320
321 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
322 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
323 InputPipeline {
324 pipeline_sender,
325 device_event_sender,
326 device_event_receiver,
327 input_device_types,
328 input_device_bindings,
329 inspect_node,
330 metrics_logger,
331 }
332 }
333
334 pub fn new_for_test(
342 input_device_types: Vec<input_device::InputDeviceType>,
343 assembly: InputPipelineAssembly,
344 ) -> Self {
345 let inspector = fuchsia_inspect::Inspector::default();
346 let root = inspector.root();
347 let test_node = root.create_child("input_pipeline");
348 Self::new_common(input_device_types, assembly, test_node)
349 }
350
351 pub fn new(
358 input_device_types: Vec<input_device::InputDeviceType>,
359 assembly: InputPipelineAssembly,
360 inspect_node: fuchsia_inspect::Node,
361 metrics_logger: metrics::MetricsLogger,
362 ) -> Result<Self, Error> {
363 let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
364 let input_device_types = input_pipeline.input_device_types.clone();
365 let input_event_sender = input_pipeline.device_event_sender.clone();
366 let input_device_bindings = input_pipeline.input_device_bindings.clone();
367 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
368 fasync::Task::local(async move {
369 match async {
372 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
373 input_device::INPUT_REPORT_PATH,
374 fuchsia_fs::PERM_READABLE,
375 )
376 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
377 let device_watcher =
378 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
379 Self::watch_for_devices(
380 device_watcher,
381 dir_proxy,
382 input_device_types,
383 input_event_sender,
384 input_device_bindings,
385 &devices_node,
386 false, metrics_logger.clone(),
388 )
389 .await
390 .context("failed to watch for devices")
391 }
392 .await
393 {
394 Ok(()) => {}
395 Err(err) => {
396 metrics_logger.log_warn(
400 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
401 std::format!(
402 "Input pipeline is unable to watch for new input devices: {:?}",
403 err
404 ));
405 }
406 }
407 })
408 .detach();
409
410 Ok(input_pipeline)
411 }
412
    /// Returns a reference to the shared map of device bindings, keyed by device ID.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
417
    /// Returns a reference to the sending end of the device event channel. Events sent on it
    /// are read by [`InputPipeline::handle_input_events`].
    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
        &self.device_event_sender
    }
423
    /// Returns a reference to the device types this pipeline was constructed with.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
428
429 pub async fn handle_input_events(mut self) {
431 let metrics_logger_clone = self.metrics_logger.clone();
432 while let Some(input_event) = self.device_event_receiver.next().await {
433 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
434 metrics_logger_clone.log_error(
435 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
436 std::format!("could not forward event from driver: {:?}", &e));
437 }
438 }
439
440 metrics_logger_clone.log_error(
441 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
442 "Input pipeline stopped handling input events.".to_string(),
443 );
444 }
445
446 async fn watch_for_devices(
462 mut device_watcher: Watcher,
463 dir_proxy: fio::DirectoryProxy,
464 device_types: Vec<input_device::InputDeviceType>,
465 input_event_sender: UnboundedSender<input_device::InputEvent>,
466 bindings: InputDeviceBindingHashMap,
467 input_devices_node: &fuchsia_inspect::Node,
468 break_on_idle: bool,
469 metrics_logger: metrics::MetricsLogger,
470 ) -> Result<(), Error> {
471 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
473 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
474 while let Some(msg) = device_watcher.try_next().await? {
475 if let Ok(filename) = msg.filename.into_os_string().into_string() {
476 if filename == "." {
477 continue;
478 }
479
480 let pathbuf = PathBuf::from(filename.clone());
481 match msg.event {
482 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
483 log::info!("found input device {}", filename);
484 devices_discovered.add(1);
485 let device_proxy =
486 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
487 add_device_bindings(
488 &device_types,
489 &filename,
490 device_proxy,
491 &input_event_sender,
492 &bindings,
493 get_next_device_id(),
494 input_devices_node,
495 Some(&devices_connected),
496 metrics_logger.clone(),
497 )
498 .await;
499 }
500 WatchEvent::IDLE => {
501 if break_on_idle {
502 break;
503 }
504 }
505 _ => (),
506 }
507 }
508 }
509 input_devices_node.record(devices_discovered);
511 input_devices_node.record(devices_connected);
512 Err(format_err!("Input pipeline stopped watching for new input devices."))
513 }
514
515 pub async fn handle_input_device_registry_request_stream(
530 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531 device_types: &Vec<input_device::InputDeviceType>,
532 input_event_sender: &UnboundedSender<input_device::InputEvent>,
533 bindings: &InputDeviceBindingHashMap,
534 input_devices_node: &fuchsia_inspect::Node,
535 metrics_logger: metrics::MetricsLogger,
536 ) -> Result<(), Error> {
537 while let Some(request) = stream
538 .try_next()
539 .await
540 .context("Error handling input device registry request stream")?
541 {
542 match request {
543 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
544 device,
545 ..
546 } => {
547 let device_proxy = device.into_proxy();
549
550 let device_id = get_next_device_id();
551
552 add_device_bindings(
553 device_types,
554 &format!("input-device-registry-{}", device_id),
555 device_proxy,
556 input_event_sender,
557 bindings,
558 device_id,
559 input_devices_node,
560 None,
561 metrics_logger.clone(),
562 )
563 .await;
564 }
565 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566 device,
567 responder,
568 .. } => {
569 let device_proxy = device.into_proxy();
571
572 let device_id = get_next_device_id();
573
574 add_device_bindings(
575 device_types,
576 &format!("input-device-registry-{}", device_id),
577 device_proxy,
578 input_event_sender,
579 bindings,
580 device_id,
581 input_devices_node,
582 None,
583 metrics_logger.clone(),
584 )
585 .await;
586
587 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
588 device_id: Some(device_id),
589 ..Default::default()
590 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
591 }
592 }
593 }
594
595 Ok(())
596 }
597
598 fn run(tasks: Vec<fuchsia_async::Task<()>>) {
600 fasync::Task::local(async move {
601 futures::future::join_all(tasks).await;
602 panic!("Runner task is not supposed to terminate.")
603 })
604 .detach();
605 }
606
607 fn catch_unhandled(mut receiver: UnboundedReceiver<input_device::InputEvent>) {
610 fasync::Task::local(async move {
611 while let Some(event) = receiver.next().await {
612 if event.handled == input_device::Handled::No {
613 log::warn!("unhandled input event: {:?}", &event);
614 }
615 }
616 panic!("unhandled event catcher is not supposed to terminate.");
617 })
618 .detach();
619 }
620}
621
622async fn add_device_bindings(
642 device_types: &Vec<input_device::InputDeviceType>,
643 filename: &String,
644 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
645 input_event_sender: &UnboundedSender<input_device::InputEvent>,
646 bindings: &InputDeviceBindingHashMap,
647 device_id: u32,
648 input_devices_node: &fuchsia_inspect::Node,
649 devices_connected: Option<&fuchsia_inspect::UintProperty>,
650 metrics_logger: metrics::MetricsLogger,
651) {
652 let mut matched_device_types = vec![];
653 if let Ok(descriptor) = device_proxy.get_descriptor().await {
654 for device_type in device_types {
655 if input_device::is_device_type(&descriptor, *device_type).await {
656 matched_device_types.push(device_type);
657 match devices_connected {
658 Some(dev_connected) => {
659 let _ = dev_connected.add(1);
660 }
661 None => (),
662 };
663 }
664 }
665 if matched_device_types.is_empty() {
666 log::info!(
667 "device {} did not match any supported device types: {:?}",
668 filename,
669 device_types
670 );
671 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
672 let mut health = fuchsia_inspect::health::Node::new(&device_node);
673 health.set_unhealthy("Unsupported device type.");
674 device_node.record(health);
675 input_devices_node.record(device_node);
676 return;
677 }
678 } else {
679 metrics_logger.clone().log_error(
680 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
681 std::format!("cannot bind device {} without a device descriptor", filename),
682 );
683 return;
684 }
685
686 log::info!(
687 "binding {} to device types: {}",
688 filename,
689 matched_device_types
690 .iter()
691 .fold(String::new(), |device_types_string, device_type| device_types_string
692 + &format!("{:?}, ", device_type))
693 );
694
695 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
696 for device_type in matched_device_types {
697 let proxy = device_proxy.clone();
718 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
719 match input_device::get_device_binding(
720 *device_type,
721 proxy,
722 device_id,
723 input_event_sender.clone(),
724 device_node,
725 metrics_logger.clone(),
726 )
727 .await
728 {
729 Ok(binding) => new_bindings.push(binding),
730 Err(e) => {
731 metrics_logger.log_error(
732 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
733 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
734 );
735 }
736 }
737 }
738
739 if !new_bindings.is_empty() {
740 let mut bindings = bindings.lock().await;
741 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748 use crate::input_device::InputDeviceBinding;
749 use crate::utils::Position;
750 use crate::{
751 fake_input_device_binding, mouse_binding, mouse_model_database,
752 observe_fake_events_input_handler,
753 };
754 use diagnostics_assertions::AnyProperty;
755 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
756 use fuchsia_async as fasync;
757 use futures::FutureExt;
758 use pretty_assertions::assert_eq;
759 use rand::Rng;
760 use std::collections::HashSet;
761 use vfs::{pseudo_directory, service as pseudo_fs_service};
762
    /// Counts-per-millimeter value used by `send_input_event`, both in the fake mouse
    /// descriptor and to scale the random offset into millimeters.
    const COUNTS_PER_MM: u32 = 12;
764
765 fn send_input_event(
770 sender: UnboundedSender<input_device::InputEvent>,
771 ) -> input_device::InputEvent {
772 let mut rng = rand::rng();
773 let offset =
774 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
775 let input_event = input_device::InputEvent {
776 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
777 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
778 millimeters: Position {
779 x: offset.x / COUNTS_PER_MM as f32,
780 y: offset.y / COUNTS_PER_MM as f32,
781 },
782 }),
783 None, None, mouse_binding::MousePhase::Move,
786 HashSet::new(),
787 HashSet::new(),
788 None, )),
790 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
791 mouse_binding::MouseDeviceDescriptor {
792 device_id: 1,
793 absolute_x_range: None,
794 absolute_y_range: None,
795 wheel_v_range: None,
796 wheel_h_range: None,
797 buttons: None,
798 counts_per_mm: COUNTS_PER_MM,
799 },
800 ),
801 event_time: zx::MonotonicInstant::get(),
802 handled: input_device::Handled::No,
803 trace_id: None,
804 };
805 match sender.unbounded_send(input_event.clone()) {
806 Err(_) => assert!(false),
807 _ => {}
808 }
809
810 input_event
811 }
812
813 fn handle_input_device_request(
818 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
819 ) {
820 match input_device_request {
821 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
822 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
823 device_information: None,
824 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
825 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
826 movement_x: None,
827 movement_y: None,
828 scroll_v: None,
829 scroll_h: None,
830 buttons: Some(vec![0]),
831 position_x: None,
832 position_y: None,
833 ..Default::default()
834 }),
835 ..Default::default()
836 }),
837 sensor: None,
838 touch: None,
839 keyboard: None,
840 consumer_control: None,
841 ..Default::default()
842 });
843 }
844 _ => {}
845 }
846 }
847
    /// Tests that events sent by two fake device bindings sharing one device event channel
    /// both reach a single registered handler, in the order they were sent.
    #[fasync::run_singlethreaded(test)]
    async fn multiple_devices_single_handler() {
        // Both fake bindings send into the same device event channel.
        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
        let first_device_binding =
            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
        let second_device_binding =
            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());

        // The handler is constructed with a sender; the test reads from the matching receiver.
        let (handler_event_sender, mut handler_event_receiver) =
            futures::channel::mpsc::channel(100);
        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
            handler_event_sender,
        );

        // Build the pipeline by hand from the assembly's components.
        let (sender, receiver, tasks, _) =
            InputPipelineAssembly::new(metrics::MetricsLogger::default())
                .add_handler(input_handler)
                .into_components();
        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        let input_pipeline = InputPipeline {
            pipeline_sender: sender,
            device_event_sender,
            device_event_receiver,
            input_device_types: vec![],
            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
            inspect_node: test_node,
            metrics_logger: metrics::MetricsLogger::default(),
        };
        InputPipeline::catch_unhandled(receiver);
        InputPipeline::run(tasks);

        // Queue one event per device before the pipeline starts forwarding.
        let first_device_event = send_input_event(first_device_binding.input_event_sender());
        let second_device_event = send_input_event(second_device_binding.input_event_sender());

        // Run the pipeline's forwarding loop in the background.
        fasync::Task::local(async {
            input_pipeline.handle_input_events().await;
        })
        .detach();

        // The handler must observe both events, first device first.
        let first_handled_event = handler_event_receiver.next().await;
        assert_eq!(first_handled_event, Some(first_device_event));

        let second_handled_event = handler_event_receiver.next().await;
        assert_eq!(second_handled_event, Some(second_device_event));
    }
901
    /// Tests that an event sent by one fake device binding is observed by both of two
    /// handlers added to the assembly one after the other.
    #[fasync::run_singlethreaded(test)]
    async fn single_device_multiple_handlers() {
        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
        let input_device_binding =
            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());

        // Each handler is constructed with its own sender; the test reads from the matching
        // receivers.
        let (first_handler_event_sender, mut first_handler_event_receiver) =
            futures::channel::mpsc::channel(100);
        let first_input_handler =
            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
                first_handler_event_sender,
            );
        let (second_handler_event_sender, mut second_handler_event_receiver) =
            futures::channel::mpsc::channel(100);
        let second_input_handler =
            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
                second_handler_event_sender,
            );

        // Build the pipeline by hand from the assembly's components.
        let (sender, receiver, tasks, _) =
            InputPipelineAssembly::new(metrics::MetricsLogger::default())
                .add_handler(first_input_handler)
                .add_handler(second_input_handler)
                .into_components();
        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        let input_pipeline = InputPipeline {
            pipeline_sender: sender,
            device_event_sender,
            device_event_receiver,
            input_device_types: vec![],
            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
            inspect_node: test_node,
            metrics_logger: metrics::MetricsLogger::default(),
        };
        InputPipeline::catch_unhandled(receiver);
        InputPipeline::run(tasks);

        // Queue a single event before the pipeline starts forwarding.
        let input_event = send_input_event(input_device_binding.input_event_sender());

        // Run the pipeline's forwarding loop in the background.
        fasync::Task::local(async {
            input_pipeline.handle_input_events().await;
        })
        .detach();

        // Both handlers must observe the same event.
        let first_handler_event = first_handler_event_receiver.next().await;
        assert_eq!(first_handler_event, Some(input_event.clone()));
        let second_handler_event = second_handler_event_receiver.next().await;
        assert_eq!(second_handler_event, Some(input_event));
    }
959
    /// Tests that `watch_for_devices` creates exactly one mouse binding for a directory that
    /// contains a single mouse device, and that the expected inspect data is published.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_one_match_exists() {
        // Pseudo-directory with one entry, `file_name`, that serves the `InputDevice`
        // protocol and answers up to three requests.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        while count < 3 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // One connection for the watcher, a second one for opening the devices.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        let supported_device_types = vec![input_device::InputDeviceType::Mouse];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Before watching, no device data has been published.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {}
            }
        });

        // The result is ignored: `watch_for_devices` returns an error whenever it stops.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true, // break_on_idle
            metrics::MetricsLogger::default(),
        )
        .await;

        // NOTE(review): looking the binding up under ID 10 assumes this is the first device
        // ID allocated in the process — confirm tests do not share the global counter.
        let bindings_hashmap = bindings.lock().await;
        assert_eq!(bindings_hashmap.len(), 1);
        let bindings_vector = bindings_hashmap.get(&10);
        assert!(bindings_vector.is_some());
        assert_eq!(bindings_vector.unwrap().len(), 1);
        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
        assert!(boxed_mouse_binding.is_some());
        assert_eq!(
            boxed_mouse_binding.unwrap().get_device_descriptor(),
            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
                device_id: 10,
                absolute_x_range: None,
                absolute_y_range: None,
                wheel_v_range: None,
                wheel_h_range: None,
                buttons: Some(vec![0]),
                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
            })
        );

        // After watching, the counters and the per-device node must be present.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 1u64,
                    "file_name_Mouse": contains {
                        reports_received_count: 0u64,
                        reports_filtered_count: 0u64,
                        events_generated: 0u64,
                        last_received_timestamp_ns: 0u64,
                        last_generated_timestamp_ns: 0u64,
                        "fuchsia.inspect.Health": {
                            status: "OK",
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1067
    /// Tests that `watch_for_devices` creates no bindings when the only device in the
    /// directory is a mouse but only keyboards are supported, and that the device is
    /// published in inspect as unsupported.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_no_matches_exist() {
        // Pseudo-directory with one entry, `file_name`, that serves the `InputDevice`
        // protocol and answers a single request.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        while count < 1 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // One connection for the watcher, a second one for opening the devices.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Before watching, no device data has been published.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {}
            }
        });

        // The result is ignored: `watch_for_devices` returns an error whenever it stops.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true, // break_on_idle
            metrics::MetricsLogger::default(),
        )
        .await;

        // The mouse must not have been bound.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 0);

        // The device was discovered but not connected, and is reported as unsupported.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 0u64,
                    "file_name_Unsupported": {
                        "fuchsia.inspect.Health": {
                            status: "UNHEALTHY",
                            message: "Unsupported device type.",
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1154
    /// Tests that a device registered through the `InputDeviceRegistry` protocol ends up with
    /// one entry in the bindings map.
    #[fasync::run_singlethreaded(test)]
    async fn handle_input_device_registry_request_stream() {
        let (input_device_registry_proxy, input_device_registry_request_stream) =
            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
        let (input_device_client_end, mut input_device_request_stream) =
            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();

        let device_types = vec![input_device::InputDeviceType::Mouse];
        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));

        // Background task playing both the registry client and the fake input device: it
        // registers the device, then answers up to three device requests.
        let mut count: i8 = 0;
        fasync::Task::local(async move {
            let _ = input_device_registry_proxy.register(input_device_client_end);

            while count < 3 {
                if let Some(input_device_request) =
                    input_device_request_stream.try_next().await.unwrap()
                {
                    handle_input_device_request(input_device_request);
                    count += 1;
                }
            }

            // The proxy is used here, after the loop, so it is owned by this task until
            // the device requests have been served.
            input_device_registry_proxy.take_event_stream();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");

        // Serve the registry until its request stream ends; the result is ignored.
        let bindings_clone = bindings.clone();
        let _ = InputPipeline::handle_input_device_registry_request_stream(
            input_device_registry_request_stream,
            &device_types,
            &input_event_sender,
            &bindings_clone,
            &test_node,
            metrics::MetricsLogger::default(),
        )
        .await;

        // Exactly one device ID must have bindings.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 1);
    }
1207
    /// Tests that `InputPipeline::new` publishes the supported device types, the handler
    /// counts and an empty `input_devices` node in inspect.
    #[fasync::run_singlethreaded(test)]
    async fn check_inspect_node_has_correct_properties() {
        let device_types = vec![
            input_device::InputDeviceType::Touch,
            input_device::InputDeviceType::ConsumerControls,
        ];
        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        // A single fake handler, so both handler counters are expected to be 1.
        let (fake_handler_event_sender, _fake_handler_event_receiver) =
            futures::channel::mpsc::channel(100);
        let fake_input_handler =
            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
                fake_handler_event_sender,
            );
        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
            .add_handler(fake_input_handler);
        // The pipeline is kept alive in a binding while the inspect tree is checked.
        let _test_input_pipeline = InputPipeline::new(
            device_types,
            assembly,
            test_node,
            metrics::MetricsLogger::default(),
        );
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Touch, ConsumerControls",
                handlers_registered: 1u64,
                handlers_healthy: 1u64,
                input_devices: {}
            }
        });
    }
1241}