1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30
31static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
38
39fn get_next_device_id() -> u32 {
43 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
44}
45
46type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
47
48pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
51
52pub struct InputPipelineAssembly {
66 sender: UnboundedSender<Vec<input_device::InputEvent>>,
69 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
72
73 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
75
76 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
78
79 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
81
82 metrics_logger: metrics::MetricsLogger,
84}
85
86impl InputPipelineAssembly {
87 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
90 let (sender, receiver) = mpsc::unbounded();
91 InputPipelineAssembly {
92 sender,
93 receiver,
94 handlers: vec![],
95 metrics_logger,
96 display_ownership_fut: None,
97 focus_listener_fut: None,
98 }
99 }
100
101 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
104 self.handlers.push(handler);
105 self
106 }
107
108 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
110 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
111 }
112
113 pub fn add_display_ownership(
114 mut self,
115 display_ownership_event: zx::Event,
116 input_handlers_node: &fuchsia_inspect::Node,
117 ) -> InputPipelineAssembly {
118 let h = DisplayOwnership::new(
119 display_ownership_event,
120 input_handlers_node,
121 self.metrics_logger.clone(),
122 );
123 let metrics_logger_clone = self.metrics_logger.clone();
124 let h_clone = h.clone();
125 let sender_clone = self.sender.clone();
126 let display_ownership_fut = Box::pin(async move {
127 h_clone.clone().set_handler_healthy();
128 h_clone.clone()
129 .handle_ownership_change(sender_clone)
130 .await
131 .map_err(|e| {
132 metrics_logger_clone.log_error(
133 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
134 std::format!(
135 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
136 })
137 .unwrap();
138 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
139 });
140 self.display_ownership_fut = Some(display_ownership_fut);
141 self.add_handler(h)
142 }
143
144 fn into_components(
150 self,
151 ) -> (
152 UnboundedSender<Vec<input_device::InputEvent>>,
153 UnboundedReceiver<Vec<input_device::InputEvent>>,
154 Vec<Rc<dyn input_handler::BatchInputHandler>>,
155 metrics::MetricsLogger,
156 Option<LocalBoxFuture<'static, ()>>,
157 Option<LocalBoxFuture<'static, ()>>,
158 ) {
159 (
160 self.sender,
161 self.receiver,
162 self.handlers,
163 self.metrics_logger,
164 self.display_ownership_fut,
165 self.focus_listener_fut,
166 )
167 }
168
169 pub fn add_focus_listener(
170 mut self,
171 incoming: &Incoming,
172 focus_chain_publisher: FocusChainProviderPublisher,
173 ) -> Self {
174 let metrics_logger_clone = self.metrics_logger.clone();
175 let incoming2 = incoming.clone();
176 let focus_listener_fut = Box::pin(async move {
177 if let Ok(mut focus_listener) = FocusListener::new(
178 &incoming2,
179 focus_chain_publisher,
180 metrics_logger_clone,
181 )
182 .map_err(|e| {
183 log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
184 }) {
185 let _result = focus_listener
188 .dispatch_focus_changes()
189 .await
190 .map(|_| {
191 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
192 })
193 .map_err(|e| {
194 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
195 });
196 }
197 });
198 self.focus_listener_fut = Some(focus_listener_fut);
199 self
200 }
201}
202
203pub struct InputPipeline {
231 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
235
236 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
239
240 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
242
243 input_device_types: Vec<input_device::InputDeviceType>,
245
246 input_device_bindings: InputDeviceBindingHashMap,
248
249 inspect_node: fuchsia_inspect::Node,
252
253 metrics_logger: metrics::MetricsLogger,
255
256 pub feature_flags: input_device::InputPipelineFeatureFlags,
258}
259
260impl InputPipeline {
261 fn new_common(
262 input_device_types: Vec<input_device::InputDeviceType>,
263 assembly: InputPipelineAssembly,
264 inspect_node: fuchsia_inspect::Node,
265 feature_flags: input_device::InputPipelineFeatureFlags,
266 ) -> Self {
267 let (
268 pipeline_sender,
269 receiver,
270 handlers,
271 metrics_logger,
272 display_ownership_fut,
273 focus_listener_fut,
274 ) = assembly.into_components();
275
276 let mut handlers_count = handlers.len();
277 if let Some(fut) = display_ownership_fut {
279 fasync::Task::local(fut).detach();
280 handlers_count += 1;
281 }
282
283 if let Some(fut) = focus_listener_fut {
285 fasync::Task::local(fut).detach();
286 handlers_count += 1;
287 }
288
289 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
291 inspect_node.record_uint("handlers_registered", handlers_count as u64);
292 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
293
294 InputPipeline::run(receiver, handlers, metrics_logger.clone());
296
297 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
298 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
299 InputPipeline {
300 pipeline_sender,
301 device_event_sender,
302 device_event_receiver,
303 input_device_types,
304 input_device_bindings,
305 inspect_node,
306 metrics_logger,
307 feature_flags,
308 }
309 }
310
311 pub fn new_for_test(
319 input_device_types: Vec<input_device::InputDeviceType>,
320 assembly: InputPipelineAssembly,
321 ) -> Self {
322 let inspector = fuchsia_inspect::Inspector::default();
323 let root = inspector.root();
324 let test_node = root.create_child("input_pipeline");
325 Self::new_common(
326 input_device_types,
327 assembly,
328 test_node,
329 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
330 )
331 }
332
333 pub fn new(
340 incoming: &Incoming,
341 input_device_types: Vec<input_device::InputDeviceType>,
342 assembly: InputPipelineAssembly,
343 inspect_node: fuchsia_inspect::Node,
344 feature_flags: input_device::InputPipelineFeatureFlags,
345 metrics_logger: metrics::MetricsLogger,
346 ) -> Result<Self, Error> {
347 let input_pipeline =
348 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
349 let input_device_types = input_pipeline.input_device_types.clone();
350 let input_event_sender = input_pipeline.device_event_sender.clone();
351 let input_device_bindings = input_pipeline.input_device_bindings.clone();
352 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
353 let feature_flags = input_pipeline.feature_flags.clone();
354 let incoming = incoming.clone();
355 fasync::Task::local(async move {
356 match async {
359 let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
360 incoming.as_ref_directory().open(
361 input_device::INPUT_REPORT_PATH,
362 fio::PERM_READABLE,
363 server.into()
364 )
365 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
366 let device_watcher =
367 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
368 Self::watch_for_devices(
369 device_watcher,
370 dir_proxy,
371 input_device_types,
372 input_event_sender,
373 input_device_bindings,
374 &devices_node,
375 false, feature_flags,
377 metrics_logger.clone(),
378 )
379 .await
380 .context("failed to watch for devices")
381 }
382 .await
383 {
384 Ok(()) => {}
385 Err(err) => {
386 metrics_logger.log_warn(
390 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
391 std::format!(
392 "Input pipeline is unable to watch for new input devices: {:?}",
393 err
394 ));
395 }
396 }
397 }).detach();
398
399 Ok(input_pipeline)
400 }
401
402 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
404 &self.input_device_bindings
405 }
406
407 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
410 &self.device_event_sender
411 }
412
413 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
415 &self.input_device_types
416 }
417
418 pub async fn handle_input_events(mut self) {
420 let metrics_logger_clone = self.metrics_logger.clone();
421 while let Some(input_event) = self.device_event_receiver.next().await {
422 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
423 metrics_logger_clone.log_error(
424 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
425 std::format!("could not forward event from driver: {:?}", &e));
426 }
427 }
428
429 metrics_logger_clone.log_error(
430 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
431 "Input pipeline stopped handling input events.".to_string(),
432 );
433 }
434
435 async fn watch_for_devices(
451 mut device_watcher: Watcher,
452 dir_proxy: fio::DirectoryProxy,
453 device_types: Vec<input_device::InputDeviceType>,
454 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
455 bindings: InputDeviceBindingHashMap,
456 input_devices_node: &fuchsia_inspect::Node,
457 break_on_idle: bool,
458 feature_flags: input_device::InputPipelineFeatureFlags,
459 metrics_logger: metrics::MetricsLogger,
460 ) -> Result<(), Error> {
461 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
463 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
464 while let Some(msg) = device_watcher.try_next().await? {
465 if let Ok(filename) = msg.filename.into_os_string().into_string() {
466 if filename == "." {
467 continue;
468 }
469
470 let pathbuf = PathBuf::from(filename.clone());
471 match msg.event {
472 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
473 log::info!("found input device {}", filename);
474 devices_discovered.add(1);
475 let device_proxy =
476 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
477 add_device_bindings(
478 &device_types,
479 &filename,
480 device_proxy,
481 &input_event_sender,
482 &bindings,
483 get_next_device_id(),
484 input_devices_node,
485 Some(&devices_connected),
486 feature_flags.clone(),
487 metrics_logger.clone(),
488 )
489 .await;
490 }
491 WatchEvent::IDLE => {
492 if break_on_idle {
493 break;
494 }
495 }
496 _ => (),
497 }
498 }
499 }
500 input_devices_node.record(devices_discovered);
502 input_devices_node.record(devices_connected);
503 Err(format_err!("Input pipeline stopped watching for new input devices."))
504 }
505
506 pub async fn handle_input_device_registry_request_stream(
521 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
522 device_types: &Vec<input_device::InputDeviceType>,
523 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
524 bindings: &InputDeviceBindingHashMap,
525 input_devices_node: &fuchsia_inspect::Node,
526 feature_flags: input_device::InputPipelineFeatureFlags,
527 metrics_logger: metrics::MetricsLogger,
528 ) -> Result<(), Error> {
529 while let Some(request) = stream
530 .try_next()
531 .await
532 .context("Error handling input device registry request stream")?
533 {
534 match request {
535 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
536 device,
537 ..
538 } => {
539 let device_proxy = device.into_proxy();
541
542 let device_id = get_next_device_id();
543
544 add_device_bindings(
545 device_types,
546 &format!("input-device-registry-{}", device_id),
547 device_proxy,
548 input_event_sender,
549 bindings,
550 device_id,
551 input_devices_node,
552 None,
553 feature_flags.clone(),
554 metrics_logger.clone(),
555 )
556 .await;
557 }
558 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
559 device,
560 responder,
561 .. } => {
562 let device_proxy = device.into_proxy();
564
565 let device_id = get_next_device_id();
566
567 add_device_bindings(
568 device_types,
569 &format!("input-device-registry-{}", device_id),
570 device_proxy,
571 input_event_sender,
572 bindings,
573 device_id,
574 input_devices_node,
575 None,
576 feature_flags.clone(),
577 metrics_logger.clone(),
578 )
579 .await;
580
581 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
582 device_id: Some(device_id),
583 ..Default::default()
584 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
585 }
586 }
587 }
588
589 Ok(())
590 }
591
592 fn run(
594 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
595 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
596 metrics_logger: metrics::MetricsLogger,
597 ) {
598 Dispatcher::spawn_local(async move {
599 for handler in &handlers {
600 handler.clone().set_handler_healthy();
601 }
602
603 use input_device::InputEventType;
604 use std::collections::HashMap;
605
606 let mut handlers_by_type: HashMap<
608 InputEventType,
609 Vec<Rc<dyn input_handler::BatchInputHandler>>,
610 > = HashMap::new();
611
612 let event_types = vec![
614 InputEventType::Keyboard,
615 InputEventType::LightSensor,
616 InputEventType::ConsumerControls,
617 InputEventType::Mouse,
618 InputEventType::TouchScreen,
619 InputEventType::Touchpad,
620 #[cfg(test)]
621 InputEventType::Fake,
622 ];
623
624 for event_type in event_types {
625 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
626 .iter()
627 .filter(|h| h.interest().contains(&event_type))
628 .cloned()
629 .collect();
630 handlers_by_type.insert(event_type, handlers_for_type);
631 }
632
633 while let Some(events) = receiver.next().await {
634 if events.is_empty() {
635 continue;
636 }
637
638 let mut groups_seen = 0;
639 for (event_type, event_group) in events
640 .into_iter()
641 .chunk_by(|e| InputEventType::from(&e.device_event))
642 .into_iter()
643 {
644 groups_seen += 1;
645 if groups_seen == 2 {
646 metrics_logger.log_error(
647 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
648 "it is not recommended to contain multiple types of events in 1 send".to_string(),
649 );
650 }
651 let mut events_in_group: Vec<_> = event_group.collect();
652
653 let handlers = handlers_by_type.get(&event_type).unwrap();
655
656 for handler in handlers {
657 events_in_group =
658 handler.clone().handle_input_events(events_in_group).await;
659 }
660
661 for event in events_in_group {
662 if event.handled == input_device::Handled::No {
663 log::warn!("unhandled input event: {:?}", &event);
664 }
665 if let Some(trace_id) = event.trace_id {
666 fuchsia_trace::flow_end!(
667 "input",
668 "event_in_input_pipeline",
669 trace_id.into()
670 );
671 }
672 }
673 }
674 }
675 for handler in &handlers {
676 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
677 }
678 panic!("Runner task is not supposed to terminate.")
679 }).detach();
680 }
681}
682
683async fn add_device_bindings(
703 device_types: &Vec<input_device::InputDeviceType>,
704 filename: &String,
705 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
706 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
707 bindings: &InputDeviceBindingHashMap,
708 device_id: u32,
709 input_devices_node: &fuchsia_inspect::Node,
710 devices_connected: Option<&fuchsia_inspect::UintProperty>,
711 feature_flags: InputPipelineFeatureFlags,
712 metrics_logger: metrics::MetricsLogger,
713) {
714 let mut matched_device_types = vec![];
715 if let Ok(descriptor) = device_proxy.get_descriptor().await {
716 for device_type in device_types {
717 if input_device::is_device_type(&descriptor, *device_type).await {
718 matched_device_types.push(device_type);
719 match devices_connected {
720 Some(dev_connected) => {
721 let _ = dev_connected.add(1);
722 }
723 None => (),
724 };
725 }
726 }
727 if matched_device_types.is_empty() {
728 log::info!(
729 "device {} did not match any supported device types: {:?}",
730 filename,
731 device_types
732 );
733 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
734 let mut health = fuchsia_inspect::health::Node::new(&device_node);
735 health.set_unhealthy("Unsupported device type.");
736 device_node.record(health);
737 input_devices_node.record(device_node);
738 return;
739 }
740 } else {
741 metrics_logger.clone().log_error(
742 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
743 std::format!("cannot bind device {} without a device descriptor", filename),
744 );
745 return;
746 }
747
748 log::info!(
749 "binding {} to device types: {}",
750 filename,
751 matched_device_types
752 .iter()
753 .fold(String::new(), |device_types_string, device_type| device_types_string
754 + &format!("{:?}, ", device_type))
755 );
756
757 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
758 for device_type in matched_device_types {
759 let proxy = device_proxy.clone();
780 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
781 match input_device::get_device_binding(
782 *device_type,
783 proxy,
784 device_id,
785 input_event_sender.clone(),
786 device_node,
787 feature_flags.clone(),
788 metrics_logger.clone(),
789 )
790 .await
791 {
792 Ok(binding) => new_bindings.push(binding),
793 Err(e) => {
794 metrics_logger.log_error(
795 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
796 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
797 );
798 }
799 }
800 }
801
802 if !new_bindings.is_empty() {
803 let mut bindings = bindings.lock().await;
804 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
805 }
806}
807
808#[cfg(test)]
809mod tests {
810 use super::*;
811 use crate::input_device::{InputDeviceBinding, InputEventType};
812 use crate::utils::Position;
813 use crate::{
814 fake_input_device_binding, mouse_binding, mouse_model_database,
815 observe_fake_events_input_handler,
816 };
817 use async_trait::async_trait;
818 use diagnostics_assertions::AnyProperty;
819 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
820 use fuchsia_async as fasync;
821 use futures::FutureExt;
822 use pretty_assertions::assert_eq;
823 use rand::Rng;
824 use std::collections::HashSet;
825 use vfs::{pseudo_directory, service as pseudo_fs_service};
826
827 const COUNTS_PER_MM: u32 = 12;
828
829 fn send_input_event(
834 sender: UnboundedSender<Vec<input_device::InputEvent>>,
835 ) -> Vec<input_device::InputEvent> {
836 let mut rng = rand::rng();
837 let offset =
838 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
839 let input_event = input_device::InputEvent {
840 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
841 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
842 millimeters: Position {
843 x: offset.x / COUNTS_PER_MM as f32,
844 y: offset.y / COUNTS_PER_MM as f32,
845 },
846 }),
847 None, None, mouse_binding::MousePhase::Move,
850 HashSet::new(),
851 HashSet::new(),
852 None, None, )),
855 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
856 mouse_binding::MouseDeviceDescriptor {
857 device_id: 1,
858 absolute_x_range: None,
859 absolute_y_range: None,
860 wheel_v_range: None,
861 wheel_h_range: None,
862 buttons: None,
863 counts_per_mm: COUNTS_PER_MM,
864 },
865 ),
866 event_time: zx::MonotonicInstant::get(),
867 handled: input_device::Handled::No,
868 trace_id: None,
869 };
870 match sender.unbounded_send(vec![input_event.clone()]) {
871 Err(_) => assert!(false),
872 _ => {}
873 }
874
875 vec![input_event]
876 }
877
878 fn handle_input_device_request(
883 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
884 ) {
885 match input_device_request {
886 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
887 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
888 device_information: None,
889 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
890 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
891 movement_x: None,
892 movement_y: None,
893 scroll_v: None,
894 scroll_h: None,
895 buttons: Some(vec![0]),
896 position_x: None,
897 position_y: None,
898 ..Default::default()
899 }),
900 ..Default::default()
901 }),
902 sensor: None,
903 touch: None,
904 keyboard: None,
905 consumer_control: None,
906 ..Default::default()
907 });
908 }
909 _ => {}
910 }
911 }
912
913 #[fasync::run_singlethreaded(test)]
915 async fn multiple_devices_single_handler() {
916 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
918 let first_device_binding =
919 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
920 let second_device_binding =
921 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
922
923 let (handler_event_sender, mut handler_event_receiver) =
925 futures::channel::mpsc::channel(100);
926 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
927 handler_event_sender,
928 );
929
930 let (sender, receiver, handlers, _, _, _) =
932 InputPipelineAssembly::new(metrics::MetricsLogger::default())
933 .add_handler(input_handler)
934 .into_components();
935 let inspector = fuchsia_inspect::Inspector::default();
936 let test_node = inspector.root().create_child("input_pipeline");
937 let input_pipeline = InputPipeline {
938 pipeline_sender: sender,
939 device_event_sender,
940 device_event_receiver,
941 input_device_types: vec![],
942 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
943 inspect_node: test_node,
944 metrics_logger: metrics::MetricsLogger::default(),
945 feature_flags: input_device::InputPipelineFeatureFlags::default(),
946 };
947 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
948
949 let first_device_events = send_input_event(first_device_binding.input_event_sender());
951 let second_device_events = send_input_event(second_device_binding.input_event_sender());
952
953 fasync::Task::local(async {
955 input_pipeline.handle_input_events().await;
956 })
957 .detach();
958
959 let first_handled_event = handler_event_receiver.next().await;
961 assert_eq!(first_handled_event, first_device_events.into_iter().next());
962
963 let second_handled_event = handler_event_receiver.next().await;
964 assert_eq!(second_handled_event, second_device_events.into_iter().next());
965 }
966
967 #[fasync::run_singlethreaded(test)]
969 async fn single_device_multiple_handlers() {
970 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
972 let input_device_binding =
973 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
974
975 let (first_handler_event_sender, mut first_handler_event_receiver) =
977 futures::channel::mpsc::channel(100);
978 let first_input_handler =
979 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
980 first_handler_event_sender,
981 );
982 let (second_handler_event_sender, mut second_handler_event_receiver) =
983 futures::channel::mpsc::channel(100);
984 let second_input_handler =
985 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
986 second_handler_event_sender,
987 );
988
989 let (sender, receiver, handlers, _, _, _) =
991 InputPipelineAssembly::new(metrics::MetricsLogger::default())
992 .add_handler(first_input_handler)
993 .add_handler(second_input_handler)
994 .into_components();
995 let inspector = fuchsia_inspect::Inspector::default();
996 let test_node = inspector.root().create_child("input_pipeline");
997 let input_pipeline = InputPipeline {
998 pipeline_sender: sender,
999 device_event_sender,
1000 device_event_receiver,
1001 input_device_types: vec![],
1002 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1003 inspect_node: test_node,
1004 metrics_logger: metrics::MetricsLogger::default(),
1005 feature_flags: input_device::InputPipelineFeatureFlags::default(),
1006 };
1007 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1008
1009 let input_events = send_input_event(input_device_binding.input_event_sender());
1011
1012 fasync::Task::local(async {
1014 input_pipeline.handle_input_events().await;
1015 })
1016 .detach();
1017
1018 let expected_event = input_events.into_iter().next();
1020 let first_handler_event = first_handler_event_receiver.next().await;
1021 assert_eq!(first_handler_event, expected_event);
1022 let second_handler_event = second_handler_event_receiver.next().await;
1023 assert_eq!(second_handler_event, expected_event);
1024 }
1025
1026 #[fasync::run_singlethreaded(test)]
1029 async fn watch_devices_one_match_exists() {
1030 let mut count: i8 = 0;
1032 let dir = pseudo_directory! {
1033 "file_name" => pseudo_fs_service::host(
1034 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1035 async move {
1036 while count < 3 {
1037 if let Some(input_device_request) =
1038 request_stream.try_next().await.unwrap()
1039 {
1040 handle_input_device_request(input_device_request);
1041 count += 1;
1042 }
1043 }
1044
1045 }.boxed()
1046 },
1047 )
1048 };
1049
1050 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1052 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1053 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1056
1057 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1058 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1059 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1060
1061 let inspector = fuchsia_inspect::Inspector::default();
1062 let test_node = inspector.root().create_child("input_pipeline");
1063 test_node.record_string(
1064 "supported_input_devices",
1065 supported_device_types.clone().iter().join(", "),
1066 );
1067 let input_devices = test_node.create_child("input_devices");
1068 diagnostics_assertions::assert_data_tree!(inspector, root: {
1070 input_pipeline: {
1071 supported_input_devices: "Mouse",
1072 input_devices: {}
1073 }
1074 });
1075
1076 let _ = InputPipeline::watch_for_devices(
1077 device_watcher,
1078 dir_proxy_for_pipeline,
1079 supported_device_types,
1080 input_event_sender,
1081 bindings.clone(),
1082 &input_devices,
1083 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1085 metrics::MetricsLogger::default(),
1086 )
1087 .await;
1088
1089 let bindings_hashmap = bindings.lock().await;
1091 assert_eq!(bindings_hashmap.len(), 1);
1092 let bindings_vector = bindings_hashmap.get(&10);
1093 assert!(bindings_vector.is_some());
1094 assert_eq!(bindings_vector.unwrap().len(), 1);
1095 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1096 assert!(boxed_mouse_binding.is_some());
1097 assert_eq!(
1098 boxed_mouse_binding.unwrap().get_device_descriptor(),
1099 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1100 device_id: 10,
1101 absolute_x_range: None,
1102 absolute_y_range: None,
1103 wheel_v_range: None,
1104 wheel_h_range: None,
1105 buttons: Some(vec![0]),
1106 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1107 })
1108 );
1109
1110 diagnostics_assertions::assert_data_tree!(inspector, root: {
1112 input_pipeline: {
1113 supported_input_devices: "Mouse",
1114 input_devices: {
1115 devices_discovered: 1u64,
1116 devices_connected: 1u64,
1117 "file_name_Mouse": contains {
1118 reports_received_count: 0u64,
1119 reports_filtered_count: 0u64,
1120 events_generated: 0u64,
1121 last_received_timestamp_ns: 0u64,
1122 last_generated_timestamp_ns: 0u64,
1123 "fuchsia.inspect.Health": {
1124 status: "OK",
1125 start_timestamp_nanos: AnyProperty
1128 },
1129 }
1130 }
1131 }
1132 });
1133 }
1134
1135 #[fasync::run_singlethreaded(test)]
1138 async fn watch_devices_no_matches_exist() {
1139 let mut count: i8 = 0;
1141 let dir = pseudo_directory! {
1142 "file_name" => pseudo_fs_service::host(
1143 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1144 async move {
1145 while count < 1 {
1146 if let Some(input_device_request) =
1147 request_stream.try_next().await.unwrap()
1148 {
1149 handle_input_device_request(input_device_request);
1150 count += 1;
1151 }
1152 }
1153
1154 }.boxed()
1155 },
1156 )
1157 };
1158
1159 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1161 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1162 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1165
1166 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1167 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1168 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1169
1170 let inspector = fuchsia_inspect::Inspector::default();
1171 let test_node = inspector.root().create_child("input_pipeline");
1172 test_node.record_string(
1173 "supported_input_devices",
1174 supported_device_types.clone().iter().join(", "),
1175 );
1176 let input_devices = test_node.create_child("input_devices");
1177 diagnostics_assertions::assert_data_tree!(inspector, root: {
1179 input_pipeline: {
1180 supported_input_devices: "Keyboard",
1181 input_devices: {}
1182 }
1183 });
1184
1185 let _ = InputPipeline::watch_for_devices(
1186 device_watcher,
1187 dir_proxy_for_pipeline,
1188 supported_device_types,
1189 input_event_sender,
1190 bindings.clone(),
1191 &input_devices,
1192 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1194 metrics::MetricsLogger::default(),
1195 )
1196 .await;
1197
1198 let bindings = bindings.lock().await;
1200 assert_eq!(bindings.len(), 0);
1201
1202 diagnostics_assertions::assert_data_tree!(inspector, root: {
1204 input_pipeline: {
1205 supported_input_devices: "Keyboard",
1206 input_devices: {
1207 devices_discovered: 1u64,
1208 devices_connected: 0u64,
1209 "file_name_Unsupported": {
1210 "fuchsia.inspect.Health": {
1211 status: "UNHEALTHY",
1212 message: "Unsupported device type.",
1213 start_timestamp_nanos: AnyProperty
1216 },
1217 }
1218 }
1219 }
1220 });
1221 }
1222
1223 #[fasync::run_singlethreaded(test)]
1226 async fn handle_input_device_registry_request_stream() {
1227 let (input_device_registry_proxy, input_device_registry_request_stream) =
1228 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1229 let (input_device_client_end, mut input_device_request_stream) =
1230 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1231
1232 let device_types = vec![input_device::InputDeviceType::Mouse];
1233 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1234 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1235
1236 let mut count: i8 = 0;
1238 fasync::Task::local(async move {
1239 let _ = input_device_registry_proxy.register(input_device_client_end);
1241
1242 while count < 3 {
1243 if let Some(input_device_request) =
1244 input_device_request_stream.try_next().await.unwrap()
1245 {
1246 handle_input_device_request(input_device_request);
1247 count += 1;
1248 }
1249 }
1250
1251 input_device_registry_proxy.take_event_stream();
1253 })
1254 .detach();
1255
1256 let inspector = fuchsia_inspect::Inspector::default();
1257 let test_node = inspector.root().create_child("input_pipeline");
1258
1259 let bindings_clone = bindings.clone();
1261 let _ = InputPipeline::handle_input_device_registry_request_stream(
1262 input_device_registry_request_stream,
1263 &device_types,
1264 &input_event_sender,
1265 &bindings_clone,
1266 &test_node,
1267 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1268 metrics::MetricsLogger::default(),
1269 )
1270 .await;
1271
1272 let bindings = bindings.lock().await;
1274 assert_eq!(bindings.len(), 1);
1275 }
1276
1277 #[fasync::run_singlethreaded(test)]
1279 async fn check_inspect_node_has_correct_properties() {
1280 let device_types = vec![
1281 input_device::InputDeviceType::Touch,
1282 input_device::InputDeviceType::ConsumerControls,
1283 ];
1284 let inspector = fuchsia_inspect::Inspector::default();
1285 let test_node = inspector.root().create_child("input_pipeline");
1286 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1288 futures::channel::mpsc::channel(100);
1289 let fake_input_handler =
1290 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1291 fake_handler_event_sender,
1292 );
1293 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1294 .add_handler(fake_input_handler);
1295 let _test_input_pipeline = InputPipeline::new(
1296 &Incoming::new(),
1297 device_types,
1298 assembly,
1299 test_node,
1300 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1301 metrics::MetricsLogger::default(),
1302 );
1303 diagnostics_assertions::assert_data_tree!(inspector, root: {
1304 input_pipeline: {
1305 supported_input_devices: "Touch, ConsumerControls",
1306 handlers_registered: 1u64,
1307 handlers_healthy: 1u64,
1308 input_devices: {}
1309 }
1310 });
1311 }
1312
1313 struct SpecificInterestFakeHandler {
1314 interest_types: Vec<input_device::InputEventType>,
1315 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1316 }
1317
1318 impl SpecificInterestFakeHandler {
1319 pub fn new(
1320 interest_types: Vec<input_device::InputEventType>,
1321 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1322 ) -> Rc<Self> {
1323 Rc::new(SpecificInterestFakeHandler {
1324 interest_types,
1325 event_sender: std::cell::RefCell::new(event_sender),
1326 })
1327 }
1328 }
1329
1330 impl Handler for SpecificInterestFakeHandler {
1331 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1332 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1333 fn get_name(&self) -> &'static str {
1334 "SpecificInterestFakeHandler"
1335 }
1336
1337 fn interest(&self) -> Vec<input_device::InputEventType> {
1338 self.interest_types.clone()
1339 }
1340 }
1341
1342 #[async_trait(?Send)]
1343 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1344 async fn handle_input_event(
1345 self: Rc<Self>,
1346 input_event: input_device::InputEvent,
1347 ) -> Vec<input_device::InputEvent> {
1348 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1349 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1350 Ok(_) => {}
1351 }
1352 vec![input_event]
1353 }
1354 }
1355
1356 #[fasync::run_singlethreaded(test)]
1357 async fn run_only_sends_events_to_interested_handlers() {
1358 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1360 let mouse_handler =
1361 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1362
1363 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1365 let fake_handler =
1366 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1367
1368 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1369 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1370 .add_handler(mouse_handler)
1371 .add_handler(fake_handler)
1372 .into_components();
1373
1374 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1376
1377 let fake_event = input_device::InputEvent {
1379 device_event: input_device::InputDeviceEvent::Fake,
1380 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1381 event_time: zx::MonotonicInstant::get(),
1382 handled: input_device::Handled::No,
1383 trace_id: None,
1384 };
1385
1386 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1388
1389 let received_by_fake = fake_receiver.next().await;
1391 assert_eq!(received_by_fake, Some(fake_event));
1392
1393 assert!(mouse_receiver.try_next().is_err());
1395 }
1396
1397 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1398 input_device::InputEvent {
1399 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1400 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1401 millimeters: Position { x, y },
1402 }),
1403 None,
1404 None,
1405 mouse_binding::MousePhase::Move,
1406 HashSet::new(),
1407 HashSet::new(),
1408 None,
1409 None,
1410 )),
1411 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1412 mouse_binding::MouseDeviceDescriptor {
1413 device_id: 1,
1414 absolute_x_range: None,
1415 absolute_y_range: None,
1416 wheel_v_range: None,
1417 wheel_h_range: None,
1418 buttons: None,
1419 counts_per_mm: 1,
1420 },
1421 ),
1422 event_time: zx::MonotonicInstant::get(),
1423 handled: input_device::Handled::No,
1424 trace_id: None,
1425 }
1426 }
1427
1428 #[fasync::run_singlethreaded(test)]
1429 async fn run_mixed_event_types_dispatched_correctly() {
1430 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1432 let mouse_handler =
1433 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1434
1435 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1437 let fake_handler =
1438 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1439
1440 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1441 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1442 .add_handler(mouse_handler)
1443 .add_handler(fake_handler)
1444 .into_components();
1445
1446 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1448
1449 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1451 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1452 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1453
1454 let fake_event_1 = input_device::InputEvent {
1455 device_event: input_device::InputDeviceEvent::Fake,
1456 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1457 event_time: zx::MonotonicInstant::get(),
1458 handled: input_device::Handled::No,
1459 trace_id: None,
1460 };
1461
1462 let mixed_batch = vec![
1465 mouse_event_1.clone(),
1466 mouse_event_2.clone(),
1467 fake_event_1.clone(),
1468 mouse_event_3.clone(),
1469 ];
1470 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1471
1472 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1474 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1475 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1476
1477 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1479 }
1480}