1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use focus_chain_provider::FocusChainProviderPublisher;
12use fuchsia_fs::directory::{WatchEvent, Watcher};
13use fuchsia_inspect::NumericProperty;
14use fuchsia_inspect::health::Reporter;
15use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
16use futures::future::LocalBoxFuture;
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use itertools::Itertools;
20use metrics_registry::*;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::rc::Rc;
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, LazyLock};
26use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
27
28static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
35
36fn get_next_device_id() -> u32 {
40 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
41}
42
43type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
44
45pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
48
49pub struct InputPipelineAssembly {
63 sender: UnboundedSender<Vec<input_device::InputEvent>>,
66 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
69
70 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
72
73 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
75
76 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
78
79 metrics_logger: metrics::MetricsLogger,
81}
82
83impl InputPipelineAssembly {
84 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
87 let (sender, receiver) = mpsc::unbounded();
88 InputPipelineAssembly {
89 sender,
90 receiver,
91 handlers: vec![],
92 metrics_logger,
93 display_ownership_fut: None,
94 focus_listener_fut: None,
95 }
96 }
97
98 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
101 self.handlers.push(handler);
102 self
103 }
104
105 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
107 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
108 }
109
110 pub fn add_display_ownership(
111 mut self,
112 display_ownership_event: zx::Event,
113 input_handlers_node: &fuchsia_inspect::Node,
114 ) -> InputPipelineAssembly {
115 let h = DisplayOwnership::new(
116 display_ownership_event,
117 input_handlers_node,
118 self.metrics_logger.clone(),
119 );
120 let metrics_logger_clone = self.metrics_logger.clone();
121 let h_clone = h.clone();
122 let sender_clone = self.sender.clone();
123 let display_ownership_fut = Box::pin(async move {
124 h_clone.clone().set_handler_healthy();
125 h_clone.clone()
126 .handle_ownership_change(sender_clone)
127 .await
128 .map_err(|e| {
129 metrics_logger_clone.log_error(
130 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
131 std::format!(
132 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
133 })
134 .unwrap();
135 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
136 });
137 self.display_ownership_fut = Some(display_ownership_fut);
138 self.add_handler(h)
139 }
140
141 fn into_components(
147 self,
148 ) -> (
149 UnboundedSender<Vec<input_device::InputEvent>>,
150 UnboundedReceiver<Vec<input_device::InputEvent>>,
151 Vec<Rc<dyn input_handler::BatchInputHandler>>,
152 metrics::MetricsLogger,
153 Option<LocalBoxFuture<'static, ()>>,
154 Option<LocalBoxFuture<'static, ()>>,
155 ) {
156 (
157 self.sender,
158 self.receiver,
159 self.handlers,
160 self.metrics_logger,
161 self.display_ownership_fut,
162 self.focus_listener_fut,
163 )
164 }
165
166 pub fn add_focus_listener(
167 mut self,
168 focus_chain_publisher: FocusChainProviderPublisher,
169 ) -> Self {
170 let metrics_logger_clone = self.metrics_logger.clone();
171 let focus_listener_fut = Box::pin(async move {
172 if let Ok(mut focus_listener) =
173 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
174 log::warn!(
175 "could not create focus listener, focus will not be dispatched: {:?}",
176 e
177 )
178 })
179 {
180 let _result = focus_listener
183 .dispatch_focus_changes()
184 .await
185 .map(|_| {
186 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
187 })
188 .map_err(|e| {
189 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
190 });
191 }
192 });
193 self.focus_listener_fut = Some(focus_listener_fut);
194 self
195 }
196}
197
198pub struct InputPipeline {
226 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
230
231 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
234
235 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
237
238 input_device_types: Vec<input_device::InputDeviceType>,
240
241 input_device_bindings: InputDeviceBindingHashMap,
243
244 inspect_node: fuchsia_inspect::Node,
247
248 metrics_logger: metrics::MetricsLogger,
250
251 pub feature_flags: input_device::InputPipelineFeatureFlags,
253}
254
255impl InputPipeline {
256 fn new_common(
257 input_device_types: Vec<input_device::InputDeviceType>,
258 assembly: InputPipelineAssembly,
259 inspect_node: fuchsia_inspect::Node,
260 feature_flags: input_device::InputPipelineFeatureFlags,
261 ) -> Self {
262 let (
263 pipeline_sender,
264 receiver,
265 handlers,
266 metrics_logger,
267 display_ownership_fut,
268 focus_listener_fut,
269 ) = assembly.into_components();
270
271 let mut handlers_count = handlers.len();
272 if let Some(fut) = display_ownership_fut {
274 fasync::Task::local(fut).detach();
275 handlers_count += 1;
276 }
277
278 if let Some(fut) = focus_listener_fut {
280 fasync::Task::local(fut).detach();
281 handlers_count += 1;
282 }
283
284 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
286 inspect_node.record_uint("handlers_registered", handlers_count as u64);
287 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
288
289 InputPipeline::run(receiver, handlers, metrics_logger.clone());
291
292 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
293 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
294 InputPipeline {
295 pipeline_sender,
296 device_event_sender,
297 device_event_receiver,
298 input_device_types,
299 input_device_bindings,
300 inspect_node,
301 metrics_logger,
302 feature_flags,
303 }
304 }
305
306 pub fn new_for_test(
314 input_device_types: Vec<input_device::InputDeviceType>,
315 assembly: InputPipelineAssembly,
316 ) -> Self {
317 let inspector = fuchsia_inspect::Inspector::default();
318 let root = inspector.root();
319 let test_node = root.create_child("input_pipeline");
320 Self::new_common(
321 input_device_types,
322 assembly,
323 test_node,
324 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
325 )
326 }
327
328 pub fn new(
335 input_device_types: Vec<input_device::InputDeviceType>,
336 assembly: InputPipelineAssembly,
337 inspect_node: fuchsia_inspect::Node,
338 feature_flags: input_device::InputPipelineFeatureFlags,
339 metrics_logger: metrics::MetricsLogger,
340 ) -> Result<Self, Error> {
341 let input_pipeline =
342 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
343 let input_device_types = input_pipeline.input_device_types.clone();
344 let input_event_sender = input_pipeline.device_event_sender.clone();
345 let input_device_bindings = input_pipeline.input_device_bindings.clone();
346 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
347 let feature_flags = input_pipeline.feature_flags.clone();
348 fasync::Task::local(async move {
349 match async {
352 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
353 input_device::INPUT_REPORT_PATH,
354 fuchsia_fs::PERM_READABLE,
355 )
356 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
357 let device_watcher =
358 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
359 Self::watch_for_devices(
360 device_watcher,
361 dir_proxy,
362 input_device_types,
363 input_event_sender,
364 input_device_bindings,
365 &devices_node,
366 false, feature_flags,
368 metrics_logger.clone(),
369 )
370 .await
371 .context("failed to watch for devices")
372 }
373 .await
374 {
375 Ok(()) => {}
376 Err(err) => {
377 metrics_logger.log_warn(
381 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
382 std::format!(
383 "Input pipeline is unable to watch for new input devices: {:?}",
384 err
385 ));
386 }
387 }
388 })
389 .detach();
390
391 Ok(input_pipeline)
392 }
393
394 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
396 &self.input_device_bindings
397 }
398
399 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
402 &self.device_event_sender
403 }
404
405 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
407 &self.input_device_types
408 }
409
410 pub async fn handle_input_events(mut self) {
412 let metrics_logger_clone = self.metrics_logger.clone();
413 while let Some(input_event) = self.device_event_receiver.next().await {
414 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
415 metrics_logger_clone.log_error(
416 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
417 std::format!("could not forward event from driver: {:?}", &e));
418 }
419 }
420
421 metrics_logger_clone.log_error(
422 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
423 "Input pipeline stopped handling input events.".to_string(),
424 );
425 }
426
427 async fn watch_for_devices(
443 mut device_watcher: Watcher,
444 dir_proxy: fio::DirectoryProxy,
445 device_types: Vec<input_device::InputDeviceType>,
446 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
447 bindings: InputDeviceBindingHashMap,
448 input_devices_node: &fuchsia_inspect::Node,
449 break_on_idle: bool,
450 feature_flags: input_device::InputPipelineFeatureFlags,
451 metrics_logger: metrics::MetricsLogger,
452 ) -> Result<(), Error> {
453 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
455 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
456 while let Some(msg) = device_watcher.try_next().await? {
457 if let Ok(filename) = msg.filename.into_os_string().into_string() {
458 if filename == "." {
459 continue;
460 }
461
462 let pathbuf = PathBuf::from(filename.clone());
463 match msg.event {
464 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
465 log::info!("found input device {}", filename);
466 devices_discovered.add(1);
467 let device_proxy =
468 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
469 add_device_bindings(
470 &device_types,
471 &filename,
472 device_proxy,
473 &input_event_sender,
474 &bindings,
475 get_next_device_id(),
476 input_devices_node,
477 Some(&devices_connected),
478 feature_flags.clone(),
479 metrics_logger.clone(),
480 )
481 .await;
482 }
483 WatchEvent::IDLE => {
484 if break_on_idle {
485 break;
486 }
487 }
488 _ => (),
489 }
490 }
491 }
492 input_devices_node.record(devices_discovered);
494 input_devices_node.record(devices_connected);
495 Err(format_err!("Input pipeline stopped watching for new input devices."))
496 }
497
498 pub async fn handle_input_device_registry_request_stream(
513 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
514 device_types: &Vec<input_device::InputDeviceType>,
515 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
516 bindings: &InputDeviceBindingHashMap,
517 input_devices_node: &fuchsia_inspect::Node,
518 feature_flags: input_device::InputPipelineFeatureFlags,
519 metrics_logger: metrics::MetricsLogger,
520 ) -> Result<(), Error> {
521 while let Some(request) = stream
522 .try_next()
523 .await
524 .context("Error handling input device registry request stream")?
525 {
526 match request {
527 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
528 device,
529 ..
530 } => {
531 let device_proxy = device.into_proxy();
533
534 let device_id = get_next_device_id();
535
536 add_device_bindings(
537 device_types,
538 &format!("input-device-registry-{}", device_id),
539 device_proxy,
540 input_event_sender,
541 bindings,
542 device_id,
543 input_devices_node,
544 None,
545 feature_flags.clone(),
546 metrics_logger.clone(),
547 )
548 .await;
549 }
550 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
551 device,
552 responder,
553 .. } => {
554 let device_proxy = device.into_proxy();
556
557 let device_id = get_next_device_id();
558
559 add_device_bindings(
560 device_types,
561 &format!("input-device-registry-{}", device_id),
562 device_proxy,
563 input_event_sender,
564 bindings,
565 device_id,
566 input_devices_node,
567 None,
568 feature_flags.clone(),
569 metrics_logger.clone(),
570 )
571 .await;
572
573 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
574 device_id: Some(device_id),
575 ..Default::default()
576 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
577 }
578 }
579 }
580
581 Ok(())
582 }
583
584 fn run(
586 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
587 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
588 metrics_logger: metrics::MetricsLogger,
589 ) {
590 fasync::Task::local(async move {
591 for handler in &handlers {
592 handler.clone().set_handler_healthy();
593 }
594
595 use input_device::InputEventType;
596 use std::collections::HashMap;
597
598 let mut handlers_by_type: HashMap<
600 InputEventType,
601 Vec<Rc<dyn input_handler::BatchInputHandler>>,
602 > = HashMap::new();
603
604 let event_types = vec![
606 InputEventType::Keyboard,
607 InputEventType::LightSensor,
608 InputEventType::ConsumerControls,
609 InputEventType::Mouse,
610 InputEventType::TouchScreen,
611 InputEventType::Touchpad,
612 #[cfg(test)]
613 InputEventType::Fake,
614 ];
615
616 for event_type in event_types {
617 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
618 .iter()
619 .filter(|h| h.interest().contains(&event_type))
620 .cloned()
621 .collect();
622 handlers_by_type.insert(event_type, handlers_for_type);
623 }
624
625 while let Some(events) = receiver.next().await {
626 if events.is_empty() {
627 continue;
628 }
629
630 let mut groups_seen = 0;
631 for (event_type, event_group) in events
632 .into_iter()
633 .chunk_by(|e| InputEventType::from(&e.device_event))
634 .into_iter()
635 {
636 groups_seen += 1;
637 if groups_seen == 2 {
638 metrics_logger.log_error(
639 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
640 "it is not recommended to contain multiple types of events in 1 send".to_string(),
641 );
642 }
643 let mut events_in_group: Vec<_> = event_group.collect();
644
645 let handlers = handlers_by_type.get(&event_type).unwrap();
647
648 for handler in handlers {
649 events_in_group =
650 handler.clone().handle_input_events(events_in_group).await;
651 }
652
653 for event in events_in_group {
654 if event.handled == input_device::Handled::No {
655 log::warn!("unhandled input event: {:?}", &event);
656 }
657 if let Some(trace_id) = event.trace_id {
658 fuchsia_trace::flow_end!(
659 "input",
660 "event_in_input_pipeline",
661 trace_id.into()
662 );
663 }
664 }
665 }
666 }
667 for handler in &handlers {
668 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
669 }
670 panic!("Runner task is not supposed to terminate.")
671 })
672 .detach();
673 }
674}
675
676async fn add_device_bindings(
696 device_types: &Vec<input_device::InputDeviceType>,
697 filename: &String,
698 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
699 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
700 bindings: &InputDeviceBindingHashMap,
701 device_id: u32,
702 input_devices_node: &fuchsia_inspect::Node,
703 devices_connected: Option<&fuchsia_inspect::UintProperty>,
704 feature_flags: InputPipelineFeatureFlags,
705 metrics_logger: metrics::MetricsLogger,
706) {
707 let mut matched_device_types = vec![];
708 if let Ok(descriptor) = device_proxy.get_descriptor().await {
709 for device_type in device_types {
710 if input_device::is_device_type(&descriptor, *device_type).await {
711 matched_device_types.push(device_type);
712 match devices_connected {
713 Some(dev_connected) => {
714 let _ = dev_connected.add(1);
715 }
716 None => (),
717 };
718 }
719 }
720 if matched_device_types.is_empty() {
721 log::info!(
722 "device {} did not match any supported device types: {:?}",
723 filename,
724 device_types
725 );
726 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
727 let mut health = fuchsia_inspect::health::Node::new(&device_node);
728 health.set_unhealthy("Unsupported device type.");
729 device_node.record(health);
730 input_devices_node.record(device_node);
731 return;
732 }
733 } else {
734 metrics_logger.clone().log_error(
735 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
736 std::format!("cannot bind device {} without a device descriptor", filename),
737 );
738 return;
739 }
740
741 log::info!(
742 "binding {} to device types: {}",
743 filename,
744 matched_device_types
745 .iter()
746 .fold(String::new(), |device_types_string, device_type| device_types_string
747 + &format!("{:?}, ", device_type))
748 );
749
750 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
751 for device_type in matched_device_types {
752 let proxy = device_proxy.clone();
773 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
774 match input_device::get_device_binding(
775 *device_type,
776 proxy,
777 device_id,
778 input_event_sender.clone(),
779 device_node,
780 feature_flags.clone(),
781 metrics_logger.clone(),
782 )
783 .await
784 {
785 Ok(binding) => new_bindings.push(binding),
786 Err(e) => {
787 metrics_logger.log_error(
788 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
789 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
790 );
791 }
792 }
793 }
794
795 if !new_bindings.is_empty() {
796 let mut bindings = bindings.lock().await;
797 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
798 }
799}
800
801#[cfg(test)]
802mod tests {
803 use super::*;
804 use crate::input_device::{InputDeviceBinding, InputEventType};
805 use crate::utils::Position;
806 use crate::{
807 fake_input_device_binding, mouse_binding, mouse_model_database,
808 observe_fake_events_input_handler,
809 };
810 use async_trait::async_trait;
811 use diagnostics_assertions::AnyProperty;
812 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
813 use fuchsia_async as fasync;
814 use futures::FutureExt;
815 use pretty_assertions::assert_eq;
816 use rand::Rng;
817 use std::collections::HashSet;
818 use vfs::{pseudo_directory, service as pseudo_fs_service};
819
820 const COUNTS_PER_MM: u32 = 12;
821
822 fn send_input_event(
827 sender: UnboundedSender<Vec<input_device::InputEvent>>,
828 ) -> Vec<input_device::InputEvent> {
829 let mut rng = rand::rng();
830 let offset =
831 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
832 let input_event = input_device::InputEvent {
833 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
834 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
835 millimeters: Position {
836 x: offset.x / COUNTS_PER_MM as f32,
837 y: offset.y / COUNTS_PER_MM as f32,
838 },
839 }),
840 None, None, mouse_binding::MousePhase::Move,
843 HashSet::new(),
844 HashSet::new(),
845 None, None, )),
848 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
849 mouse_binding::MouseDeviceDescriptor {
850 device_id: 1,
851 absolute_x_range: None,
852 absolute_y_range: None,
853 wheel_v_range: None,
854 wheel_h_range: None,
855 buttons: None,
856 counts_per_mm: COUNTS_PER_MM,
857 },
858 ),
859 event_time: zx::MonotonicInstant::get(),
860 handled: input_device::Handled::No,
861 trace_id: None,
862 };
863 match sender.unbounded_send(vec![input_event.clone()]) {
864 Err(_) => assert!(false),
865 _ => {}
866 }
867
868 vec![input_event]
869 }
870
871 fn handle_input_device_request(
876 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
877 ) {
878 match input_device_request {
879 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
880 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
881 device_information: None,
882 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
883 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
884 movement_x: None,
885 movement_y: None,
886 scroll_v: None,
887 scroll_h: None,
888 buttons: Some(vec![0]),
889 position_x: None,
890 position_y: None,
891 ..Default::default()
892 }),
893 ..Default::default()
894 }),
895 sensor: None,
896 touch: None,
897 keyboard: None,
898 consumer_control: None,
899 ..Default::default()
900 });
901 }
902 _ => {}
903 }
904 }
905
906 #[fasync::run_singlethreaded(test)]
908 async fn multiple_devices_single_handler() {
909 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
911 let first_device_binding =
912 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
913 let second_device_binding =
914 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
915
916 let (handler_event_sender, mut handler_event_receiver) =
918 futures::channel::mpsc::channel(100);
919 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
920 handler_event_sender,
921 );
922
923 let (sender, receiver, handlers, _, _, _) =
925 InputPipelineAssembly::new(metrics::MetricsLogger::default())
926 .add_handler(input_handler)
927 .into_components();
928 let inspector = fuchsia_inspect::Inspector::default();
929 let test_node = inspector.root().create_child("input_pipeline");
930 let input_pipeline = InputPipeline {
931 pipeline_sender: sender,
932 device_event_sender,
933 device_event_receiver,
934 input_device_types: vec![],
935 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
936 inspect_node: test_node,
937 metrics_logger: metrics::MetricsLogger::default(),
938 feature_flags: input_device::InputPipelineFeatureFlags::default(),
939 };
940 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
941
942 let first_device_events = send_input_event(first_device_binding.input_event_sender());
944 let second_device_events = send_input_event(second_device_binding.input_event_sender());
945
946 fasync::Task::local(async {
948 input_pipeline.handle_input_events().await;
949 })
950 .detach();
951
952 let first_handled_event = handler_event_receiver.next().await;
954 assert_eq!(first_handled_event, first_device_events.into_iter().next());
955
956 let second_handled_event = handler_event_receiver.next().await;
957 assert_eq!(second_handled_event, second_device_events.into_iter().next());
958 }
959
960 #[fasync::run_singlethreaded(test)]
962 async fn single_device_multiple_handlers() {
963 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
965 let input_device_binding =
966 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
967
968 let (first_handler_event_sender, mut first_handler_event_receiver) =
970 futures::channel::mpsc::channel(100);
971 let first_input_handler =
972 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
973 first_handler_event_sender,
974 );
975 let (second_handler_event_sender, mut second_handler_event_receiver) =
976 futures::channel::mpsc::channel(100);
977 let second_input_handler =
978 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
979 second_handler_event_sender,
980 );
981
982 let (sender, receiver, handlers, _, _, _) =
984 InputPipelineAssembly::new(metrics::MetricsLogger::default())
985 .add_handler(first_input_handler)
986 .add_handler(second_input_handler)
987 .into_components();
988 let inspector = fuchsia_inspect::Inspector::default();
989 let test_node = inspector.root().create_child("input_pipeline");
990 let input_pipeline = InputPipeline {
991 pipeline_sender: sender,
992 device_event_sender,
993 device_event_receiver,
994 input_device_types: vec![],
995 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
996 inspect_node: test_node,
997 metrics_logger: metrics::MetricsLogger::default(),
998 feature_flags: input_device::InputPipelineFeatureFlags::default(),
999 };
1000 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1001
1002 let input_events = send_input_event(input_device_binding.input_event_sender());
1004
1005 fasync::Task::local(async {
1007 input_pipeline.handle_input_events().await;
1008 })
1009 .detach();
1010
1011 let expected_event = input_events.into_iter().next();
1013 let first_handler_event = first_handler_event_receiver.next().await;
1014 assert_eq!(first_handler_event, expected_event);
1015 let second_handler_event = second_handler_event_receiver.next().await;
1016 assert_eq!(second_handler_event, expected_event);
1017 }
1018
1019 #[fasync::run_singlethreaded(test)]
1022 async fn watch_devices_one_match_exists() {
1023 let mut count: i8 = 0;
1025 let dir = pseudo_directory! {
1026 "file_name" => pseudo_fs_service::host(
1027 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1028 async move {
1029 while count < 3 {
1030 if let Some(input_device_request) =
1031 request_stream.try_next().await.unwrap()
1032 {
1033 handle_input_device_request(input_device_request);
1034 count += 1;
1035 }
1036 }
1037
1038 }.boxed()
1039 },
1040 )
1041 };
1042
1043 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1045 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1046 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1049
1050 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1051 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1052 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1053
1054 let inspector = fuchsia_inspect::Inspector::default();
1055 let test_node = inspector.root().create_child("input_pipeline");
1056 test_node.record_string(
1057 "supported_input_devices",
1058 supported_device_types.clone().iter().join(", "),
1059 );
1060 let input_devices = test_node.create_child("input_devices");
1061 diagnostics_assertions::assert_data_tree!(inspector, root: {
1063 input_pipeline: {
1064 supported_input_devices: "Mouse",
1065 input_devices: {}
1066 }
1067 });
1068
1069 let _ = InputPipeline::watch_for_devices(
1070 device_watcher,
1071 dir_proxy_for_pipeline,
1072 supported_device_types,
1073 input_event_sender,
1074 bindings.clone(),
1075 &input_devices,
1076 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1078 metrics::MetricsLogger::default(),
1079 )
1080 .await;
1081
1082 let bindings_hashmap = bindings.lock().await;
1084 assert_eq!(bindings_hashmap.len(), 1);
1085 let bindings_vector = bindings_hashmap.get(&10);
1086 assert!(bindings_vector.is_some());
1087 assert_eq!(bindings_vector.unwrap().len(), 1);
1088 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1089 assert!(boxed_mouse_binding.is_some());
1090 assert_eq!(
1091 boxed_mouse_binding.unwrap().get_device_descriptor(),
1092 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1093 device_id: 10,
1094 absolute_x_range: None,
1095 absolute_y_range: None,
1096 wheel_v_range: None,
1097 wheel_h_range: None,
1098 buttons: Some(vec![0]),
1099 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1100 })
1101 );
1102
1103 diagnostics_assertions::assert_data_tree!(inspector, root: {
1105 input_pipeline: {
1106 supported_input_devices: "Mouse",
1107 input_devices: {
1108 devices_discovered: 1u64,
1109 devices_connected: 1u64,
1110 "file_name_Mouse": contains {
1111 reports_received_count: 0u64,
1112 reports_filtered_count: 0u64,
1113 events_generated: 0u64,
1114 last_received_timestamp_ns: 0u64,
1115 last_generated_timestamp_ns: 0u64,
1116 "fuchsia.inspect.Health": {
1117 status: "OK",
1118 start_timestamp_nanos: AnyProperty
1121 },
1122 }
1123 }
1124 }
1125 });
1126 }
1127
1128 #[fasync::run_singlethreaded(test)]
1131 async fn watch_devices_no_matches_exist() {
1132 let mut count: i8 = 0;
1134 let dir = pseudo_directory! {
1135 "file_name" => pseudo_fs_service::host(
1136 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1137 async move {
1138 while count < 1 {
1139 if let Some(input_device_request) =
1140 request_stream.try_next().await.unwrap()
1141 {
1142 handle_input_device_request(input_device_request);
1143 count += 1;
1144 }
1145 }
1146
1147 }.boxed()
1148 },
1149 )
1150 };
1151
1152 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1154 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1155 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1158
1159 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1160 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1161 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1162
1163 let inspector = fuchsia_inspect::Inspector::default();
1164 let test_node = inspector.root().create_child("input_pipeline");
1165 test_node.record_string(
1166 "supported_input_devices",
1167 supported_device_types.clone().iter().join(", "),
1168 );
1169 let input_devices = test_node.create_child("input_devices");
1170 diagnostics_assertions::assert_data_tree!(inspector, root: {
1172 input_pipeline: {
1173 supported_input_devices: "Keyboard",
1174 input_devices: {}
1175 }
1176 });
1177
1178 let _ = InputPipeline::watch_for_devices(
1179 device_watcher,
1180 dir_proxy_for_pipeline,
1181 supported_device_types,
1182 input_event_sender,
1183 bindings.clone(),
1184 &input_devices,
1185 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1187 metrics::MetricsLogger::default(),
1188 )
1189 .await;
1190
1191 let bindings = bindings.lock().await;
1193 assert_eq!(bindings.len(), 0);
1194
1195 diagnostics_assertions::assert_data_tree!(inspector, root: {
1197 input_pipeline: {
1198 supported_input_devices: "Keyboard",
1199 input_devices: {
1200 devices_discovered: 1u64,
1201 devices_connected: 0u64,
1202 "file_name_Unsupported": {
1203 "fuchsia.inspect.Health": {
1204 status: "UNHEALTHY",
1205 message: "Unsupported device type.",
1206 start_timestamp_nanos: AnyProperty
1209 },
1210 }
1211 }
1212 }
1213 });
1214 }
1215
1216 #[fasync::run_singlethreaded(test)]
1219 async fn handle_input_device_registry_request_stream() {
1220 let (input_device_registry_proxy, input_device_registry_request_stream) =
1221 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1222 let (input_device_client_end, mut input_device_request_stream) =
1223 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1224
1225 let device_types = vec![input_device::InputDeviceType::Mouse];
1226 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1227 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1228
1229 let mut count: i8 = 0;
1231 fasync::Task::local(async move {
1232 let _ = input_device_registry_proxy.register(input_device_client_end);
1234
1235 while count < 3 {
1236 if let Some(input_device_request) =
1237 input_device_request_stream.try_next().await.unwrap()
1238 {
1239 handle_input_device_request(input_device_request);
1240 count += 1;
1241 }
1242 }
1243
1244 input_device_registry_proxy.take_event_stream();
1246 })
1247 .detach();
1248
1249 let inspector = fuchsia_inspect::Inspector::default();
1250 let test_node = inspector.root().create_child("input_pipeline");
1251
1252 let bindings_clone = bindings.clone();
1254 let _ = InputPipeline::handle_input_device_registry_request_stream(
1255 input_device_registry_request_stream,
1256 &device_types,
1257 &input_event_sender,
1258 &bindings_clone,
1259 &test_node,
1260 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1261 metrics::MetricsLogger::default(),
1262 )
1263 .await;
1264
1265 let bindings = bindings.lock().await;
1267 assert_eq!(bindings.len(), 1);
1268 }
1269
1270 #[fasync::run_singlethreaded(test)]
1272 async fn check_inspect_node_has_correct_properties() {
1273 let device_types = vec![
1274 input_device::InputDeviceType::Touch,
1275 input_device::InputDeviceType::ConsumerControls,
1276 ];
1277 let inspector = fuchsia_inspect::Inspector::default();
1278 let test_node = inspector.root().create_child("input_pipeline");
1279 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1281 futures::channel::mpsc::channel(100);
1282 let fake_input_handler =
1283 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1284 fake_handler_event_sender,
1285 );
1286 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1287 .add_handler(fake_input_handler);
1288 let _test_input_pipeline = InputPipeline::new(
1289 device_types,
1290 assembly,
1291 test_node,
1292 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1293 metrics::MetricsLogger::default(),
1294 );
1295 diagnostics_assertions::assert_data_tree!(inspector, root: {
1296 input_pipeline: {
1297 supported_input_devices: "Touch, ConsumerControls",
1298 handlers_registered: 1u64,
1299 handlers_healthy: 1u64,
1300 input_devices: {}
1301 }
1302 });
1303 }
1304
1305 struct SpecificInterestFakeHandler {
1306 interest_types: Vec<input_device::InputEventType>,
1307 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1308 }
1309
1310 impl SpecificInterestFakeHandler {
1311 pub fn new(
1312 interest_types: Vec<input_device::InputEventType>,
1313 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1314 ) -> Rc<Self> {
1315 Rc::new(SpecificInterestFakeHandler {
1316 interest_types,
1317 event_sender: std::cell::RefCell::new(event_sender),
1318 })
1319 }
1320 }
1321
1322 impl Handler for SpecificInterestFakeHandler {
1323 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1324 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1325 fn get_name(&self) -> &'static str {
1326 "SpecificInterestFakeHandler"
1327 }
1328
1329 fn interest(&self) -> Vec<input_device::InputEventType> {
1330 self.interest_types.clone()
1331 }
1332 }
1333
1334 #[async_trait(?Send)]
1335 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1336 async fn handle_input_event(
1337 self: Rc<Self>,
1338 input_event: input_device::InputEvent,
1339 ) -> Vec<input_device::InputEvent> {
1340 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1341 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1342 Ok(_) => {}
1343 }
1344 vec![input_event]
1345 }
1346 }
1347
1348 #[fasync::run_singlethreaded(test)]
1349 async fn run_only_sends_events_to_interested_handlers() {
1350 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1352 let mouse_handler =
1353 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1354
1355 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1357 let fake_handler =
1358 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1359
1360 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1361 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1362 .add_handler(mouse_handler)
1363 .add_handler(fake_handler)
1364 .into_components();
1365
1366 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1368
1369 let fake_event = input_device::InputEvent {
1371 device_event: input_device::InputDeviceEvent::Fake,
1372 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1373 event_time: zx::MonotonicInstant::get(),
1374 handled: input_device::Handled::No,
1375 trace_id: None,
1376 };
1377
1378 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1380
1381 let received_by_fake = fake_receiver.next().await;
1383 assert_eq!(received_by_fake, Some(fake_event));
1384
1385 assert!(mouse_receiver.try_next().is_err());
1387 }
1388
1389 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1390 input_device::InputEvent {
1391 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1392 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1393 millimeters: Position { x, y },
1394 }),
1395 None,
1396 None,
1397 mouse_binding::MousePhase::Move,
1398 HashSet::new(),
1399 HashSet::new(),
1400 None,
1401 None,
1402 )),
1403 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1404 mouse_binding::MouseDeviceDescriptor {
1405 device_id: 1,
1406 absolute_x_range: None,
1407 absolute_y_range: None,
1408 wheel_v_range: None,
1409 wheel_h_range: None,
1410 buttons: None,
1411 counts_per_mm: 1,
1412 },
1413 ),
1414 event_time: zx::MonotonicInstant::get(),
1415 handled: input_device::Handled::No,
1416 trace_id: None,
1417 }
1418 }
1419
1420 #[fasync::run_singlethreaded(test)]
1421 async fn run_mixed_event_types_dispatched_correctly() {
1422 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1424 let mouse_handler =
1425 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1426
1427 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1429 let fake_handler =
1430 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1431
1432 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1433 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1434 .add_handler(mouse_handler)
1435 .add_handler(fake_handler)
1436 .into_components();
1437
1438 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1440
1441 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1443 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1444 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1445
1446 let fake_event_1 = input_device::InputEvent {
1447 device_event: input_device::InputDeviceEvent::Fake,
1448 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1449 event_time: zx::MonotonicInstant::get(),
1450 handled: input_device::Handled::No,
1451 trace_id: None,
1452 };
1453
1454 let mixed_batch = vec![
1457 mouse_event_1.clone(),
1458 mouse_event_2.clone(),
1459 fake_event_1.clone(),
1460 mouse_event_3.clone(),
1461 ];
1462 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1463
1464 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1466 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1467 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1468
1469 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1471 }
1472}