1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use sorted_vec_map::SortedVecMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
/// Process-wide counter backing [`get_next_device_id`]. The first id handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns a fresh device id and advances the shared counter by one.
///
/// The returned value is the counter's value before the increment, so successive calls
/// yield 10, 11, 12, ...
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49pub type InputDeviceBindingMap = Arc<Mutex<SortedVecMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53pub struct InputPipelineAssembly {
67 sender: UnboundedSender<Vec<input_device::InputEvent>>,
70 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83 metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91 let (sender, receiver) = mpsc::unbounded();
92 InputPipelineAssembly {
93 sender,
94 receiver,
95 handlers: vec![],
96 metrics_logger,
97 display_ownership_fut: None,
98 focus_listener_fut: None,
99 }
100 }
101
102 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105 self.handlers.push(handler);
106 self
107 }
108
109 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112 }
113
114 pub fn add_display_ownership(
115 mut self,
116 display_ownership_event: zx::Event,
117 input_handlers_node: &fuchsia_inspect::Node,
118 ) -> InputPipelineAssembly {
119 let h = DisplayOwnership::new(
120 display_ownership_event,
121 input_handlers_node,
122 self.metrics_logger.clone(),
123 );
124 let metrics_logger_clone = self.metrics_logger.clone();
125 let h_clone = h.clone();
126 let sender_clone = self.sender.clone();
127 let display_ownership_fut = Box::pin(async move {
128 h_clone.clone().set_handler_healthy();
129 h_clone.clone()
130 .handle_ownership_change(sender_clone)
131 .await
132 .map_err(|e| {
133 metrics_logger_clone.log_error(
134 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135 std::format!(
136 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137 })
138 .unwrap();
139 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140 });
141 self.display_ownership_fut = Some(display_ownership_fut);
142 self.add_handler(h)
143 }
144
145 fn into_components(
151 self,
152 ) -> (
153 UnboundedSender<Vec<input_device::InputEvent>>,
154 UnboundedReceiver<Vec<input_device::InputEvent>>,
155 Vec<Rc<dyn input_handler::BatchInputHandler>>,
156 metrics::MetricsLogger,
157 Option<LocalBoxFuture<'static, ()>>,
158 Option<LocalBoxFuture<'static, ()>>,
159 ) {
160 (
161 self.sender,
162 self.receiver,
163 self.handlers,
164 self.metrics_logger,
165 self.display_ownership_fut,
166 self.focus_listener_fut,
167 )
168 }
169
170 pub fn add_focus_listener(
171 mut self,
172 incoming: &Incoming,
173 focus_chain_publisher: FocusChainProviderPublisher,
174 ) -> Self {
175 let metrics_logger_clone = self.metrics_logger.clone();
176 let incoming2 = incoming.clone();
177 let focus_listener_fut = Box::pin(async move {
178 if let Ok(mut focus_listener) = FocusListener::new(
179 &incoming2,
180 focus_chain_publisher,
181 metrics_logger_clone,
182 )
183 .map_err(|e| {
184 log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185 }) {
186 let _result = focus_listener
189 .dispatch_focus_changes()
190 .await
191 .map(|_| {
192 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193 })
194 .map_err(|e| {
195 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196 });
197 }
198 });
199 self.focus_listener_fut = Some(focus_listener_fut);
200 self
201 }
202}
203
204pub struct InputPipeline {
232 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244 input_device_types: Vec<input_device::InputDeviceType>,
246
247 input_device_bindings: InputDeviceBindingMap,
249
250 inspect_node: fuchsia_inspect::Node,
253
254 metrics_logger: metrics::MetricsLogger,
256
257 pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262 fn new_common(
263 input_device_types: Vec<input_device::InputDeviceType>,
264 assembly: InputPipelineAssembly,
265 inspect_node: fuchsia_inspect::Node,
266 feature_flags: input_device::InputPipelineFeatureFlags,
267 ) -> Self {
268 let (
269 pipeline_sender,
270 receiver,
271 handlers,
272 metrics_logger,
273 display_ownership_fut,
274 focus_listener_fut,
275 ) = assembly.into_components();
276
277 let mut handlers_count = handlers.len();
278 if let Some(fut) = display_ownership_fut {
280 fasync::Task::local(fut).detach();
281 handlers_count += 1;
282 }
283
284 if let Some(fut) = focus_listener_fut {
286 fasync::Task::local(fut).detach();
287 handlers_count += 1;
288 }
289
290 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
292 inspect_node.record_uint("handlers_registered", handlers_count as u64);
293 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
294
295 InputPipeline::run(receiver, handlers, metrics_logger.clone());
297
298 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
299 let input_device_bindings: InputDeviceBindingMap =
300 Arc::new(Mutex::new(SortedVecMap::new()));
301 InputPipeline {
302 pipeline_sender,
303 device_event_sender,
304 device_event_receiver,
305 input_device_types,
306 input_device_bindings,
307 inspect_node,
308 metrics_logger,
309 feature_flags,
310 }
311 }
312
313 pub fn new_for_test(
321 input_device_types: Vec<input_device::InputDeviceType>,
322 assembly: InputPipelineAssembly,
323 ) -> Self {
324 let inspector = fuchsia_inspect::Inspector::default();
325 let root = inspector.root();
326 let test_node = root.create_child("input_pipeline");
327 Self::new_common(
328 input_device_types,
329 assembly,
330 test_node,
331 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
332 )
333 }
334
335 pub fn new(
342 incoming: &Incoming,
343 input_device_types: Vec<input_device::InputDeviceType>,
344 assembly: InputPipelineAssembly,
345 inspect_node: fuchsia_inspect::Node,
346 feature_flags: input_device::InputPipelineFeatureFlags,
347 metrics_logger: metrics::MetricsLogger,
348 ) -> Result<Self, Error> {
349 let input_pipeline =
350 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
351 let input_device_types = input_pipeline.input_device_types.clone();
352 let input_event_sender = input_pipeline.device_event_sender.clone();
353 let input_device_bindings = input_pipeline.input_device_bindings.clone();
354 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
355 let feature_flags = input_pipeline.feature_flags.clone();
356 let incoming = incoming.clone();
357 fasync::Task::local(async move {
362 match async {
365 let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
366 incoming.as_ref_directory().open(
367 input_device::INPUT_REPORT_PATH,
368 fio::PERM_READABLE,
369 server.into()
370 )
371 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
372 let device_watcher =
373 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
374 Self::watch_for_devices(
375 device_watcher,
376 dir_proxy,
377 input_device_types,
378 input_event_sender,
379 input_device_bindings,
380 &devices_node,
381 false, feature_flags,
383 metrics_logger.clone(),
384 )
385 .await
386 .context("failed to watch for devices")
387 }
388 .await
389 {
390 Ok(()) => {}
391 Err(err) => {
392 metrics_logger.log_warn(
396 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
397 std::format!(
398 "Input pipeline is unable to watch for new input devices: {:?}",
399 err
400 ));
401 }
402 }
403 }).detach();
404
405 Ok(input_pipeline)
406 }
407
408 pub fn input_device_bindings(&self) -> &InputDeviceBindingMap {
410 &self.input_device_bindings
411 }
412
413 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
416 &self.device_event_sender
417 }
418
419 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
421 &self.input_device_types
422 }
423
424 pub async fn handle_input_events(mut self) {
426 let metrics_logger_clone = self.metrics_logger.clone();
427 while let Some(input_event) = self.device_event_receiver.next().await {
428 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
429 metrics_logger_clone.log_error(
430 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
431 std::format!("could not forward event from driver: {:?}", &e));
432 }
433 }
434
435 metrics_logger_clone.log_error(
436 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
437 "Input pipeline stopped handling input events.".to_string(),
438 );
439 }
440
441 async fn watch_for_devices(
457 mut device_watcher: Watcher,
458 dir_proxy: fio::DirectoryProxy,
459 device_types: Vec<input_device::InputDeviceType>,
460 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
461 bindings: InputDeviceBindingMap,
462 input_devices_node: &fuchsia_inspect::Node,
463 break_on_idle: bool,
464 feature_flags: input_device::InputPipelineFeatureFlags,
465 metrics_logger: metrics::MetricsLogger,
466 ) -> Result<(), Error> {
467 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
469 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
470 while let Some(msg) = device_watcher.try_next().await? {
471 if let Ok(filename) = msg.filename.into_os_string().into_string() {
472 if filename == "." {
473 continue;
474 }
475
476 let pathbuf = PathBuf::from(filename.clone());
477 match msg.event {
478 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
479 log::info!("found input device {}", filename);
480 devices_discovered.add(1);
481 let device_proxy =
482 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
483 add_device_bindings(
484 &device_types,
485 &filename,
486 device_proxy,
487 &input_event_sender,
488 &bindings,
489 get_next_device_id(),
490 input_devices_node,
491 Some(&devices_connected),
492 feature_flags.clone(),
493 metrics_logger.clone(),
494 )
495 .await;
496 }
497 WatchEvent::IDLE => {
498 if break_on_idle {
499 break;
500 }
501 }
502 _ => (),
503 }
504 }
505 }
506 input_devices_node.record(devices_discovered);
508 input_devices_node.record(devices_connected);
509 Err(format_err!("Input pipeline stopped watching for new input devices."))
510 }
511
512 pub async fn handle_input_device_registry_request_stream(
527 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
528 device_types: &Vec<input_device::InputDeviceType>,
529 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
530 bindings: &InputDeviceBindingMap,
531 input_devices_node: &fuchsia_inspect::Node,
532 feature_flags: input_device::InputPipelineFeatureFlags,
533 metrics_logger: metrics::MetricsLogger,
534 ) -> Result<(), Error> {
535 while let Some(request) = stream
536 .try_next()
537 .await
538 .context("Error handling input device registry request stream")?
539 {
540 match request {
541 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
542 device,
543 ..
544 } => {
545 let device = fidl_next::ClientEnd::<
547 fidl_next_fuchsia_input_report::InputDevice,
548 zx::Channel,
549 >::from_untyped(device.into_channel());
550 let device = Dispatcher::client_from_zx_channel(device);
551 let device = device.spawn();
552 let device_id = get_next_device_id();
553
554 add_device_bindings(
555 device_types,
556 &format!("input-device-registry-{}", device_id),
557 device,
558 input_event_sender,
559 bindings,
560 device_id,
561 input_devices_node,
562 None,
563 feature_flags.clone(),
564 metrics_logger.clone(),
565 )
566 .await;
567 }
568 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
569 device,
570 responder,
571 .. } => {
572 let device = fidl_next::ClientEnd::<
574 fidl_next_fuchsia_input_report::InputDevice,
575 zx::Channel,
576 >::from_untyped(device.into_channel());
577 let device = Dispatcher::client_from_zx_channel(device);
578 let device = device.spawn();
579 let device_id = get_next_device_id();
580
581 add_device_bindings(
582 device_types,
583 &format!("input-device-registry-{}", device_id),
584 device,
585 input_event_sender,
586 bindings,
587 device_id,
588 input_devices_node,
589 None,
590 feature_flags.clone(),
591 metrics_logger.clone(),
592 )
593 .await;
594
595 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
596 device_id: Some(device_id),
597 ..Default::default()
598 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
599 }
600 }
601 }
602
603 Ok(())
604 }
605
606 fn run(
608 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
609 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
610 metrics_logger: metrics::MetricsLogger,
611 ) {
612 Dispatcher::spawn_local(async move {
613 for handler in &handlers {
614 handler.clone().set_handler_healthy();
615 }
616
617 use input_device::InputEventType;
618
619 let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
620
621 let event_types = vec![
623 InputEventType::Keyboard,
624 InputEventType::LightSensor,
625 InputEventType::ConsumerControls,
626 InputEventType::Mouse,
627 InputEventType::TouchScreen,
628 InputEventType::Touchpad,
629 #[cfg(test)]
630 InputEventType::Fake,
631 ];
632
633 for event_type in event_types {
634 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
635 .iter()
636 .filter(|h| h.interest().contains(&event_type))
637 .cloned()
638 .collect();
639 handlers_by_type[event_type as usize] = handlers_for_type;
640 }
641
642 while let Some(events) = receiver.next().await {
643 if events.is_empty() {
644 continue;
645 }
646
647 let mut groups_seen = 0;
648 let events = events
649 .into_iter()
650 .chunk_by(|e| InputEventType::from(&e.device_event));
651 let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
652 for (event_type, event_group) in events {
653 groups_seen += 1;
654 if groups_seen == 2 {
655 metrics_logger.log_error(
656 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
657 "it is not recommended to contain multiple types of events in 1 send".to_string(),
658 );
659 }
660 let mut events_in_group = event_group;
661
662 let handlers = &handlers_by_type[event_type as usize];
664
665 for handler in handlers {
666 events_in_group =
667 handler.clone().handle_input_events(events_in_group).await;
668 }
669
670 for event in events_in_group {
671 if event.handled == input_device::Handled::No {
672 log::warn!("unhandled input event: {:?}", &event);
673 }
674 if let Some(trace_id) = event.trace_id {
675 fuchsia_trace::flow_end!(
676 "input",
677 "event_in_input_pipeline",
678 trace_id.into()
679 );
680 }
681 }
682 }
683 }
684 for handler in &handlers {
685 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
686 }
687 panic!("Runner task is not supposed to terminate.")
688 }).detach();
689 }
690}
691
692async fn add_device_bindings(
712 device_types: &Vec<input_device::InputDeviceType>,
713 filename: &String,
714 device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
715 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
716 bindings: &InputDeviceBindingMap,
717 device_id: u32,
718 input_devices_node: &fuchsia_inspect::Node,
719 devices_connected: Option<&fuchsia_inspect::UintProperty>,
720 feature_flags: InputPipelineFeatureFlags,
721 metrics_logger: metrics::MetricsLogger,
722) {
723 let mut matched_device_types = vec![];
724 if let Ok(res) = device_proxy.get_descriptor().await {
725 for device_type in device_types {
726 if input_device::is_device_type(&res.descriptor, *device_type).await {
727 matched_device_types.push(device_type);
728 match devices_connected {
729 Some(dev_connected) => {
730 let _ = dev_connected.add(1);
731 }
732 None => (),
733 };
734 }
735 }
736 if matched_device_types.is_empty() {
737 log::info!(
738 "device {} did not match any supported device types: {:?}",
739 filename,
740 device_types
741 );
742 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
743 let mut health = fuchsia_inspect::health::Node::new(&device_node);
744 health.set_unhealthy("Unsupported device type.");
745 device_node.record(health);
746 input_devices_node.record(device_node);
747 return;
748 }
749 } else {
750 metrics_logger.clone().log_error(
751 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
752 std::format!("cannot bind device {} without a device descriptor", filename),
753 );
754 return;
755 }
756
757 log::info!(
758 "binding {} to device types: {}",
759 filename,
760 matched_device_types
761 .iter()
762 .fold(String::new(), |device_types_string, device_type| device_types_string
763 + &format!("{:?}, ", device_type))
764 );
765
766 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
767 for device_type in matched_device_types {
768 let proxy = device_proxy.clone();
789 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
790 match input_device::get_device_binding(
791 *device_type,
792 proxy,
793 device_id,
794 input_event_sender.clone(),
795 device_node,
796 feature_flags.clone(),
797 metrics_logger.clone(),
798 )
799 .await
800 {
801 Ok(binding) => new_bindings.push(binding),
802 Err(e) => {
803 metrics_logger.log_error(
804 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
805 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
806 );
807 }
808 }
809 }
810
811 if !new_bindings.is_empty() {
812 let mut bindings = bindings.lock().await;
813 if let Some(v) = bindings.get_mut(&device_id) {
814 v.extend(new_bindings);
815 } else {
816 bindings.insert(device_id, new_bindings);
817 }
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use super::*;
824 use crate::input_device::{InputDeviceBinding, InputEventType};
825 use crate::utils::Position;
826 use crate::{
827 fake_input_device_binding, mouse_binding, mouse_model_database,
828 observe_fake_events_input_handler,
829 };
830 use async_trait::async_trait;
831 use diagnostics_assertions::AnyProperty;
832 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
833 use fuchsia_async as fasync;
834 use futures::FutureExt;
835 use pretty_assertions::assert_eq;
836 use rand::Rng;
837 use sorted_vec_map::SortedVecSet;
838 use vfs::{pseudo_directory, service as pseudo_fs_service};
839
840 const COUNTS_PER_MM: u32 = 12;
841
842 fn send_input_event(
847 sender: UnboundedSender<Vec<input_device::InputEvent>>,
848 ) -> Vec<input_device::InputEvent> {
849 let mut rng = rand::rng();
850 let offset =
851 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
852 let input_event = input_device::InputEvent {
853 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
854 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
855 millimeters: Position {
856 x: offset.x / COUNTS_PER_MM as f32,
857 y: offset.y / COUNTS_PER_MM as f32,
858 },
859 }),
860 None, None, mouse_binding::MousePhase::Move,
863 SortedVecSet::new(),
864 SortedVecSet::new(),
865 None, None, )),
868 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
869 mouse_binding::MouseDeviceDescriptor {
870 device_id: 1,
871 absolute_x_range: None,
872 absolute_y_range: None,
873 wheel_v_range: None,
874 wheel_h_range: None,
875 buttons: None,
876 counts_per_mm: COUNTS_PER_MM,
877 },
878 ),
879 event_time: zx::MonotonicInstant::get(),
880 handled: input_device::Handled::No,
881 trace_id: None,
882 };
883 match sender.unbounded_send(vec![input_event.clone()]) {
884 Err(_) => assert!(false),
885 _ => {}
886 }
887
888 vec![input_event]
889 }
890
891 fn handle_input_device_request(
896 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
897 ) {
898 match input_device_request {
899 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
900 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
901 device_information: None,
902 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
903 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
904 movement_x: None,
905 movement_y: None,
906 scroll_v: None,
907 scroll_h: None,
908 buttons: Some(vec![0]),
909 position_x: None,
910 position_y: None,
911 ..Default::default()
912 }),
913 ..Default::default()
914 }),
915 sensor: None,
916 touch: None,
917 keyboard: None,
918 consumer_control: None,
919 ..Default::default()
920 });
921 }
922 _ => {}
923 }
924 }
925
926 #[fasync::run_singlethreaded(test)]
928 async fn multiple_devices_single_handler() {
929 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
931 let first_device_binding =
932 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
933 let second_device_binding =
934 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
935
936 let (handler_event_sender, mut handler_event_receiver) =
938 futures::channel::mpsc::channel(100);
939 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
940 handler_event_sender,
941 );
942
943 let (sender, receiver, handlers, _, _, _) =
945 InputPipelineAssembly::new(metrics::MetricsLogger::default())
946 .add_handler(input_handler)
947 .into_components();
948 let inspector = fuchsia_inspect::Inspector::default();
949 let test_node = inspector.root().create_child("input_pipeline");
950 let input_pipeline = InputPipeline {
951 pipeline_sender: sender,
952 device_event_sender,
953 device_event_receiver,
954 input_device_types: vec![],
955 input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
956 inspect_node: test_node,
957 metrics_logger: metrics::MetricsLogger::default(),
958 feature_flags: input_device::InputPipelineFeatureFlags::default(),
959 };
960 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
961
962 let first_device_events = send_input_event(first_device_binding.input_event_sender());
964 let second_device_events = send_input_event(second_device_binding.input_event_sender());
965
966 fasync::Task::local(async {
968 input_pipeline.handle_input_events().await;
969 })
970 .detach();
971
972 let first_handled_event = handler_event_receiver.next().await;
974 assert_eq!(first_handled_event, first_device_events.into_iter().next());
975
976 let second_handled_event = handler_event_receiver.next().await;
977 assert_eq!(second_handled_event, second_device_events.into_iter().next());
978 }
979
980 #[fasync::run_singlethreaded(test)]
982 async fn single_device_multiple_handlers() {
983 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
985 let input_device_binding =
986 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
987
988 let (first_handler_event_sender, mut first_handler_event_receiver) =
990 futures::channel::mpsc::channel(100);
991 let first_input_handler =
992 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
993 first_handler_event_sender,
994 );
995 let (second_handler_event_sender, mut second_handler_event_receiver) =
996 futures::channel::mpsc::channel(100);
997 let second_input_handler =
998 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
999 second_handler_event_sender,
1000 );
1001
1002 let (sender, receiver, handlers, _, _, _) =
1004 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1005 .add_handler(first_input_handler)
1006 .add_handler(second_input_handler)
1007 .into_components();
1008 let inspector = fuchsia_inspect::Inspector::default();
1009 let test_node = inspector.root().create_child("input_pipeline");
1010 let input_pipeline = InputPipeline {
1011 pipeline_sender: sender,
1012 device_event_sender,
1013 device_event_receiver,
1014 input_device_types: vec![],
1015 input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
1016 inspect_node: test_node,
1017 metrics_logger: metrics::MetricsLogger::default(),
1018 feature_flags: input_device::InputPipelineFeatureFlags::default(),
1019 };
1020 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1021
1022 let input_events = send_input_event(input_device_binding.input_event_sender());
1024
1025 fasync::Task::local(async {
1027 input_pipeline.handle_input_events().await;
1028 })
1029 .detach();
1030
1031 let expected_event = input_events.into_iter().next();
1033 let first_handler_event = first_handler_event_receiver.next().await;
1034 assert_eq!(first_handler_event, expected_event);
1035 let second_handler_event = second_handler_event_receiver.next().await;
1036 assert_eq!(second_handler_event, expected_event);
1037 }
1038
1039 #[fasync::run_singlethreaded(test)]
1042 async fn watch_devices_one_match_exists() {
1043 let mut count: i8 = 0;
1045 let dir = pseudo_directory! {
1046 "file_name" => pseudo_fs_service::host(
1047 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1048 async move {
1049 while count < 3 {
1050 if let Some(input_device_request) =
1051 request_stream.try_next().await.unwrap()
1052 {
1053 handle_input_device_request(input_device_request);
1054 count += 1;
1055 }
1056 }
1057
1058 }.boxed()
1059 },
1060 )
1061 };
1062
1063 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1065 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1066 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1069
1070 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1071 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1072 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1073
1074 let inspector = fuchsia_inspect::Inspector::default();
1075 let test_node = inspector.root().create_child("input_pipeline");
1076 test_node.record_string(
1077 "supported_input_devices",
1078 supported_device_types.clone().iter().join(", "),
1079 );
1080 let input_devices = test_node.create_child("input_devices");
1081 diagnostics_assertions::assert_data_tree!(inspector, root: {
1083 input_pipeline: {
1084 supported_input_devices: "Mouse",
1085 input_devices: {}
1086 }
1087 });
1088
1089 let _ = InputPipeline::watch_for_devices(
1090 device_watcher,
1091 dir_proxy_for_pipeline,
1092 supported_device_types,
1093 input_event_sender,
1094 bindings.clone(),
1095 &input_devices,
1096 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1098 metrics::MetricsLogger::default(),
1099 )
1100 .await;
1101
1102 let bindings_map = bindings.lock().await;
1104 assert_eq!(bindings_map.len(), 1);
1105 let bindings_vector = bindings_map.get(&10);
1106 assert!(bindings_vector.is_some());
1107 assert_eq!(bindings_vector.unwrap().len(), 1);
1108 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1109 assert!(boxed_mouse_binding.is_some());
1110 assert_eq!(
1111 boxed_mouse_binding.unwrap().get_device_descriptor(),
1112 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1113 device_id: 10,
1114 absolute_x_range: None,
1115 absolute_y_range: None,
1116 wheel_v_range: None,
1117 wheel_h_range: None,
1118 buttons: Some(vec![0]),
1119 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1120 })
1121 );
1122
1123 diagnostics_assertions::assert_data_tree!(inspector, root: {
1125 input_pipeline: {
1126 supported_input_devices: "Mouse",
1127 input_devices: {
1128 devices_discovered: 1u64,
1129 devices_connected: 1u64,
1130 "file_name_Mouse": contains {
1131 reports_received_count: 0u64,
1132 reports_filtered_count: 0u64,
1133 events_generated: 0u64,
1134 last_received_timestamp_ns: 0u64,
1135 last_generated_timestamp_ns: 0u64,
1136 "fuchsia.inspect.Health": {
1137 status: "OK",
1138 start_timestamp_nanos: AnyProperty
1141 },
1142 }
1143 }
1144 }
1145 });
1146 }
1147
1148 #[fasync::run_singlethreaded(test)]
1151 async fn watch_devices_no_matches_exist() {
1152 let mut count: i8 = 0;
1154 let dir = pseudo_directory! {
1155 "file_name" => pseudo_fs_service::host(
1156 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1157 async move {
1158 while count < 1 {
1159 if let Some(input_device_request) =
1160 request_stream.try_next().await.unwrap()
1161 {
1162 handle_input_device_request(input_device_request);
1163 count += 1;
1164 }
1165 }
1166
1167 }.boxed()
1168 },
1169 )
1170 };
1171
1172 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1174 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1175 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1178
1179 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1180 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1181 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1182
1183 let inspector = fuchsia_inspect::Inspector::default();
1184 let test_node = inspector.root().create_child("input_pipeline");
1185 test_node.record_string(
1186 "supported_input_devices",
1187 supported_device_types.clone().iter().join(", "),
1188 );
1189 let input_devices = test_node.create_child("input_devices");
1190 diagnostics_assertions::assert_data_tree!(inspector, root: {
1192 input_pipeline: {
1193 supported_input_devices: "Keyboard",
1194 input_devices: {}
1195 }
1196 });
1197
1198 let _ = InputPipeline::watch_for_devices(
1199 device_watcher,
1200 dir_proxy_for_pipeline,
1201 supported_device_types,
1202 input_event_sender,
1203 bindings.clone(),
1204 &input_devices,
1205 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1207 metrics::MetricsLogger::default(),
1208 )
1209 .await;
1210
1211 let bindings = bindings.lock().await;
1213 assert_eq!(bindings.len(), 0);
1214
1215 diagnostics_assertions::assert_data_tree!(inspector, root: {
1217 input_pipeline: {
1218 supported_input_devices: "Keyboard",
1219 input_devices: {
1220 devices_discovered: 1u64,
1221 devices_connected: 0u64,
1222 "file_name_Unsupported": {
1223 "fuchsia.inspect.Health": {
1224 status: "UNHEALTHY",
1225 message: "Unsupported device type.",
1226 start_timestamp_nanos: AnyProperty
1229 },
1230 }
1231 }
1232 }
1233 });
1234 }
1235
1236 #[fasync::run_singlethreaded(test)]
1239 async fn handle_input_device_registry_request_stream() {
1240 let (input_device_registry_proxy, input_device_registry_request_stream) =
1241 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1242 let (input_device_client_end, mut input_device_request_stream) =
1243 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1244
1245 let device_types = vec![input_device::InputDeviceType::Mouse];
1246 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1247 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1248
1249 let mut count: i8 = 0;
1251 fasync::Task::local(async move {
1252 let _ = input_device_registry_proxy.register(input_device_client_end);
1254
1255 while count < 3 {
1256 if let Some(input_device_request) =
1257 input_device_request_stream.try_next().await.unwrap()
1258 {
1259 handle_input_device_request(input_device_request);
1260 count += 1;
1261 }
1262 }
1263
1264 input_device_registry_proxy.take_event_stream();
1266 })
1267 .detach();
1268
1269 let inspector = fuchsia_inspect::Inspector::default();
1270 let test_node = inspector.root().create_child("input_pipeline");
1271
1272 let bindings_clone = bindings.clone();
1274 let _ = InputPipeline::handle_input_device_registry_request_stream(
1275 input_device_registry_request_stream,
1276 &device_types,
1277 &input_event_sender,
1278 &bindings_clone,
1279 &test_node,
1280 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1281 metrics::MetricsLogger::default(),
1282 )
1283 .await;
1284
1285 let bindings = bindings.lock().await;
1287 assert_eq!(bindings.len(), 1);
1288 }
1289
1290 #[fasync::run_singlethreaded(test)]
1292 async fn check_inspect_node_has_correct_properties() {
1293 let device_types = vec![
1294 input_device::InputDeviceType::Touch,
1295 input_device::InputDeviceType::ConsumerControls,
1296 ];
1297 let inspector = fuchsia_inspect::Inspector::default();
1298 let test_node = inspector.root().create_child("input_pipeline");
1299 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1301 futures::channel::mpsc::channel(100);
1302 let fake_input_handler =
1303 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1304 fake_handler_event_sender,
1305 );
1306 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1307 .add_handler(fake_input_handler);
1308 let _test_input_pipeline = InputPipeline::new(
1309 &Incoming::new(),
1310 device_types,
1311 assembly,
1312 test_node,
1313 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1314 metrics::MetricsLogger::default(),
1315 );
1316 diagnostics_assertions::assert_data_tree!(inspector, root: {
1317 input_pipeline: {
1318 supported_input_devices: "Touch, ConsumerControls",
1319 handlers_registered: 1u64,
1320 handlers_healthy: 1u64,
1321 input_devices: {}
1322 }
1323 });
1324 }
1325
1326 struct SpecificInterestFakeHandler {
1327 interest_types: Vec<input_device::InputEventType>,
1328 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1329 }
1330
1331 impl SpecificInterestFakeHandler {
1332 pub fn new(
1333 interest_types: Vec<input_device::InputEventType>,
1334 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1335 ) -> Rc<Self> {
1336 Rc::new(SpecificInterestFakeHandler {
1337 interest_types,
1338 event_sender: std::cell::RefCell::new(event_sender),
1339 })
1340 }
1341 }
1342
1343 impl Handler for SpecificInterestFakeHandler {
1344 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1345 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1346 fn get_name(&self) -> &'static str {
1347 "SpecificInterestFakeHandler"
1348 }
1349
1350 fn interest(&self) -> Vec<input_device::InputEventType> {
1351 self.interest_types.clone()
1352 }
1353 }
1354
1355 #[async_trait(?Send)]
1356 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1357 async fn handle_input_event(
1358 self: Rc<Self>,
1359 input_event: input_device::InputEvent,
1360 ) -> Vec<input_device::InputEvent> {
1361 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1362 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1363 Ok(_) => {}
1364 }
1365 vec![input_event]
1366 }
1367 }
1368
1369 #[fasync::run_singlethreaded(test)]
1370 async fn run_only_sends_events_to_interested_handlers() {
1371 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1373 let mouse_handler =
1374 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1375
1376 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1378 let fake_handler =
1379 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1380
1381 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1382 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1383 .add_handler(mouse_handler)
1384 .add_handler(fake_handler)
1385 .into_components();
1386
1387 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1389
1390 let fake_event = input_device::InputEvent {
1392 device_event: input_device::InputDeviceEvent::Fake,
1393 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1394 event_time: zx::MonotonicInstant::get(),
1395 handled: input_device::Handled::No,
1396 trace_id: None,
1397 };
1398
1399 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1401
1402 let received_by_fake = fake_receiver.next().await;
1404 assert_eq!(received_by_fake, Some(fake_event));
1405
1406 assert!(mouse_receiver.try_next().is_err());
1408 }
1409
1410 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1411 input_device::InputEvent {
1412 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1413 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1414 millimeters: Position { x, y },
1415 }),
1416 None,
1417 None,
1418 mouse_binding::MousePhase::Move,
1419 SortedVecSet::new(),
1420 SortedVecSet::new(),
1421 None,
1422 None,
1423 )),
1424 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1425 mouse_binding::MouseDeviceDescriptor {
1426 device_id: 1,
1427 absolute_x_range: None,
1428 absolute_y_range: None,
1429 wheel_v_range: None,
1430 wheel_h_range: None,
1431 buttons: None,
1432 counts_per_mm: 1,
1433 },
1434 ),
1435 event_time: zx::MonotonicInstant::get(),
1436 handled: input_device::Handled::No,
1437 trace_id: None,
1438 }
1439 }
1440
1441 #[fasync::run_singlethreaded(test)]
1442 async fn run_mixed_event_types_dispatched_correctly() {
1443 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1445 let mouse_handler =
1446 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1447
1448 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1450 let fake_handler =
1451 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1452
1453 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1454 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1455 .add_handler(mouse_handler)
1456 .add_handler(fake_handler)
1457 .into_components();
1458
1459 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1461
1462 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1464 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1465 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1466
1467 let fake_event_1 = input_device::InputEvent {
1468 device_event: input_device::InputDeviceEvent::Fake,
1469 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1470 event_time: zx::MonotonicInstant::get(),
1471 handled: input_device::Handled::No,
1472 trace_id: None,
1473 };
1474
1475 let mixed_batch = vec![
1478 mouse_event_1.clone(),
1479 mouse_event_2.clone(),
1480 fake_event_1.clone(),
1481 mouse_event_3.clone(),
1482 ];
1483 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1484
1485 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1487 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1488 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1489
1490 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1492 }
1493}