1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_handler::Handler;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::future::LocalBoxFuture;
16use futures::lock::Mutex;
17use futures::{StreamExt, TryStreamExt};
18use itertools::Itertools;
19use metrics_registry::*;
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::rc::Rc;
23use std::sync::atomic::{AtomicU32, Ordering};
24use std::sync::{Arc, LazyLock};
25use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
26
27static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
34
35fn get_next_device_id() -> u32 {
39 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
40}
41
42type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
43
44pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
47
48pub struct InputPipelineAssembly {
62 sender: UnboundedSender<input_device::InputEvent>,
65 receiver: UnboundedReceiver<input_device::InputEvent>,
68
69 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
71
72 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
74
75 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
77
78 metrics_logger: metrics::MetricsLogger,
80}
81
82impl InputPipelineAssembly {
83 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
86 let (sender, receiver) = mpsc::unbounded();
87 InputPipelineAssembly {
88 sender,
89 receiver,
90 handlers: vec![],
91 metrics_logger,
92 display_ownership_fut: None,
93 focus_listener_fut: None,
94 }
95 }
96
97 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
100 self.handlers.push(handler);
101 self
102 }
103
104 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
106 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
107 }
108
109 pub fn add_display_ownership(
110 mut self,
111 display_ownership_event: zx::Event,
112 input_handlers_node: &fuchsia_inspect::Node,
113 ) -> InputPipelineAssembly {
114 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
115 let metrics_logger_clone = self.metrics_logger.clone();
116 let h_clone = h.clone();
117 let sender_clone = self.sender.clone();
118 let display_ownership_fut = Box::pin(async move {
119 h_clone.clone().set_handler_healthy();
120 h_clone.clone()
121 .handle_ownership_change(sender_clone)
122 .await
123 .map_err(|e| {
124 metrics_logger_clone.log_error(
125 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
126 std::format!(
127 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
128 })
129 .unwrap();
130 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
131 });
132 self.display_ownership_fut = Some(display_ownership_fut);
133 self.add_handler(h)
134 }
135
136 fn into_components(
142 self,
143 ) -> (
144 UnboundedSender<input_device::InputEvent>,
145 UnboundedReceiver<input_device::InputEvent>,
146 Vec<Rc<dyn input_handler::BatchInputHandler>>,
147 metrics::MetricsLogger,
148 Option<LocalBoxFuture<'static, ()>>,
149 Option<LocalBoxFuture<'static, ()>>,
150 ) {
151 (
152 self.sender,
153 self.receiver,
154 self.handlers,
155 self.metrics_logger,
156 self.display_ownership_fut,
157 self.focus_listener_fut,
158 )
159 }
160
161 pub fn add_focus_listener(
162 mut self,
163 focus_chain_publisher: FocusChainProviderPublisher,
164 ) -> Self {
165 let metrics_logger_clone = self.metrics_logger.clone();
166 let focus_listener_fut = Box::pin(async move {
167 if let Ok(mut focus_listener) =
168 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
169 log::warn!(
170 "could not create focus listener, focus will not be dispatched: {:?}",
171 e
172 )
173 })
174 {
175 let _result = focus_listener
178 .dispatch_focus_changes()
179 .await
180 .map(|_| {
181 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
182 })
183 .map_err(|e| {
184 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
185 });
186 }
187 });
188 self.focus_listener_fut = Some(focus_listener_fut);
189 self
190 }
191}
192
193pub struct InputPipeline {
221 pipeline_sender: UnboundedSender<input_device::InputEvent>,
225
226 device_event_sender: UnboundedSender<input_device::InputEvent>,
229
230 device_event_receiver: UnboundedReceiver<input_device::InputEvent>,
232
233 input_device_types: Vec<input_device::InputDeviceType>,
235
236 input_device_bindings: InputDeviceBindingHashMap,
238
239 inspect_node: fuchsia_inspect::Node,
242
243 metrics_logger: metrics::MetricsLogger,
245}
246
247impl InputPipeline {
248 fn new_common(
249 input_device_types: Vec<input_device::InputDeviceType>,
250 assembly: InputPipelineAssembly,
251 inspect_node: fuchsia_inspect::Node,
252 ) -> Self {
253 let (
254 pipeline_sender,
255 receiver,
256 handlers,
257 metrics_logger,
258 display_ownership_fut,
259 focus_listener_fut,
260 ) = assembly.into_components();
261
262 let mut handlers_count = handlers.len();
263 if let Some(fut) = display_ownership_fut {
265 fasync::Task::local(fut).detach();
266 handlers_count += 1;
267 }
268
269 if let Some(fut) = focus_listener_fut {
271 fasync::Task::local(fut).detach();
272 handlers_count += 1;
273 }
274
275 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
277 inspect_node.record_uint("handlers_registered", handlers_count as u64);
278 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
279
280 InputPipeline::run(receiver, handlers);
282
283 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
284 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
285 InputPipeline {
286 pipeline_sender,
287 device_event_sender,
288 device_event_receiver,
289 input_device_types,
290 input_device_bindings,
291 inspect_node,
292 metrics_logger,
293 }
294 }
295
296 pub fn new_for_test(
304 input_device_types: Vec<input_device::InputDeviceType>,
305 assembly: InputPipelineAssembly,
306 ) -> Self {
307 let inspector = fuchsia_inspect::Inspector::default();
308 let root = inspector.root();
309 let test_node = root.create_child("input_pipeline");
310 Self::new_common(input_device_types, assembly, test_node)
311 }
312
313 pub fn new(
320 input_device_types: Vec<input_device::InputDeviceType>,
321 assembly: InputPipelineAssembly,
322 inspect_node: fuchsia_inspect::Node,
323 metrics_logger: metrics::MetricsLogger,
324 ) -> Result<Self, Error> {
325 let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
326 let input_device_types = input_pipeline.input_device_types.clone();
327 let input_event_sender = input_pipeline.device_event_sender.clone();
328 let input_device_bindings = input_pipeline.input_device_bindings.clone();
329 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
330 fasync::Task::local(async move {
331 match async {
334 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
335 input_device::INPUT_REPORT_PATH,
336 fuchsia_fs::PERM_READABLE,
337 )
338 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
339 let device_watcher =
340 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
341 Self::watch_for_devices(
342 device_watcher,
343 dir_proxy,
344 input_device_types,
345 input_event_sender,
346 input_device_bindings,
347 &devices_node,
348 false, metrics_logger.clone(),
350 )
351 .await
352 .context("failed to watch for devices")
353 }
354 .await
355 {
356 Ok(()) => {}
357 Err(err) => {
358 metrics_logger.log_warn(
362 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
363 std::format!(
364 "Input pipeline is unable to watch for new input devices: {:?}",
365 err
366 ));
367 }
368 }
369 })
370 .detach();
371
372 Ok(input_pipeline)
373 }
374
375 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
377 &self.input_device_bindings
378 }
379
380 pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
383 &self.device_event_sender
384 }
385
386 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
388 &self.input_device_types
389 }
390
391 pub async fn handle_input_events(mut self) {
393 let metrics_logger_clone = self.metrics_logger.clone();
394 while let Some(input_event) = self.device_event_receiver.next().await {
395 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
396 metrics_logger_clone.log_error(
397 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
398 std::format!("could not forward event from driver: {:?}", &e));
399 }
400 }
401
402 metrics_logger_clone.log_error(
403 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
404 "Input pipeline stopped handling input events.".to_string(),
405 );
406 }
407
408 async fn watch_for_devices(
424 mut device_watcher: Watcher,
425 dir_proxy: fio::DirectoryProxy,
426 device_types: Vec<input_device::InputDeviceType>,
427 input_event_sender: UnboundedSender<input_device::InputEvent>,
428 bindings: InputDeviceBindingHashMap,
429 input_devices_node: &fuchsia_inspect::Node,
430 break_on_idle: bool,
431 metrics_logger: metrics::MetricsLogger,
432 ) -> Result<(), Error> {
433 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
435 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
436 while let Some(msg) = device_watcher.try_next().await? {
437 if let Ok(filename) = msg.filename.into_os_string().into_string() {
438 if filename == "." {
439 continue;
440 }
441
442 let pathbuf = PathBuf::from(filename.clone());
443 match msg.event {
444 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
445 log::info!("found input device {}", filename);
446 devices_discovered.add(1);
447 let device_proxy =
448 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
449 add_device_bindings(
450 &device_types,
451 &filename,
452 device_proxy,
453 &input_event_sender,
454 &bindings,
455 get_next_device_id(),
456 input_devices_node,
457 Some(&devices_connected),
458 metrics_logger.clone(),
459 )
460 .await;
461 }
462 WatchEvent::IDLE => {
463 if break_on_idle {
464 break;
465 }
466 }
467 _ => (),
468 }
469 }
470 }
471 input_devices_node.record(devices_discovered);
473 input_devices_node.record(devices_connected);
474 Err(format_err!("Input pipeline stopped watching for new input devices."))
475 }
476
477 pub async fn handle_input_device_registry_request_stream(
492 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
493 device_types: &Vec<input_device::InputDeviceType>,
494 input_event_sender: &UnboundedSender<input_device::InputEvent>,
495 bindings: &InputDeviceBindingHashMap,
496 input_devices_node: &fuchsia_inspect::Node,
497 metrics_logger: metrics::MetricsLogger,
498 ) -> Result<(), Error> {
499 while let Some(request) = stream
500 .try_next()
501 .await
502 .context("Error handling input device registry request stream")?
503 {
504 match request {
505 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
506 device,
507 ..
508 } => {
509 let device_proxy = device.into_proxy();
511
512 let device_id = get_next_device_id();
513
514 add_device_bindings(
515 device_types,
516 &format!("input-device-registry-{}", device_id),
517 device_proxy,
518 input_event_sender,
519 bindings,
520 device_id,
521 input_devices_node,
522 None,
523 metrics_logger.clone(),
524 )
525 .await;
526 }
527 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
528 device,
529 responder,
530 .. } => {
531 let device_proxy = device.into_proxy();
533
534 let device_id = get_next_device_id();
535
536 add_device_bindings(
537 device_types,
538 &format!("input-device-registry-{}", device_id),
539 device_proxy,
540 input_event_sender,
541 bindings,
542 device_id,
543 input_devices_node,
544 None,
545 metrics_logger.clone(),
546 )
547 .await;
548
549 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
550 device_id: Some(device_id),
551 ..Default::default()
552 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
553 }
554 }
555 }
556
557 Ok(())
558 }
559
560 fn run(
562 mut receiver: UnboundedReceiver<input_device::InputEvent>,
563 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
564 ) {
565 fasync::Task::local(async move {
566 for handler in &handlers {
567 handler.clone().set_handler_healthy();
568 }
569
570 use input_device::InputEventType;
571 use std::collections::HashMap;
572
573 let mut handlers_by_type: HashMap<
575 InputEventType,
576 Vec<Rc<dyn input_handler::BatchInputHandler>>,
577 > = HashMap::new();
578
579 let event_types = vec![
581 InputEventType::Keyboard,
582 InputEventType::LightSensor,
583 InputEventType::ConsumerControls,
584 InputEventType::Mouse,
585 InputEventType::TouchScreen,
586 InputEventType::Touchpad,
587 #[cfg(test)]
588 InputEventType::Fake,
589 ];
590
591 for event_type in event_types {
592 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
593 .iter()
594 .filter(|h| h.interest().contains(&event_type))
595 .cloned()
596 .collect();
597 handlers_by_type.insert(event_type, handlers_for_type);
598 }
599
600 while let Some(event) = receiver.next().await {
601 let event_type = InputEventType::from(&event.device_event);
602
603 let handlers = handlers_by_type.get(&event_type).unwrap();
605
606 let mut events = vec![event];
607 for handler in handlers {
608 let handler_name = handler.get_name();
609 events = {
610 let _async_trace = fuchsia_trace::async_enter!(
611 fuchsia_trace::Id::random(),
612 "input",
613 "handle_input_events",
614 "name" => handler_name
615 );
616 handler.clone().handle_input_events(events).await
617 };
618 }
619
620 for event in events {
621 if event.handled == input_device::Handled::No {
622 log::warn!("unhandled input event: {:?}", &event);
623 }
624 }
625 }
626 for handler in &handlers {
627 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
628 }
629 panic!("Runner task is not supposed to terminate.")
630 })
631 .detach();
632 }
633}
634
635async fn add_device_bindings(
655 device_types: &Vec<input_device::InputDeviceType>,
656 filename: &String,
657 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
658 input_event_sender: &UnboundedSender<input_device::InputEvent>,
659 bindings: &InputDeviceBindingHashMap,
660 device_id: u32,
661 input_devices_node: &fuchsia_inspect::Node,
662 devices_connected: Option<&fuchsia_inspect::UintProperty>,
663 metrics_logger: metrics::MetricsLogger,
664) {
665 let mut matched_device_types = vec![];
666 if let Ok(descriptor) = device_proxy.get_descriptor().await {
667 for device_type in device_types {
668 if input_device::is_device_type(&descriptor, *device_type).await {
669 matched_device_types.push(device_type);
670 match devices_connected {
671 Some(dev_connected) => {
672 let _ = dev_connected.add(1);
673 }
674 None => (),
675 };
676 }
677 }
678 if matched_device_types.is_empty() {
679 log::info!(
680 "device {} did not match any supported device types: {:?}",
681 filename,
682 device_types
683 );
684 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
685 let mut health = fuchsia_inspect::health::Node::new(&device_node);
686 health.set_unhealthy("Unsupported device type.");
687 device_node.record(health);
688 input_devices_node.record(device_node);
689 return;
690 }
691 } else {
692 metrics_logger.clone().log_error(
693 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
694 std::format!("cannot bind device {} without a device descriptor", filename),
695 );
696 return;
697 }
698
699 log::info!(
700 "binding {} to device types: {}",
701 filename,
702 matched_device_types
703 .iter()
704 .fold(String::new(), |device_types_string, device_type| device_types_string
705 + &format!("{:?}, ", device_type))
706 );
707
708 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
709 for device_type in matched_device_types {
710 let proxy = device_proxy.clone();
731 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
732 match input_device::get_device_binding(
733 *device_type,
734 proxy,
735 device_id,
736 input_event_sender.clone(),
737 device_node,
738 metrics_logger.clone(),
739 )
740 .await
741 {
742 Ok(binding) => new_bindings.push(binding),
743 Err(e) => {
744 metrics_logger.log_error(
745 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
746 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
747 );
748 }
749 }
750 }
751
752 if !new_bindings.is_empty() {
753 let mut bindings = bindings.lock().await;
754 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
755 }
756}
757
758#[cfg(test)]
759mod tests {
760 use super::*;
761 use crate::input_device::{InputDeviceBinding, InputEventType};
762 use crate::utils::Position;
763 use crate::{
764 fake_input_device_binding, mouse_binding, mouse_model_database,
765 observe_fake_events_input_handler,
766 };
767 use async_trait::async_trait;
768 use diagnostics_assertions::AnyProperty;
769 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
770 use fuchsia_async as fasync;
771 use futures::FutureExt;
772 use pretty_assertions::assert_eq;
773 use rand::Rng;
774 use std::collections::HashSet;
775 use vfs::{pseudo_directory, service as pseudo_fs_service};
776
777 const COUNTS_PER_MM: u32 = 12;
778
779 fn send_input_event(
784 sender: UnboundedSender<input_device::InputEvent>,
785 ) -> input_device::InputEvent {
786 let mut rng = rand::rng();
787 let offset =
788 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
789 let input_event = input_device::InputEvent {
790 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
791 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
792 millimeters: Position {
793 x: offset.x / COUNTS_PER_MM as f32,
794 y: offset.y / COUNTS_PER_MM as f32,
795 },
796 }),
797 None, None, mouse_binding::MousePhase::Move,
800 HashSet::new(),
801 HashSet::new(),
802 None, None, )),
805 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
806 mouse_binding::MouseDeviceDescriptor {
807 device_id: 1,
808 absolute_x_range: None,
809 absolute_y_range: None,
810 wheel_v_range: None,
811 wheel_h_range: None,
812 buttons: None,
813 counts_per_mm: COUNTS_PER_MM,
814 },
815 ),
816 event_time: zx::MonotonicInstant::get(),
817 handled: input_device::Handled::No,
818 trace_id: None,
819 };
820 match sender.unbounded_send(input_event.clone()) {
821 Err(_) => assert!(false),
822 _ => {}
823 }
824
825 input_event
826 }
827
828 fn handle_input_device_request(
833 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
834 ) {
835 match input_device_request {
836 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
837 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
838 device_information: None,
839 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
840 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
841 movement_x: None,
842 movement_y: None,
843 scroll_v: None,
844 scroll_h: None,
845 buttons: Some(vec![0]),
846 position_x: None,
847 position_y: None,
848 ..Default::default()
849 }),
850 ..Default::default()
851 }),
852 sensor: None,
853 touch: None,
854 keyboard: None,
855 consumer_control: None,
856 ..Default::default()
857 });
858 }
859 _ => {}
860 }
861 }
862
863 #[fasync::run_singlethreaded(test)]
865 async fn multiple_devices_single_handler() {
866 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
868 let first_device_binding =
869 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
870 let second_device_binding =
871 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
872
873 let (handler_event_sender, mut handler_event_receiver) =
875 futures::channel::mpsc::channel(100);
876 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
877 handler_event_sender,
878 );
879
880 let (sender, receiver, handlers, _, _, _) =
882 InputPipelineAssembly::new(metrics::MetricsLogger::default())
883 .add_handler(input_handler)
884 .into_components();
885 let inspector = fuchsia_inspect::Inspector::default();
886 let test_node = inspector.root().create_child("input_pipeline");
887 let input_pipeline = InputPipeline {
888 pipeline_sender: sender,
889 device_event_sender,
890 device_event_receiver,
891 input_device_types: vec![],
892 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
893 inspect_node: test_node,
894 metrics_logger: metrics::MetricsLogger::default(),
895 };
896 InputPipeline::run(receiver, handlers);
897
898 let first_device_event = send_input_event(first_device_binding.input_event_sender());
900 let second_device_event = send_input_event(second_device_binding.input_event_sender());
901
902 fasync::Task::local(async {
904 input_pipeline.handle_input_events().await;
905 })
906 .detach();
907
908 let first_handled_event = handler_event_receiver.next().await;
910 assert_eq!(first_handled_event, Some(first_device_event));
911
912 let second_handled_event = handler_event_receiver.next().await;
913 assert_eq!(second_handled_event, Some(second_device_event));
914 }
915
916 #[fasync::run_singlethreaded(test)]
918 async fn single_device_multiple_handlers() {
919 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
921 let input_device_binding =
922 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
923
924 let (first_handler_event_sender, mut first_handler_event_receiver) =
926 futures::channel::mpsc::channel(100);
927 let first_input_handler =
928 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
929 first_handler_event_sender,
930 );
931 let (second_handler_event_sender, mut second_handler_event_receiver) =
932 futures::channel::mpsc::channel(100);
933 let second_input_handler =
934 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
935 second_handler_event_sender,
936 );
937
938 let (sender, receiver, handlers, _, _, _) =
940 InputPipelineAssembly::new(metrics::MetricsLogger::default())
941 .add_handler(first_input_handler)
942 .add_handler(second_input_handler)
943 .into_components();
944 let inspector = fuchsia_inspect::Inspector::default();
945 let test_node = inspector.root().create_child("input_pipeline");
946 let input_pipeline = InputPipeline {
947 pipeline_sender: sender,
948 device_event_sender,
949 device_event_receiver,
950 input_device_types: vec![],
951 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
952 inspect_node: test_node,
953 metrics_logger: metrics::MetricsLogger::default(),
954 };
955 InputPipeline::run(receiver, handlers);
956
957 let input_event = send_input_event(input_device_binding.input_event_sender());
959
960 fasync::Task::local(async {
962 input_pipeline.handle_input_events().await;
963 })
964 .detach();
965
966 let first_handler_event = first_handler_event_receiver.next().await;
968 assert_eq!(first_handler_event, Some(input_event.clone()));
969 let second_handler_event = second_handler_event_receiver.next().await;
970 assert_eq!(second_handler_event, Some(input_event));
971 }
972
973 #[fasync::run_singlethreaded(test)]
976 async fn watch_devices_one_match_exists() {
977 let mut count: i8 = 0;
979 let dir = pseudo_directory! {
980 "file_name" => pseudo_fs_service::host(
981 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
982 async move {
983 while count < 3 {
984 if let Some(input_device_request) =
985 request_stream.try_next().await.unwrap()
986 {
987 handle_input_device_request(input_device_request);
988 count += 1;
989 }
990 }
991
992 }.boxed()
993 },
994 )
995 };
996
997 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
999 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1000 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1003
1004 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1005 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1006 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1007
1008 let inspector = fuchsia_inspect::Inspector::default();
1009 let test_node = inspector.root().create_child("input_pipeline");
1010 test_node.record_string(
1011 "supported_input_devices",
1012 supported_device_types.clone().iter().join(", "),
1013 );
1014 let input_devices = test_node.create_child("input_devices");
1015 diagnostics_assertions::assert_data_tree!(inspector, root: {
1017 input_pipeline: {
1018 supported_input_devices: "Mouse",
1019 input_devices: {}
1020 }
1021 });
1022
1023 let _ = InputPipeline::watch_for_devices(
1024 device_watcher,
1025 dir_proxy_for_pipeline,
1026 supported_device_types,
1027 input_event_sender,
1028 bindings.clone(),
1029 &input_devices,
1030 true, metrics::MetricsLogger::default(),
1032 )
1033 .await;
1034
1035 let bindings_hashmap = bindings.lock().await;
1037 assert_eq!(bindings_hashmap.len(), 1);
1038 let bindings_vector = bindings_hashmap.get(&10);
1039 assert!(bindings_vector.is_some());
1040 assert_eq!(bindings_vector.unwrap().len(), 1);
1041 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1042 assert!(boxed_mouse_binding.is_some());
1043 assert_eq!(
1044 boxed_mouse_binding.unwrap().get_device_descriptor(),
1045 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1046 device_id: 10,
1047 absolute_x_range: None,
1048 absolute_y_range: None,
1049 wheel_v_range: None,
1050 wheel_h_range: None,
1051 buttons: Some(vec![0]),
1052 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1053 })
1054 );
1055
1056 diagnostics_assertions::assert_data_tree!(inspector, root: {
1058 input_pipeline: {
1059 supported_input_devices: "Mouse",
1060 input_devices: {
1061 devices_discovered: 1u64,
1062 devices_connected: 1u64,
1063 "file_name_Mouse": contains {
1064 reports_received_count: 0u64,
1065 reports_filtered_count: 0u64,
1066 events_generated: 0u64,
1067 last_received_timestamp_ns: 0u64,
1068 last_generated_timestamp_ns: 0u64,
1069 "fuchsia.inspect.Health": {
1070 status: "OK",
1071 start_timestamp_nanos: AnyProperty
1074 },
1075 }
1076 }
1077 }
1078 });
1079 }
1080
1081 #[fasync::run_singlethreaded(test)]
1084 async fn watch_devices_no_matches_exist() {
1085 let mut count: i8 = 0;
1087 let dir = pseudo_directory! {
1088 "file_name" => pseudo_fs_service::host(
1089 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1090 async move {
1091 while count < 1 {
1092 if let Some(input_device_request) =
1093 request_stream.try_next().await.unwrap()
1094 {
1095 handle_input_device_request(input_device_request);
1096 count += 1;
1097 }
1098 }
1099
1100 }.boxed()
1101 },
1102 )
1103 };
1104
1105 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1107 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1108 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1111
1112 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1113 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1114 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1115
1116 let inspector = fuchsia_inspect::Inspector::default();
1117 let test_node = inspector.root().create_child("input_pipeline");
1118 test_node.record_string(
1119 "supported_input_devices",
1120 supported_device_types.clone().iter().join(", "),
1121 );
1122 let input_devices = test_node.create_child("input_devices");
1123 diagnostics_assertions::assert_data_tree!(inspector, root: {
1125 input_pipeline: {
1126 supported_input_devices: "Keyboard",
1127 input_devices: {}
1128 }
1129 });
1130
1131 let _ = InputPipeline::watch_for_devices(
1132 device_watcher,
1133 dir_proxy_for_pipeline,
1134 supported_device_types,
1135 input_event_sender,
1136 bindings.clone(),
1137 &input_devices,
1138 true, metrics::MetricsLogger::default(),
1140 )
1141 .await;
1142
1143 let bindings = bindings.lock().await;
1145 assert_eq!(bindings.len(), 0);
1146
1147 diagnostics_assertions::assert_data_tree!(inspector, root: {
1149 input_pipeline: {
1150 supported_input_devices: "Keyboard",
1151 input_devices: {
1152 devices_discovered: 1u64,
1153 devices_connected: 0u64,
1154 "file_name_Unsupported": {
1155 "fuchsia.inspect.Health": {
1156 status: "UNHEALTHY",
1157 message: "Unsupported device type.",
1158 start_timestamp_nanos: AnyProperty
1161 },
1162 }
1163 }
1164 }
1165 });
1166 }
1167
1168 #[fasync::run_singlethreaded(test)]
1171 async fn handle_input_device_registry_request_stream() {
1172 let (input_device_registry_proxy, input_device_registry_request_stream) =
1173 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1174 let (input_device_client_end, mut input_device_request_stream) =
1175 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1176
1177 let device_types = vec![input_device::InputDeviceType::Mouse];
1178 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1179 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1180
1181 let mut count: i8 = 0;
1183 fasync::Task::local(async move {
1184 let _ = input_device_registry_proxy.register(input_device_client_end);
1186
1187 while count < 3 {
1188 if let Some(input_device_request) =
1189 input_device_request_stream.try_next().await.unwrap()
1190 {
1191 handle_input_device_request(input_device_request);
1192 count += 1;
1193 }
1194 }
1195
1196 input_device_registry_proxy.take_event_stream();
1198 })
1199 .detach();
1200
1201 let inspector = fuchsia_inspect::Inspector::default();
1202 let test_node = inspector.root().create_child("input_pipeline");
1203
1204 let bindings_clone = bindings.clone();
1206 let _ = InputPipeline::handle_input_device_registry_request_stream(
1207 input_device_registry_request_stream,
1208 &device_types,
1209 &input_event_sender,
1210 &bindings_clone,
1211 &test_node,
1212 metrics::MetricsLogger::default(),
1213 )
1214 .await;
1215
1216 let bindings = bindings.lock().await;
1218 assert_eq!(bindings.len(), 1);
1219 }
1220
1221 #[fasync::run_singlethreaded(test)]
1223 async fn check_inspect_node_has_correct_properties() {
1224 let device_types = vec![
1225 input_device::InputDeviceType::Touch,
1226 input_device::InputDeviceType::ConsumerControls,
1227 ];
1228 let inspector = fuchsia_inspect::Inspector::default();
1229 let test_node = inspector.root().create_child("input_pipeline");
1230 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1232 futures::channel::mpsc::channel(100);
1233 let fake_input_handler =
1234 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1235 fake_handler_event_sender,
1236 );
1237 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1238 .add_handler(fake_input_handler);
1239 let _test_input_pipeline = InputPipeline::new(
1240 device_types,
1241 assembly,
1242 test_node,
1243 metrics::MetricsLogger::default(),
1244 );
1245 diagnostics_assertions::assert_data_tree!(inspector, root: {
1246 input_pipeline: {
1247 supported_input_devices: "Touch, ConsumerControls",
1248 handlers_registered: 1u64,
1249 handlers_healthy: 1u64,
1250 input_devices: {}
1251 }
1252 });
1253 }
1254
1255 struct SpecificInterestFakeHandler {
1256 interest_types: Vec<input_device::InputEventType>,
1257 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1258 }
1259
1260 impl SpecificInterestFakeHandler {
1261 pub fn new(
1262 interest_types: Vec<input_device::InputEventType>,
1263 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1264 ) -> Rc<Self> {
1265 Rc::new(SpecificInterestFakeHandler {
1266 interest_types,
1267 event_sender: std::cell::RefCell::new(event_sender),
1268 })
1269 }
1270 }
1271
1272 impl Handler for SpecificInterestFakeHandler {
1273 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1274 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1275 fn get_name(&self) -> &'static str {
1276 "SpecificInterestFakeHandler"
1277 }
1278
1279 fn interest(&self) -> Vec<input_device::InputEventType> {
1280 self.interest_types.clone()
1281 }
1282 }
1283
1284 #[async_trait(?Send)]
1285 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1286 async fn handle_input_event(
1287 self: Rc<Self>,
1288 input_event: input_device::InputEvent,
1289 ) -> Vec<input_device::InputEvent> {
1290 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1291 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1292 Ok(_) => {}
1293 }
1294 vec![input_event]
1295 }
1296 }
1297
1298 #[fasync::run_singlethreaded(test)]
1299 async fn run_only_sends_events_to_interested_handlers() {
1300 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1302 let mouse_handler =
1303 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1304
1305 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1307 let fake_handler =
1308 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1309
1310 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1311 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1312 .add_handler(mouse_handler)
1313 .add_handler(fake_handler)
1314 .into_components();
1315
1316 InputPipeline::run(pipeline_receiver, handlers);
1318
1319 let fake_event = input_device::InputEvent {
1321 device_event: input_device::InputDeviceEvent::Fake,
1322 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1323 event_time: zx::MonotonicInstant::get(),
1324 handled: input_device::Handled::No,
1325 trace_id: None,
1326 };
1327
1328 pipeline_sender.unbounded_send(fake_event.clone()).expect("failed to send event");
1330
1331 let received_by_fake = fake_receiver.next().await;
1333 assert_eq!(received_by_fake, Some(fake_event));
1334
1335 assert!(mouse_receiver.try_next().is_err());
1337 }
1338}