1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_handler::UnhandledInputHandler;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::future::LocalBoxFuture;
16use futures::lock::Mutex;
17use futures::{StreamExt, TryStreamExt};
18use itertools::Itertools;
19use metrics_registry::*;
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::rc::Rc;
23use std::sync::atomic::{AtomicU32, Ordering};
24use std::sync::{Arc, LazyLock};
25use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
26
/// Process-wide counter backing [`get_next_device_id`]; the first id handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns the current value of the device id counter and advances it by one.
///
/// The read-and-increment is a single atomic `fetch_add`, so consecutive calls
/// yield consecutive ids (10, 11, 12, ...).
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
41
/// A heap-allocated, dynamically dispatched input device binding.
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// Shared, asynchronously locked map from a `u32` device id to the list of
/// bindings registered under that id.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
47
/// Builder that collects the parts of an input pipeline before it is started.
///
/// Handlers are appended with `add_handler` / `add_all_handlers`; the optional
/// display-ownership and focus-listener futures are stored here until the
/// assembly is taken apart by `into_components`.
pub struct InputPipelineAssembly {
    /// Sending half of the unbounded event channel created in `new`. A clone is
    /// handed to the display-ownership task by `add_display_ownership`.
    sender: UnboundedSender<input_device::InputEvent>,
    /// Receiving half of the channel created in `new`.
    receiver: UnboundedReceiver<input_device::InputEvent>,
    /// Input handlers, in the order in which they were added.
    handlers: Vec<Rc<dyn input_handler::InputHandler>>,

    /// Future installed by `add_display_ownership`; `None` until that is called.
    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,

    /// Future installed by `add_focus_listener`; `None` until that is called.
    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,

    /// Logger cloned into the background futures for error reporting.
    metrics_logger: metrics::MetricsLogger,
}
80
81impl InputPipelineAssembly {
82 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
85 let (sender, receiver) = mpsc::unbounded();
86 InputPipelineAssembly {
87 sender,
88 receiver,
89 handlers: vec![],
90 metrics_logger,
91 display_ownership_fut: None,
92 focus_listener_fut: None,
93 }
94 }
95
96 pub fn add_handler(mut self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
99 self.handlers.push(handler);
100 self
101 }
102
103 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
105 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
106 }
107
108 pub fn add_display_ownership(
109 mut self,
110 display_ownership_event: zx::Event,
111 input_handlers_node: &fuchsia_inspect::Node,
112 ) -> InputPipelineAssembly {
113 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
114 let metrics_logger_clone = self.metrics_logger.clone();
115 let h_clone = h.clone();
116 let sender_clone = self.sender.clone();
117 let display_ownership_fut = Box::pin(async move {
118 h_clone.clone().set_handler_healthy();
119 h_clone.clone()
120 .handle_ownership_change(sender_clone)
121 .await
122 .map_err(|e| {
123 metrics_logger_clone.log_error(
124 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
125 std::format!(
126 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
127 })
128 .unwrap();
129 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
130 });
131 self.display_ownership_fut = Some(display_ownership_fut);
132 self.add_handler(h)
133 }
134
135 fn into_components(
141 self,
142 ) -> (
143 UnboundedSender<input_device::InputEvent>,
144 UnboundedReceiver<input_device::InputEvent>,
145 Vec<Rc<dyn input_handler::InputHandler>>,
146 metrics::MetricsLogger,
147 Option<LocalBoxFuture<'static, ()>>,
148 Option<LocalBoxFuture<'static, ()>>,
149 ) {
150 (
151 self.sender,
152 self.receiver,
153 self.handlers,
154 self.metrics_logger,
155 self.display_ownership_fut,
156 self.focus_listener_fut,
157 )
158 }
159
160 pub fn add_focus_listener(
161 mut self,
162 focus_chain_publisher: FocusChainProviderPublisher,
163 ) -> Self {
164 let metrics_logger_clone = self.metrics_logger.clone();
165 let focus_listener_fut = Box::pin(async move {
166 if let Ok(mut focus_listener) =
167 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
168 log::warn!(
169 "could not create focus listener, focus will not be dispatched: {:?}",
170 e
171 )
172 })
173 {
174 let _result = focus_listener
177 .dispatch_focus_changes()
178 .await
179 .map(|_| {
180 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
181 })
182 .map_err(|e| {
183 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
184 });
185 }
186 });
187 self.focus_listener_fut = Some(focus_listener_fut);
188 self
189 }
190}
191
/// A running input pipeline: device events arrive on an internal channel and are
/// forwarded to the handler chain started by `InputPipeline::run`.
pub struct InputPipeline {
    /// Sender feeding the handler chain; `handle_input_events` forwards every
    /// device event here.
    pipeline_sender: UnboundedSender<input_device::InputEvent>,

    /// Sender handed to device bindings so they can emit events; exposed through
    /// `input_event_sender()`.
    device_event_sender: UnboundedSender<input_device::InputEvent>,

    /// Receiving half paired with `device_event_sender`; drained by
    /// `handle_input_events`.
    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,

    /// Device types the pipeline was constructed with; recorded in Inspect as
    /// `supported_input_devices`.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// Bindings registered so far, keyed by device id.
    input_device_bindings: InputDeviceBindingHashMap,

    /// Inspect node under which the pipeline records its properties.
    inspect_node: fuchsia_inspect::Node,

    /// Logger used to report pipeline errors.
    metrics_logger: metrics::MetricsLogger,
}
245
246impl InputPipeline {
247 fn new_common(
248 input_device_types: Vec<input_device::InputDeviceType>,
249 assembly: InputPipelineAssembly,
250 inspect_node: fuchsia_inspect::Node,
251 ) -> Self {
252 let (
253 pipeline_sender,
254 receiver,
255 handlers,
256 metrics_logger,
257 display_ownership_fut,
258 focus_listener_fut,
259 ) = assembly.into_components();
260
261 let mut handlers_count = handlers.len();
262 if let Some(fut) = display_ownership_fut {
264 fasync::Task::local(fut).detach();
265 handlers_count += 1;
266 }
267
268 if let Some(fut) = focus_listener_fut {
270 fasync::Task::local(fut).detach();
271 handlers_count += 1;
272 }
273
274 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
276 inspect_node.record_uint("handlers_registered", handlers_count as u64);
277 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
278
279 InputPipeline::run(receiver, handlers);
281
282 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
283 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
284 InputPipeline {
285 pipeline_sender,
286 device_event_sender,
287 device_event_receiver,
288 input_device_types,
289 input_device_bindings,
290 inspect_node,
291 metrics_logger,
292 }
293 }
294
295 pub fn new_for_test(
303 input_device_types: Vec<input_device::InputDeviceType>,
304 assembly: InputPipelineAssembly,
305 ) -> Self {
306 let inspector = fuchsia_inspect::Inspector::default();
307 let root = inspector.root();
308 let test_node = root.create_child("input_pipeline");
309 Self::new_common(input_device_types, assembly, test_node)
310 }
311
312 pub fn new(
319 input_device_types: Vec<input_device::InputDeviceType>,
320 assembly: InputPipelineAssembly,
321 inspect_node: fuchsia_inspect::Node,
322 metrics_logger: metrics::MetricsLogger,
323 ) -> Result<Self, Error> {
324 let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
325 let input_device_types = input_pipeline.input_device_types.clone();
326 let input_event_sender = input_pipeline.device_event_sender.clone();
327 let input_device_bindings = input_pipeline.input_device_bindings.clone();
328 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
329 fasync::Task::local(async move {
330 match async {
333 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
334 input_device::INPUT_REPORT_PATH,
335 fuchsia_fs::PERM_READABLE,
336 )
337 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
338 let device_watcher =
339 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
340 Self::watch_for_devices(
341 device_watcher,
342 dir_proxy,
343 input_device_types,
344 input_event_sender,
345 input_device_bindings,
346 &devices_node,
347 false, metrics_logger.clone(),
349 )
350 .await
351 .context("failed to watch for devices")
352 }
353 .await
354 {
355 Ok(()) => {}
356 Err(err) => {
357 metrics_logger.log_warn(
361 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
362 std::format!(
363 "Input pipeline is unable to watch for new input devices: {:?}",
364 err
365 ));
366 }
367 }
368 })
369 .detach();
370
371 Ok(input_pipeline)
372 }
373
    /// Returns a reference to the shared map of registered device bindings.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
378
    /// Returns the sender on which device events enter the pipeline; events sent
    /// here are received by `handle_input_events`.
    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
        &self.device_event_sender
    }
384
    /// Returns the device types this pipeline was constructed with.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
389
390 pub async fn handle_input_events(mut self) {
392 let metrics_logger_clone = self.metrics_logger.clone();
393 while let Some(input_event) = self.device_event_receiver.next().await {
394 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
395 metrics_logger_clone.log_error(
396 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
397 std::format!("could not forward event from driver: {:?}", &e));
398 }
399 }
400
401 metrics_logger_clone.log_error(
402 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
403 "Input pipeline stopped handling input events.".to_string(),
404 );
405 }
406
407 async fn watch_for_devices(
423 mut device_watcher: Watcher,
424 dir_proxy: fio::DirectoryProxy,
425 device_types: Vec<input_device::InputDeviceType>,
426 input_event_sender: UnboundedSender<input_device::InputEvent>,
427 bindings: InputDeviceBindingHashMap,
428 input_devices_node: &fuchsia_inspect::Node,
429 break_on_idle: bool,
430 metrics_logger: metrics::MetricsLogger,
431 ) -> Result<(), Error> {
432 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
434 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
435 while let Some(msg) = device_watcher.try_next().await? {
436 if let Ok(filename) = msg.filename.into_os_string().into_string() {
437 if filename == "." {
438 continue;
439 }
440
441 let pathbuf = PathBuf::from(filename.clone());
442 match msg.event {
443 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
444 log::info!("found input device {}", filename);
445 devices_discovered.add(1);
446 let device_proxy =
447 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
448 add_device_bindings(
449 &device_types,
450 &filename,
451 device_proxy,
452 &input_event_sender,
453 &bindings,
454 get_next_device_id(),
455 input_devices_node,
456 Some(&devices_connected),
457 metrics_logger.clone(),
458 )
459 .await;
460 }
461 WatchEvent::IDLE => {
462 if break_on_idle {
463 break;
464 }
465 }
466 _ => (),
467 }
468 }
469 }
470 input_devices_node.record(devices_discovered);
472 input_devices_node.record(devices_connected);
473 Err(format_err!("Input pipeline stopped watching for new input devices."))
474 }
475
476 pub async fn handle_input_device_registry_request_stream(
491 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
492 device_types: &Vec<input_device::InputDeviceType>,
493 input_event_sender: &UnboundedSender<input_device::InputEvent>,
494 bindings: &InputDeviceBindingHashMap,
495 input_devices_node: &fuchsia_inspect::Node,
496 metrics_logger: metrics::MetricsLogger,
497 ) -> Result<(), Error> {
498 while let Some(request) = stream
499 .try_next()
500 .await
501 .context("Error handling input device registry request stream")?
502 {
503 match request {
504 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
505 device,
506 ..
507 } => {
508 let device_proxy = device.into_proxy();
510
511 let device_id = get_next_device_id();
512
513 add_device_bindings(
514 device_types,
515 &format!("input-device-registry-{}", device_id),
516 device_proxy,
517 input_event_sender,
518 bindings,
519 device_id,
520 input_devices_node,
521 None,
522 metrics_logger.clone(),
523 )
524 .await;
525 }
526 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
527 device,
528 responder,
529 .. } => {
530 let device_proxy = device.into_proxy();
532
533 let device_id = get_next_device_id();
534
535 add_device_bindings(
536 device_types,
537 &format!("input-device-registry-{}", device_id),
538 device_proxy,
539 input_event_sender,
540 bindings,
541 device_id,
542 input_devices_node,
543 None,
544 metrics_logger.clone(),
545 )
546 .await;
547
548 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
549 device_id: Some(device_id),
550 ..Default::default()
551 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
552 }
553 }
554 }
555
556 Ok(())
557 }
558
559 fn run(
561 mut receiver: UnboundedReceiver<input_device::InputEvent>,
562 handlers: Vec<Rc<dyn input_handler::InputHandler>>,
563 ) {
564 fasync::Task::local(async move {
565 for handler in &handlers {
566 handler.clone().set_handler_healthy();
567 }
568
569 use input_device::InputEventType;
570 use std::collections::HashMap;
571
572 let mut handlers_by_type: HashMap<
574 InputEventType,
575 Vec<Rc<dyn input_handler::InputHandler>>,
576 > = HashMap::new();
577
578 let event_types = vec![
580 InputEventType::Keyboard,
581 InputEventType::LightSensor,
582 InputEventType::ConsumerControls,
583 InputEventType::Mouse,
584 InputEventType::TouchScreen,
585 InputEventType::Touchpad,
586 #[cfg(test)]
587 InputEventType::Fake,
588 ];
589
590 for event_type in event_types {
591 let handlers_for_type: Vec<Rc<dyn input_handler::InputHandler>> = handlers
592 .iter()
593 .filter(|h| h.interest().contains(&event_type))
594 .cloned()
595 .collect();
596 handlers_by_type.insert(event_type, handlers_for_type);
597 }
598
599 while let Some(event) = receiver.next().await {
600 let event_type = InputEventType::from(&event.device_event);
601
602 let handlers = handlers_by_type.get(&event_type).unwrap();
604
605 let mut events = vec![event];
606 for handler in handlers {
607 let mut next_events = vec![];
608 let handler_name = handler.get_name();
609 for e in events {
610 let out_events = {
611 let _async_trace = fuchsia_trace::async_enter!(
612 fuchsia_trace::Id::random(),
613 "input",
614 "handle_input_event",
615 "name" => handler_name
616 );
617 handler.clone().handle_input_event(e).await
618 };
619 next_events.extend(out_events);
620 }
621 events = next_events;
622 }
623
624 for event in events {
625 if event.handled == input_device::Handled::No {
626 log::warn!("unhandled input event: {:?}", &event);
627 }
628 }
629 }
630 for handler in &handlers {
631 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
632 }
633 panic!("Runner task is not supposed to terminate.")
634 })
635 .detach();
636 }
637}
638
639async fn add_device_bindings(
659 device_types: &Vec<input_device::InputDeviceType>,
660 filename: &String,
661 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
662 input_event_sender: &UnboundedSender<input_device::InputEvent>,
663 bindings: &InputDeviceBindingHashMap,
664 device_id: u32,
665 input_devices_node: &fuchsia_inspect::Node,
666 devices_connected: Option<&fuchsia_inspect::UintProperty>,
667 metrics_logger: metrics::MetricsLogger,
668) {
669 let mut matched_device_types = vec![];
670 if let Ok(descriptor) = device_proxy.get_descriptor().await {
671 for device_type in device_types {
672 if input_device::is_device_type(&descriptor, *device_type).await {
673 matched_device_types.push(device_type);
674 match devices_connected {
675 Some(dev_connected) => {
676 let _ = dev_connected.add(1);
677 }
678 None => (),
679 };
680 }
681 }
682 if matched_device_types.is_empty() {
683 log::info!(
684 "device {} did not match any supported device types: {:?}",
685 filename,
686 device_types
687 );
688 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
689 let mut health = fuchsia_inspect::health::Node::new(&device_node);
690 health.set_unhealthy("Unsupported device type.");
691 device_node.record(health);
692 input_devices_node.record(device_node);
693 return;
694 }
695 } else {
696 metrics_logger.clone().log_error(
697 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
698 std::format!("cannot bind device {} without a device descriptor", filename),
699 );
700 return;
701 }
702
703 log::info!(
704 "binding {} to device types: {}",
705 filename,
706 matched_device_types
707 .iter()
708 .fold(String::new(), |device_types_string, device_type| device_types_string
709 + &format!("{:?}, ", device_type))
710 );
711
712 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
713 for device_type in matched_device_types {
714 let proxy = device_proxy.clone();
735 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
736 match input_device::get_device_binding(
737 *device_type,
738 proxy,
739 device_id,
740 input_event_sender.clone(),
741 device_node,
742 metrics_logger.clone(),
743 )
744 .await
745 {
746 Ok(binding) => new_bindings.push(binding),
747 Err(e) => {
748 metrics_logger.log_error(
749 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
750 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
751 );
752 }
753 }
754 }
755
756 if !new_bindings.is_empty() {
757 let mut bindings = bindings.lock().await;
758 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
759 }
760}
761
762#[cfg(test)]
763mod tests {
764 use super::*;
765 use crate::input_device::{InputDeviceBinding, InputEventType};
766 use crate::utils::Position;
767 use crate::{
768 fake_input_device_binding, mouse_binding, mouse_model_database,
769 observe_fake_events_input_handler,
770 };
771 use async_trait::async_trait;
772 use diagnostics_assertions::AnyProperty;
773 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
774 use fuchsia_async as fasync;
775 use futures::FutureExt;
776 use pretty_assertions::assert_eq;
777 use rand::Rng;
778 use std::collections::HashSet;
779 use vfs::{pseudo_directory, service as pseudo_fs_service};
780
    /// Counts-per-millimeter value used by `send_input_event`, both to scale the
    /// random offset and in the mouse device descriptor it builds.
    const COUNTS_PER_MM: u32 = 12;
782
783 fn send_input_event(
788 sender: UnboundedSender<input_device::InputEvent>,
789 ) -> input_device::InputEvent {
790 let mut rng = rand::rng();
791 let offset =
792 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
793 let input_event = input_device::InputEvent {
794 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
795 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
796 millimeters: Position {
797 x: offset.x / COUNTS_PER_MM as f32,
798 y: offset.y / COUNTS_PER_MM as f32,
799 },
800 }),
801 None, None, mouse_binding::MousePhase::Move,
804 HashSet::new(),
805 HashSet::new(),
806 None, None, )),
809 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
810 mouse_binding::MouseDeviceDescriptor {
811 device_id: 1,
812 absolute_x_range: None,
813 absolute_y_range: None,
814 wheel_v_range: None,
815 wheel_h_range: None,
816 buttons: None,
817 counts_per_mm: COUNTS_PER_MM,
818 },
819 ),
820 event_time: zx::MonotonicInstant::get(),
821 handled: input_device::Handled::No,
822 trace_id: None,
823 };
824 match sender.unbounded_send(input_event.clone()) {
825 Err(_) => assert!(false),
826 _ => {}
827 }
828
829 input_event
830 }
831
832 fn handle_input_device_request(
837 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
838 ) {
839 match input_device_request {
840 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
841 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
842 device_information: None,
843 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
844 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
845 movement_x: None,
846 movement_y: None,
847 scroll_v: None,
848 scroll_h: None,
849 buttons: Some(vec![0]),
850 position_x: None,
851 position_y: None,
852 ..Default::default()
853 }),
854 ..Default::default()
855 }),
856 sensor: None,
857 touch: None,
858 keyboard: None,
859 consumer_control: None,
860 ..Default::default()
861 });
862 }
863 _ => {}
864 }
865 }
866
867 #[fasync::run_singlethreaded(test)]
869 async fn multiple_devices_single_handler() {
870 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
872 let first_device_binding =
873 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
874 let second_device_binding =
875 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
876
877 let (handler_event_sender, mut handler_event_receiver) =
879 futures::channel::mpsc::channel(100);
880 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
881 handler_event_sender,
882 );
883
884 let (sender, receiver, handlers, _, _, _) =
886 InputPipelineAssembly::new(metrics::MetricsLogger::default())
887 .add_handler(input_handler)
888 .into_components();
889 let inspector = fuchsia_inspect::Inspector::default();
890 let test_node = inspector.root().create_child("input_pipeline");
891 let input_pipeline = InputPipeline {
892 pipeline_sender: sender,
893 device_event_sender,
894 device_event_receiver,
895 input_device_types: vec![],
896 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
897 inspect_node: test_node,
898 metrics_logger: metrics::MetricsLogger::default(),
899 };
900 InputPipeline::run(receiver, handlers);
901
902 let first_device_event = send_input_event(first_device_binding.input_event_sender());
904 let second_device_event = send_input_event(second_device_binding.input_event_sender());
905
906 fasync::Task::local(async {
908 input_pipeline.handle_input_events().await;
909 })
910 .detach();
911
912 let first_handled_event = handler_event_receiver.next().await;
914 assert_eq!(first_handled_event, Some(first_device_event));
915
916 let second_handled_event = handler_event_receiver.next().await;
917 assert_eq!(second_handled_event, Some(second_device_event));
918 }
919
920 #[fasync::run_singlethreaded(test)]
922 async fn single_device_multiple_handlers() {
923 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
925 let input_device_binding =
926 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
927
928 let (first_handler_event_sender, mut first_handler_event_receiver) =
930 futures::channel::mpsc::channel(100);
931 let first_input_handler =
932 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
933 first_handler_event_sender,
934 );
935 let (second_handler_event_sender, mut second_handler_event_receiver) =
936 futures::channel::mpsc::channel(100);
937 let second_input_handler =
938 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
939 second_handler_event_sender,
940 );
941
942 let (sender, receiver, handlers, _, _, _) =
944 InputPipelineAssembly::new(metrics::MetricsLogger::default())
945 .add_handler(first_input_handler)
946 .add_handler(second_input_handler)
947 .into_components();
948 let inspector = fuchsia_inspect::Inspector::default();
949 let test_node = inspector.root().create_child("input_pipeline");
950 let input_pipeline = InputPipeline {
951 pipeline_sender: sender,
952 device_event_sender,
953 device_event_receiver,
954 input_device_types: vec![],
955 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
956 inspect_node: test_node,
957 metrics_logger: metrics::MetricsLogger::default(),
958 };
959 InputPipeline::run(receiver, handlers);
960
961 let input_event = send_input_event(input_device_binding.input_event_sender());
963
964 fasync::Task::local(async {
966 input_pipeline.handle_input_events().await;
967 })
968 .detach();
969
970 let first_handler_event = first_handler_event_receiver.next().await;
972 assert_eq!(first_handler_event, Some(input_event.clone()));
973 let second_handler_event = second_handler_event_receiver.next().await;
974 assert_eq!(second_handler_event, Some(input_event));
975 }
976
    /// `watch_for_devices` binds a device that is already present in the watched
    /// directory when its descriptor matches a supported type (Mouse), and the
    /// Inspect tree reflects the discovery and connection.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_one_match_exists() {
        // Pseudo directory with one entry, "file_name", backed by a fake input
        // device that answers up to three requests and then stops serving.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        while count < 3 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // One connection to the directory for the watcher, a second one for
        // opening devices.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        let supported_device_types = vec![input_device::InputDeviceType::Mouse];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Before watching, no device information has been recorded.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {}
            }
        });

        // `break_on_idle` is true, so the call returns once existing entries have
        // been processed. The result is ignored because `watch_for_devices`
        // always returns `Err`.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true,
            metrics::MetricsLogger::default(),
        )
        .await;

        // Exactly one mouse binding is expected under device id 10.
        // NOTE(review): id 10 assumes this is the first `get_next_device_id()`
        // call in the test process — confirm tests do not share the counter.
        let bindings_hashmap = bindings.lock().await;
        assert_eq!(bindings_hashmap.len(), 1);
        let bindings_vector = bindings_hashmap.get(&10);
        assert!(bindings_vector.is_some());
        assert_eq!(bindings_vector.unwrap().len(), 1);
        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
        assert!(boxed_mouse_binding.is_some());
        assert_eq!(
            boxed_mouse_binding.unwrap().get_device_descriptor(),
            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
                device_id: 10,
                absolute_x_range: None,
                absolute_y_range: None,
                wheel_v_range: None,
                wheel_h_range: None,
                buttons: Some(vec![0]),
                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
            })
        );

        // After watching, the counters and the per-device node are present.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 1u64,
                    "file_name_Mouse": contains {
                        reports_received_count: 0u64,
                        reports_filtered_count: 0u64,
                        events_generated: 0u64,
                        last_received_timestamp_ns: 0u64,
                        last_generated_timestamp_ns: 0u64,
                        "fuchsia.inspect.Health": {
                            status: "OK",
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1084
    /// `watch_for_devices` creates no binding when the only device present does
    /// not match a supported type (the fake device describes a mouse, the
    /// pipeline supports only Keyboard), and records an unhealthy
    /// `file_name_Unsupported` Inspect node.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_no_matches_exist() {
        // Pseudo directory with one entry, "file_name", backed by a fake input
        // device that answers a single request and then stops serving.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        while count < 1 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // One connection to the directory for the watcher, a second one for
        // opening devices.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Before watching, no device information has been recorded.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {}
            }
        });

        // `break_on_idle` is true, so the call returns once existing entries have
        // been processed. The result is ignored because `watch_for_devices`
        // always returns `Err`.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true,
            metrics::MetricsLogger::default(),
        )
        .await;

        // Nothing was bound.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 0);

        // The device was discovered but not connected, and is flagged unsupported.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 0u64,
                    "file_name_Unsupported": {
                        "fuchsia.inspect.Health": {
                            status: "UNHEALTHY",
                            message: "Unsupported device type.",
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1171
    /// Registering a device through the `InputDeviceRegistry` protocol results in
    /// exactly one entry in the bindings map.
    #[fasync::run_singlethreaded(test)]
    async fn handle_input_device_registry_request_stream() {
        let (input_device_registry_proxy, input_device_registry_request_stream) =
            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
        let (input_device_client_end, mut input_device_request_stream) =
            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();

        let device_types = vec![input_device::InputDeviceType::Mouse];
        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));

        // Client side: register the fake device, then serve up to three requests
        // on the device channel.
        let mut count: i8 = 0;
        fasync::Task::local(async move {
            let _ = input_device_registry_proxy.register(input_device_client_end);

            while count < 3 {
                if let Some(input_device_request) =
                    input_device_request_stream.try_next().await.unwrap()
                {
                    handle_input_device_request(input_device_request);
                    count += 1;
                }
            }

            // NOTE(review): the returned event stream is discarded and the proxy
            // is dropped when this task ends, which presumably is what lets the
            // server-side request stream below terminate — confirm.
            input_device_registry_proxy.take_event_stream();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");

        // Server side: handle registry requests until the stream ends.
        let bindings_clone = bindings.clone();
        let _ = InputPipeline::handle_input_device_registry_request_stream(
            input_device_registry_request_stream,
            &device_types,
            &input_event_sender,
            &bindings_clone,
            &test_node,
            metrics::MetricsLogger::default(),
        )
        .await;

        // One device id should have bindings registered.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 1);
    }
1224
1225 #[fasync::run_singlethreaded(test)]
1227 async fn check_inspect_node_has_correct_properties() {
1228 let device_types = vec![
1229 input_device::InputDeviceType::Touch,
1230 input_device::InputDeviceType::ConsumerControls,
1231 ];
1232 let inspector = fuchsia_inspect::Inspector::default();
1233 let test_node = inspector.root().create_child("input_pipeline");
1234 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1236 futures::channel::mpsc::channel(100);
1237 let fake_input_handler =
1238 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1239 fake_handler_event_sender,
1240 );
1241 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1242 .add_handler(fake_input_handler);
1243 let _test_input_pipeline = InputPipeline::new(
1244 device_types,
1245 assembly,
1246 test_node,
1247 metrics::MetricsLogger::default(),
1248 );
1249 diagnostics_assertions::assert_data_tree!(inspector, root: {
1250 input_pipeline: {
1251 supported_input_devices: "Touch, ConsumerControls",
1252 handlers_registered: 1u64,
1253 handlers_healthy: 1u64,
1254 input_devices: {}
1255 }
1256 });
1257 }
1258
1259 struct SpecificInterestFakeHandler {
1260 interest_types: Vec<input_device::InputEventType>,
1261 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1262 }
1263
1264 impl SpecificInterestFakeHandler {
1265 pub fn new(
1266 interest_types: Vec<input_device::InputEventType>,
1267 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1268 ) -> Rc<Self> {
1269 Rc::new(SpecificInterestFakeHandler {
1270 interest_types,
1271 event_sender: std::cell::RefCell::new(event_sender),
1272 })
1273 }
1274 }
1275
1276 #[async_trait(?Send)]
1277 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1278 async fn handle_input_event(
1279 self: Rc<Self>,
1280 input_event: input_device::InputEvent,
1281 ) -> Vec<input_device::InputEvent> {
1282 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1283 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1284 Ok(_) => {}
1285 }
1286 vec![input_event]
1287 }
1288
1289 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1290 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1291 fn get_name(&self) -> &'static str {
1292 "SpecificInterestFakeHandler"
1293 }
1294
1295 fn interest(&self) -> Vec<input_device::InputEventType> {
1296 self.interest_types.clone()
1297 }
1298 }
1299
1300 #[fasync::run_singlethreaded(test)]
1301 async fn run_only_sends_events_to_interested_handlers() {
1302 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1304 let mouse_handler =
1305 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1306
1307 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1309 let fake_handler =
1310 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1311
1312 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1313 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1314 .add_handler(mouse_handler)
1315 .add_handler(fake_handler)
1316 .into_components();
1317
1318 InputPipeline::run(pipeline_receiver, handlers);
1320
1321 let fake_event = input_device::InputEvent {
1323 device_event: input_device::InputDeviceEvent::Fake,
1324 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1325 event_time: zx::MonotonicInstant::get(),
1326 handled: input_device::Handled::No,
1327 trace_id: None,
1328 };
1329
1330 pipeline_sender.unbounded_send(fake_event.clone()).expect("failed to send event");
1332
1333 let received_by_fake = fake_receiver.next().await;
1335 assert_eq!(received_by_fake, Some(fake_event));
1336
1337 assert!(mouse_receiver.try_next().is_err());
1339 }
1340}