input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_handler::UnhandledInputHandler;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::future::LocalBoxFuture;
16use futures::lock::Mutex;
17use futures::{StreamExt, TryStreamExt};
18use itertools::Itertools;
19use metrics_registry::*;
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::rc::Rc;
23use std::sync::atomic::{AtomicU32, Ordering};
24use std::sync::{Arc, LazyLock};
25use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
26
/// Self-incrementing `u32` counter that backs the unique `device_id` values.
///
/// Device ids start at 10 to avoid conflicts with the default devices in Starnix.
/// Starnix currently uses 0 and 1 as the ids of its default devices, and it needs
/// those default devices to deliver events from physical devices until there is
/// an API that exposes device changes to UI clients.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Hands out the next unique device id.
///
/// Each call returns the counter's current value and then advances the counter
/// by one, so the following call observes a fresh id.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
41
42type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
43
44/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
45/// It uses unique device id as key.
46pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
47
48/// An input pipeline assembly.
49///
50/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
51/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
52/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
53/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
54///
55/// # Implementation notes
56///
57/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
58/// handlers are connected together using async queues.  This allows fully streamed processing of
59/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
60/// without an external stimulus.
61pub struct InputPipelineAssembly {
62    /// The top-level sender: send into this queue to inject an event into the input
63    /// pipeline.
64    sender: UnboundedSender<input_device::InputEvent>,
65    /// The bottom-level receiver: any events that fall through the entire pipeline can
66    /// be read from this receiver.
67    receiver: UnboundedReceiver<input_device::InputEvent>,
68    /// The input handlers that comprise the input pipeline.
69    handlers: Vec<Rc<dyn input_handler::InputHandler>>,
70
71    /// The display ownership watcher task.
72    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
73
74    /// The focus listener task.
75    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
76
77    /// The metrics logger.
78    metrics_logger: metrics::MetricsLogger,
79}
80
81impl InputPipelineAssembly {
82    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
83    /// to add new handlers to it.
84    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
85        let (sender, receiver) = mpsc::unbounded();
86        InputPipelineAssembly {
87            sender,
88            receiver,
89            handlers: vec![],
90            metrics_logger,
91            display_ownership_fut: None,
92            focus_listener_fut: None,
93        }
94    }
95
96    /// Adds another [input_handler::InputHandler] into the [InputPipelineAssembly]. The handlers
97    /// are invoked in the order they are added. Returns `Self` for chaining.
98    pub fn add_handler(mut self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
99        self.handlers.push(handler);
100        self
101    }
102
103    /// Adds all handlers into the assembly in the order they appear in `handlers`.
104    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
105        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
106    }
107
108    pub fn add_display_ownership(
109        mut self,
110        display_ownership_event: zx::Event,
111        input_handlers_node: &fuchsia_inspect::Node,
112    ) -> InputPipelineAssembly {
113        let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
114        let metrics_logger_clone = self.metrics_logger.clone();
115        let h_clone = h.clone();
116        let sender_clone = self.sender.clone();
117        let display_ownership_fut = Box::pin(async move {
118            h_clone.clone().set_handler_healthy();
119            h_clone.clone()
120                .handle_ownership_change(sender_clone)
121                .await
122                .map_err(|e| {
123                    metrics_logger_clone.log_error(
124                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
125                        std::format!(
126                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
127                        })
128                        .unwrap();
129            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
130        });
131        self.display_ownership_fut = Some(display_ownership_fut);
132        self.add_handler(h)
133    }
134
135    /// Deconstructs the assembly into constituent components, used when constructing
136    /// [InputPipeline].
137    ///
138    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
139    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
140    fn into_components(
141        self,
142    ) -> (
143        UnboundedSender<input_device::InputEvent>,
144        UnboundedReceiver<input_device::InputEvent>,
145        Vec<Rc<dyn input_handler::InputHandler>>,
146        metrics::MetricsLogger,
147        Option<LocalBoxFuture<'static, ()>>,
148        Option<LocalBoxFuture<'static, ()>>,
149    ) {
150        (
151            self.sender,
152            self.receiver,
153            self.handlers,
154            self.metrics_logger,
155            self.display_ownership_fut,
156            self.focus_listener_fut,
157        )
158    }
159
160    pub fn add_focus_listener(
161        mut self,
162        focus_chain_publisher: FocusChainProviderPublisher,
163    ) -> Self {
164        let metrics_logger_clone = self.metrics_logger.clone();
165        let focus_listener_fut = Box::pin(async move {
166            if let Ok(mut focus_listener) =
167                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
168                    log::warn!(
169                        "could not create focus listener, focus will not be dispatched: {:?}",
170                        e
171                    )
172                })
173            {
174                // This will await indefinitely and process focus messages in a loop, unless there
175                // is a problem.
176                let _result = focus_listener
177                    .dispatch_focus_changes()
178                    .await
179                    .map(|_| {
180                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
181                    })
182                    .map_err(|e| {
183                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
184                    });
185            }
186        });
187        self.focus_listener_fut = Some(focus_listener_fut);
188        self
189    }
190}
191
192/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
193///
194/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
195/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
196///
197/// # Example
198/// ```
199/// let ime_handler =
200///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
201/// let touch_handler = TouchHandler::new(
202///     scene_manager.session.clone(),
203///     scene_manager.compositor_id,
204///     scene_manager.display_size
205/// ).await?;
206///
207/// let assembly = InputPipelineAssembly::new()
208///     .add_handler(Box::new(ime_handler)),
209///     .add_handler(Box::new(touch_handler)),
210/// let input_pipeline = InputPipeline::new(
211///     vec![
212///         input_device::InputDeviceType::Touch,
213///         input_device::InputDeviceType::Keyboard,
214///     ],
215///     assembly,
216/// );
217/// input_pipeline.handle_input_events().await;
218/// ```
219pub struct InputPipeline {
220    /// The entry point into the input handler pipeline. Incoming input events should
221    /// be inserted into this async queue, and the input pipeline will ensure that they
222    /// are propagated through all the input handlers in the appropriate sequence.
223    pipeline_sender: UnboundedSender<input_device::InputEvent>,
224
225    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
226    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
227    device_event_sender: UnboundedSender<input_device::InputEvent>,
228
229    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
230    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,
231
232    /// The types of devices this pipeline supports.
233    input_device_types: Vec<input_device::InputDeviceType>,
234
235    /// The InputDeviceBindings bound to this pipeline.
236    input_device_bindings: InputDeviceBindingHashMap,
237
238    /// This node is bound to the lifetime of this InputPipeline.
239    /// Inspect data will be dumped for this pipeline as long as it exists.
240    inspect_node: fuchsia_inspect::Node,
241
242    /// The metrics logger.
243    metrics_logger: metrics::MetricsLogger,
244}
245
246impl InputPipeline {
247    fn new_common(
248        input_device_types: Vec<input_device::InputDeviceType>,
249        assembly: InputPipelineAssembly,
250        inspect_node: fuchsia_inspect::Node,
251    ) -> Self {
252        let (
253            pipeline_sender,
254            receiver,
255            handlers,
256            metrics_logger,
257            display_ownership_fut,
258            focus_listener_fut,
259        ) = assembly.into_components();
260
261        let mut handlers_count = handlers.len();
262        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
263        if let Some(fut) = display_ownership_fut {
264            fasync::Task::local(fut).detach();
265            handlers_count += 1;
266        }
267
268        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
269        if let Some(fut) = focus_listener_fut {
270            fasync::Task::local(fut).detach();
271            handlers_count += 1;
272        }
273
274        // Add properties to inspect node
275        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
276        inspect_node.record_uint("handlers_registered", handlers_count as u64);
277        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
278
279        // Initializes all handlers and starts the input pipeline loop.
280        InputPipeline::run(receiver, handlers);
281
282        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
283        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
284        InputPipeline {
285            pipeline_sender,
286            device_event_sender,
287            device_event_receiver,
288            input_device_types,
289            input_device_bindings,
290            inspect_node,
291            metrics_logger,
292        }
293    }
294
295    /// Creates a new [`InputPipeline`] for integration testing.
296    /// Unlike a production input pipeline, this pipeline will not monitor
297    /// `/dev/class/input-report` for devices.
298    ///
299    /// # Parameters
300    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
301    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
302    pub fn new_for_test(
303        input_device_types: Vec<input_device::InputDeviceType>,
304        assembly: InputPipelineAssembly,
305    ) -> Self {
306        let inspector = fuchsia_inspect::Inspector::default();
307        let root = inspector.root();
308        let test_node = root.create_child("input_pipeline");
309        Self::new_common(input_device_types, assembly, test_node)
310    }
311
312    /// Creates a new [`InputPipeline`] for production use.
313    ///
314    /// # Parameters
315    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
316    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
317    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
318    pub fn new(
319        input_device_types: Vec<input_device::InputDeviceType>,
320        assembly: InputPipelineAssembly,
321        inspect_node: fuchsia_inspect::Node,
322        metrics_logger: metrics::MetricsLogger,
323    ) -> Result<Self, Error> {
324        let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
325        let input_device_types = input_pipeline.input_device_types.clone();
326        let input_event_sender = input_pipeline.device_event_sender.clone();
327        let input_device_bindings = input_pipeline.input_device_bindings.clone();
328        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
329        fasync::Task::local(async move {
330            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
331            // that send InputEvents to `input_event_receiver`.
332            match async {
333                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
334                    input_device::INPUT_REPORT_PATH,
335                    fuchsia_fs::PERM_READABLE,
336                )
337                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
338                let device_watcher =
339                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
340                Self::watch_for_devices(
341                    device_watcher,
342                    dir_proxy,
343                    input_device_types,
344                    input_event_sender,
345                    input_device_bindings,
346                    &devices_node,
347                    false, /* break_on_idle */
348                    metrics_logger.clone(),
349                )
350                .await
351                .context("failed to watch for devices")
352            }
353            .await
354            {
355                Ok(()) => {}
356                Err(err) => {
357                    // This error is usually benign in tests: it means that the setup does not
358                    // support dynamic device discovery. Almost no tests support dynamic
359                    // device discovery, and they also do not need those.
360                    metrics_logger.log_warn(
361                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
362                        std::format!(
363                            "Input pipeline is unable to watch for new input devices: {:?}",
364                            err
365                        ));
366                }
367            }
368        })
369        .detach();
370
371        Ok(input_pipeline)
372    }
373
374    /// Gets the input device bindings.
375    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
376        &self.input_device_bindings
377    }
378
379    /// Gets the input device sender: this is the channel that should be cloned
380    /// and used for injecting events from the drivers into the input pipeline.
381    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
382        &self.device_event_sender
383    }
384
385    /// Gets a list of input device types supported by this input pipeline.
386    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
387        &self.input_device_types
388    }
389
390    /// Forwards all input events into the input pipeline.
391    pub async fn handle_input_events(mut self) {
392        let metrics_logger_clone = self.metrics_logger.clone();
393        while let Some(input_event) = self.device_event_receiver.next().await {
394            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
395                metrics_logger_clone.log_error(
396                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
397                    std::format!("could not forward event from driver: {:?}", &e));
398            }
399        }
400
401        metrics_logger_clone.log_error(
402            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
403            "Input pipeline stopped handling input events.".to_string(),
404        );
405    }
406
407    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
408    /// if new devices match a type in `device_types`.
409    ///
410    /// # Parameters
411    /// - `device_watcher`: Watches the input report directory for new devices.
412    /// - `dir_proxy`: The directory containing InputDevice connections.
413    /// - `device_types`: The types of devices to watch for.
414    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
415    /// - `bindings`: Holds all the InputDeviceBindings
416    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
417    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
418    /// - `metrics_logger`: The metrics logger.
419    ///
420    /// # Errors
421    /// If the input report directory or a file within it cannot be read.
422    async fn watch_for_devices(
423        mut device_watcher: Watcher,
424        dir_proxy: fio::DirectoryProxy,
425        device_types: Vec<input_device::InputDeviceType>,
426        input_event_sender: UnboundedSender<input_device::InputEvent>,
427        bindings: InputDeviceBindingHashMap,
428        input_devices_node: &fuchsia_inspect::Node,
429        break_on_idle: bool,
430        metrics_logger: metrics::MetricsLogger,
431    ) -> Result<(), Error> {
432        // Add non-static properties to inspect node.
433        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
434        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
435        while let Some(msg) = device_watcher.try_next().await? {
436            if let Ok(filename) = msg.filename.into_os_string().into_string() {
437                if filename == "." {
438                    continue;
439                }
440
441                let pathbuf = PathBuf::from(filename.clone());
442                match msg.event {
443                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
444                        log::info!("found input device {}", filename);
445                        devices_discovered.add(1);
446                        let device_proxy =
447                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
448                        add_device_bindings(
449                            &device_types,
450                            &filename,
451                            device_proxy,
452                            &input_event_sender,
453                            &bindings,
454                            get_next_device_id(),
455                            input_devices_node,
456                            Some(&devices_connected),
457                            metrics_logger.clone(),
458                        )
459                        .await;
460                    }
461                    WatchEvent::IDLE => {
462                        if break_on_idle {
463                            break;
464                        }
465                    }
466                    _ => (),
467                }
468            }
469        }
470        // Ensure inspect properties persist for debugging if device watch loop ends.
471        input_devices_node.record(devices_discovered);
472        input_devices_node.record(devices_connected);
473        Err(format_err!("Input pipeline stopped watching for new input devices."))
474    }
475
476    /// Handles the incoming InputDeviceRegistryRequestStream.
477    ///
478    /// This method will end when the request stream is closed. If the stream closes with an
479    /// error the error will be returned in the Result.
480    ///
481    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
482    ///
483    /// # Parameters
484    /// - `stream`: The stream of InputDeviceRegistryRequests.
485    /// - `device_types`: The types of devices to watch for.
486    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
487    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
488    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
489    /// - `metrics_logger`: The metrics logger.
490    pub async fn handle_input_device_registry_request_stream(
491        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
492        device_types: &Vec<input_device::InputDeviceType>,
493        input_event_sender: &UnboundedSender<input_device::InputEvent>,
494        bindings: &InputDeviceBindingHashMap,
495        input_devices_node: &fuchsia_inspect::Node,
496        metrics_logger: metrics::MetricsLogger,
497    ) -> Result<(), Error> {
498        while let Some(request) = stream
499            .try_next()
500            .await
501            .context("Error handling input device registry request stream")?
502        {
503            match request {
504                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
505                    device,
506                    ..
507                } => {
508                    // Add a binding if the device is a type being tracked
509                    let device_proxy = device.into_proxy();
510
511                    let device_id = get_next_device_id();
512
513                    add_device_bindings(
514                        device_types,
515                        &format!("input-device-registry-{}", device_id),
516                        device_proxy,
517                        input_event_sender,
518                        bindings,
519                        device_id,
520                        input_devices_node,
521                        None,
522                        metrics_logger.clone(),
523                    )
524                    .await;
525                }
526                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
527                    device,
528                    responder,
529                    .. } => {
530                    // Add a binding if the device is a type being tracked
531                    let device_proxy = device.into_proxy();
532
533                    let device_id = get_next_device_id();
534
535                    add_device_bindings(
536                        device_types,
537                        &format!("input-device-registry-{}", device_id),
538                        device_proxy,
539                        input_event_sender,
540                        bindings,
541                        device_id,
542                        input_devices_node,
543                        None,
544                        metrics_logger.clone(),
545                    )
546                    .await;
547
548                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
549                        device_id: Some(device_id),
550                        ..Default::default()
551                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
552                }
553            }
554        }
555
556        Ok(())
557    }
558
559    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
560    fn run(
561        mut receiver: UnboundedReceiver<input_device::InputEvent>,
562        handlers: Vec<Rc<dyn input_handler::InputHandler>>,
563    ) {
564        fasync::Task::local(async move {
565            for handler in &handlers {
566                handler.clone().set_handler_healthy();
567            }
568
569            use input_device::InputEventType;
570            use std::collections::HashMap;
571
572            // Pre-compute handler lists for each event type.
573            let mut handlers_by_type: HashMap<
574                InputEventType,
575                Vec<Rc<dyn input_handler::InputHandler>>,
576            > = HashMap::new();
577
578            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
579            let event_types = vec![
580                InputEventType::Keyboard,
581                InputEventType::LightSensor,
582                InputEventType::ConsumerControls,
583                InputEventType::Mouse,
584                InputEventType::TouchScreen,
585                InputEventType::Touchpad,
586                #[cfg(test)]
587                InputEventType::Fake,
588            ];
589
590            for event_type in event_types {
591                let handlers_for_type: Vec<Rc<dyn input_handler::InputHandler>> = handlers
592                    .iter()
593                    .filter(|h| h.interest().contains(&event_type))
594                    .cloned()
595                    .collect();
596                handlers_by_type.insert(event_type, handlers_for_type);
597            }
598
599            while let Some(event) = receiver.next().await {
600                let event_type = InputEventType::from(&event.device_event);
601
602                // Get pre-computed handlers for this event type.
603                let handlers = handlers_by_type.get(&event_type).unwrap();
604
605                let mut events = vec![event];
606                for handler in handlers {
607                    let mut next_events = vec![];
608                    let handler_name = handler.get_name();
609                    for e in events {
610                        let out_events = {
611                            let _async_trace = fuchsia_trace::async_enter!(
612                                fuchsia_trace::Id::random(),
613                                "input",
614                                "handle_input_event",
615                                "name" => handler_name
616                            );
617                            handler.clone().handle_input_event(e).await
618                        };
619                        next_events.extend(out_events);
620                    }
621                    events = next_events;
622                }
623
624                for event in events {
625                    if event.handled == input_device::Handled::No {
626                        log::warn!("unhandled input event: {:?}", &event);
627                    }
628                }
629            }
630            for handler in &handlers {
631                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
632            }
633            panic!("Runner task is not supposed to terminate.")
634        })
635        .detach();
636    }
637}
638
639/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
640///
641/// # Parameters
642/// - `device_types`: The types of devices to watch for.
643/// - `device_proxy`: A proxy to the input device.
644/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
645/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
646/// - `device_id`: The device id of the associated bindings.
647/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
648///
649/// # Note
650/// This will create multiple bindings, in the case where
651/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
652///   with multiple table fields populated, and
653/// * multiple populated table fields correspond to device types present in `device_types`
654///
655/// This is used, for example, to support the Atlas touchpad. In that case, a single
656/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
657/// a `fuchsia.input.report.TouchDescriptor`.
658async fn add_device_bindings(
659    device_types: &Vec<input_device::InputDeviceType>,
660    filename: &String,
661    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
662    input_event_sender: &UnboundedSender<input_device::InputEvent>,
663    bindings: &InputDeviceBindingHashMap,
664    device_id: u32,
665    input_devices_node: &fuchsia_inspect::Node,
666    devices_connected: Option<&fuchsia_inspect::UintProperty>,
667    metrics_logger: metrics::MetricsLogger,
668) {
669    let mut matched_device_types = vec![];
670    if let Ok(descriptor) = device_proxy.get_descriptor().await {
671        for device_type in device_types {
672            if input_device::is_device_type(&descriptor, *device_type).await {
673                matched_device_types.push(device_type);
674                match devices_connected {
675                    Some(dev_connected) => {
676                        let _ = dev_connected.add(1);
677                    }
678                    None => (),
679                };
680            }
681        }
682        if matched_device_types.is_empty() {
683            log::info!(
684                "device {} did not match any supported device types: {:?}",
685                filename,
686                device_types
687            );
688            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
689            let mut health = fuchsia_inspect::health::Node::new(&device_node);
690            health.set_unhealthy("Unsupported device type.");
691            device_node.record(health);
692            input_devices_node.record(device_node);
693            return;
694        }
695    } else {
696        metrics_logger.clone().log_error(
697            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
698            std::format!("cannot bind device {} without a device descriptor", filename),
699        );
700        return;
701    }
702
703    log::info!(
704        "binding {} to device types: {}",
705        filename,
706        matched_device_types
707            .iter()
708            .fold(String::new(), |device_types_string, device_type| device_types_string
709                + &format!("{:?}, ", device_type))
710    );
711
712    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
713    for device_type in matched_device_types {
714        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
715        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
716        //
717        // There's no conflict in having multiple bindings read from the same node,
718        // since:
719        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
720        // * the device driver will copy each incoming report to each connected reader.
721        //
722        // This does mean that reports from the Atlas touchpad device get read twice
723        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
724        // is operating in mouse mode or touchpad mode.
725        //
726        // This hasn't been an issue because:
727        // * Semantically: things are fine, because each binding discards irrelevant reports.
728        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
729        // * Performance wise: things are fine, because the data rate of the touchpad is low
730        //   (125 HZ).
731        //
732        // If we add additional cases where bindings share an underlying `input-report` node,
733        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
734        let proxy = device_proxy.clone();
735        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
736        match input_device::get_device_binding(
737            *device_type,
738            proxy,
739            device_id,
740            input_event_sender.clone(),
741            device_node,
742            metrics_logger.clone(),
743        )
744        .await
745        {
746            Ok(binding) => new_bindings.push(binding),
747            Err(e) => {
748                metrics_logger.log_error(
749                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
750                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
751                );
752            }
753        }
754    }
755
756    if !new_bindings.is_empty() {
757        let mut bindings = bindings.lock().await;
758        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
759    }
760}
761
762#[cfg(test)]
763mod tests {
764    use super::*;
765    use crate::input_device::{InputDeviceBinding, InputEventType};
766    use crate::utils::Position;
767    use crate::{
768        fake_input_device_binding, mouse_binding, mouse_model_database,
769        observe_fake_events_input_handler,
770    };
771    use async_trait::async_trait;
772    use diagnostics_assertions::AnyProperty;
773    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
774    use fuchsia_async as fasync;
775    use futures::FutureExt;
776    use pretty_assertions::assert_eq;
777    use rand::Rng;
778    use std::collections::HashSet;
779    use vfs::{pseudo_directory, service as pseudo_fs_service};
780
781    const COUNTS_PER_MM: u32 = 12;
782
783    /// Returns the InputEvent sent over `sender`.
784    ///
785    /// # Parameters
786    /// - `sender`: The channel to send the InputEvent over.
787    fn send_input_event(
788        sender: UnboundedSender<input_device::InputEvent>,
789    ) -> input_device::InputEvent {
790        let mut rng = rand::rng();
791        let offset =
792            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
793        let input_event = input_device::InputEvent {
794            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
795                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
796                    millimeters: Position {
797                        x: offset.x / COUNTS_PER_MM as f32,
798                        y: offset.y / COUNTS_PER_MM as f32,
799                    },
800                }),
801                None, /* wheel_delta_v */
802                None, /* wheel_delta_h */
803                mouse_binding::MousePhase::Move,
804                HashSet::new(),
805                HashSet::new(),
806                None, /* is_precision_scroll */
807                None, /* wake_lease */
808            )),
809            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
810                mouse_binding::MouseDeviceDescriptor {
811                    device_id: 1,
812                    absolute_x_range: None,
813                    absolute_y_range: None,
814                    wheel_v_range: None,
815                    wheel_h_range: None,
816                    buttons: None,
817                    counts_per_mm: COUNTS_PER_MM,
818                },
819            ),
820            event_time: zx::MonotonicInstant::get(),
821            handled: input_device::Handled::No,
822            trace_id: None,
823        };
824        match sender.unbounded_send(input_event.clone()) {
825            Err(_) => assert!(false),
826            _ => {}
827        }
828
829        input_event
830    }
831
832    /// Returns a MouseDescriptor on an InputDeviceRequest.
833    ///
834    /// # Parameters
835    /// - `input_device_request`: The request to handle.
836    fn handle_input_device_request(
837        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
838    ) {
839        match input_device_request {
840            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
841                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
842                    device_information: None,
843                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
844                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
845                            movement_x: None,
846                            movement_y: None,
847                            scroll_v: None,
848                            scroll_h: None,
849                            buttons: Some(vec![0]),
850                            position_x: None,
851                            position_y: None,
852                            ..Default::default()
853                        }),
854                        ..Default::default()
855                    }),
856                    sensor: None,
857                    touch: None,
858                    keyboard: None,
859                    consumer_control: None,
860                    ..Default::default()
861                });
862            }
863            _ => {}
864        }
865    }
866
867    /// Tests that an input pipeline handles events from multiple devices.
868    #[fasync::run_singlethreaded(test)]
869    async fn multiple_devices_single_handler() {
870        // Create two fake device bindings.
871        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
872        let first_device_binding =
873            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
874        let second_device_binding =
875            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
876
877        // Create a fake input handler.
878        let (handler_event_sender, mut handler_event_receiver) =
879            futures::channel::mpsc::channel(100);
880        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
881            handler_event_sender,
882        );
883
884        // Build the input pipeline.
885        let (sender, receiver, handlers, _, _, _) =
886            InputPipelineAssembly::new(metrics::MetricsLogger::default())
887                .add_handler(input_handler)
888                .into_components();
889        let inspector = fuchsia_inspect::Inspector::default();
890        let test_node = inspector.root().create_child("input_pipeline");
891        let input_pipeline = InputPipeline {
892            pipeline_sender: sender,
893            device_event_sender,
894            device_event_receiver,
895            input_device_types: vec![],
896            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
897            inspect_node: test_node,
898            metrics_logger: metrics::MetricsLogger::default(),
899        };
900        InputPipeline::run(receiver, handlers);
901
902        // Send an input event from each device.
903        let first_device_event = send_input_event(first_device_binding.input_event_sender());
904        let second_device_event = send_input_event(second_device_binding.input_event_sender());
905
906        // Run the pipeline.
907        fasync::Task::local(async {
908            input_pipeline.handle_input_events().await;
909        })
910        .detach();
911
912        // Assert the handler receives the events.
913        let first_handled_event = handler_event_receiver.next().await;
914        assert_eq!(first_handled_event, Some(first_device_event));
915
916        let second_handled_event = handler_event_receiver.next().await;
917        assert_eq!(second_handled_event, Some(second_device_event));
918    }
919
920    /// Tests that an input pipeline handles events through multiple input handlers.
921    #[fasync::run_singlethreaded(test)]
922    async fn single_device_multiple_handlers() {
923        // Create two fake device bindings.
924        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
925        let input_device_binding =
926            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
927
928        // Create two fake input handlers.
929        let (first_handler_event_sender, mut first_handler_event_receiver) =
930            futures::channel::mpsc::channel(100);
931        let first_input_handler =
932            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
933                first_handler_event_sender,
934            );
935        let (second_handler_event_sender, mut second_handler_event_receiver) =
936            futures::channel::mpsc::channel(100);
937        let second_input_handler =
938            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
939                second_handler_event_sender,
940            );
941
942        // Build the input pipeline.
943        let (sender, receiver, handlers, _, _, _) =
944            InputPipelineAssembly::new(metrics::MetricsLogger::default())
945                .add_handler(first_input_handler)
946                .add_handler(second_input_handler)
947                .into_components();
948        let inspector = fuchsia_inspect::Inspector::default();
949        let test_node = inspector.root().create_child("input_pipeline");
950        let input_pipeline = InputPipeline {
951            pipeline_sender: sender,
952            device_event_sender,
953            device_event_receiver,
954            input_device_types: vec![],
955            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
956            inspect_node: test_node,
957            metrics_logger: metrics::MetricsLogger::default(),
958        };
959        InputPipeline::run(receiver, handlers);
960
961        // Send an input event.
962        let input_event = send_input_event(input_device_binding.input_event_sender());
963
964        // Run the pipeline.
965        fasync::Task::local(async {
966            input_pipeline.handle_input_events().await;
967        })
968        .detach();
969
970        // Assert both handlers receive the event.
971        let first_handler_event = first_handler_event_receiver.next().await;
972        assert_eq!(first_handler_event, Some(input_event.clone()));
973        let second_handler_event = second_handler_event_receiver.next().await;
974        assert_eq!(second_handler_event, Some(input_event));
975    }
976
977    /// Tests that a single mouse device binding is created for the one input device in the
978    /// input report directory.
979    #[fasync::run_singlethreaded(test)]
980    async fn watch_devices_one_match_exists() {
981        // Create a file in a pseudo directory that represents an input device.
982        let mut count: i8 = 0;
983        let dir = pseudo_directory! {
984            "file_name" => pseudo_fs_service::host(
985                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
986                    async move {
987                        while count < 3 {
988                            if let Some(input_device_request) =
989                                request_stream.try_next().await.unwrap()
990                            {
991                                handle_input_device_request(input_device_request);
992                                count += 1;
993                            }
994                        }
995
996                    }.boxed()
997                },
998            )
999        };
1000
1001        // Create a Watcher on the pseudo directory.
1002        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1003        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1004        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1005        // proxy to get connections to input devices.
1006        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1007
1008        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1009        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1010        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1011
1012        let inspector = fuchsia_inspect::Inspector::default();
1013        let test_node = inspector.root().create_child("input_pipeline");
1014        test_node.record_string(
1015            "supported_input_devices",
1016            supported_device_types.clone().iter().join(", "),
1017        );
1018        let input_devices = test_node.create_child("input_devices");
1019        // Assert that inspect tree is initialized with no devices.
1020        diagnostics_assertions::assert_data_tree!(inspector, root: {
1021            input_pipeline: {
1022                supported_input_devices: "Mouse",
1023                input_devices: {}
1024            }
1025        });
1026
1027        let _ = InputPipeline::watch_for_devices(
1028            device_watcher,
1029            dir_proxy_for_pipeline,
1030            supported_device_types,
1031            input_event_sender,
1032            bindings.clone(),
1033            &input_devices,
1034            true, /* break_on_idle */
1035            metrics::MetricsLogger::default(),
1036        )
1037        .await;
1038
1039        // Assert that one mouse device with accurate device id was found.
1040        let bindings_hashmap = bindings.lock().await;
1041        assert_eq!(bindings_hashmap.len(), 1);
1042        let bindings_vector = bindings_hashmap.get(&10);
1043        assert!(bindings_vector.is_some());
1044        assert_eq!(bindings_vector.unwrap().len(), 1);
1045        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1046        assert!(boxed_mouse_binding.is_some());
1047        assert_eq!(
1048            boxed_mouse_binding.unwrap().get_device_descriptor(),
1049            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1050                device_id: 10,
1051                absolute_x_range: None,
1052                absolute_y_range: None,
1053                wheel_v_range: None,
1054                wheel_h_range: None,
1055                buttons: Some(vec![0]),
1056                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1057            })
1058        );
1059
1060        // Assert that inspect tree reflects new device discovered and connected.
1061        diagnostics_assertions::assert_data_tree!(inspector, root: {
1062            input_pipeline: {
1063                supported_input_devices: "Mouse",
1064                input_devices: {
1065                    devices_discovered: 1u64,
1066                    devices_connected: 1u64,
1067                    "file_name_Mouse": contains {
1068                        reports_received_count: 0u64,
1069                        reports_filtered_count: 0u64,
1070                        events_generated: 0u64,
1071                        last_received_timestamp_ns: 0u64,
1072                        last_generated_timestamp_ns: 0u64,
1073                        "fuchsia.inspect.Health": {
1074                            status: "OK",
1075                            // Timestamp value is unpredictable and not relevant in this context,
1076                            // so we only assert that the property is present.
1077                            start_timestamp_nanos: AnyProperty
1078                        },
1079                    }
1080                }
1081            }
1082        });
1083    }
1084
1085    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1086    /// but only a mouse exists.
1087    #[fasync::run_singlethreaded(test)]
1088    async fn watch_devices_no_matches_exist() {
1089        // Create a file in a pseudo directory that represents an input device.
1090        let mut count: i8 = 0;
1091        let dir = pseudo_directory! {
1092            "file_name" => pseudo_fs_service::host(
1093                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1094                    async move {
1095                        while count < 1 {
1096                            if let Some(input_device_request) =
1097                                request_stream.try_next().await.unwrap()
1098                            {
1099                                handle_input_device_request(input_device_request);
1100                                count += 1;
1101                            }
1102                        }
1103
1104                    }.boxed()
1105                },
1106            )
1107        };
1108
1109        // Create a Watcher on the pseudo directory.
1110        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1111        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1112        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1113        // proxy to get connections to input devices.
1114        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1115
1116        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1117        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1118        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1119
1120        let inspector = fuchsia_inspect::Inspector::default();
1121        let test_node = inspector.root().create_child("input_pipeline");
1122        test_node.record_string(
1123            "supported_input_devices",
1124            supported_device_types.clone().iter().join(", "),
1125        );
1126        let input_devices = test_node.create_child("input_devices");
1127        // Assert that inspect tree is initialized with no devices.
1128        diagnostics_assertions::assert_data_tree!(inspector, root: {
1129            input_pipeline: {
1130                supported_input_devices: "Keyboard",
1131                input_devices: {}
1132            }
1133        });
1134
1135        let _ = InputPipeline::watch_for_devices(
1136            device_watcher,
1137            dir_proxy_for_pipeline,
1138            supported_device_types,
1139            input_event_sender,
1140            bindings.clone(),
1141            &input_devices,
1142            true, /* break_on_idle */
1143            metrics::MetricsLogger::default(),
1144        )
1145        .await;
1146
1147        // Assert that no devices were found.
1148        let bindings = bindings.lock().await;
1149        assert_eq!(bindings.len(), 0);
1150
1151        // Assert that inspect tree reflects new device discovered, but not connected.
1152        diagnostics_assertions::assert_data_tree!(inspector, root: {
1153            input_pipeline: {
1154                supported_input_devices: "Keyboard",
1155                input_devices: {
1156                    devices_discovered: 1u64,
1157                    devices_connected: 0u64,
1158                    "file_name_Unsupported": {
1159                        "fuchsia.inspect.Health": {
1160                            status: "UNHEALTHY",
1161                            message: "Unsupported device type.",
1162                            // Timestamp value is unpredictable and not relevant in this context,
1163                            // so we only assert that the property is present.
1164                            start_timestamp_nanos: AnyProperty
1165                        },
1166                    }
1167                }
1168            }
1169        });
1170    }
1171
1172    /// Tests that a single keyboard device binding is created for the input device registered
1173    /// through InputDeviceRegistry.
1174    #[fasync::run_singlethreaded(test)]
1175    async fn handle_input_device_registry_request_stream() {
1176        let (input_device_registry_proxy, input_device_registry_request_stream) =
1177            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1178        let (input_device_client_end, mut input_device_request_stream) =
1179            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1180
1181        let device_types = vec![input_device::InputDeviceType::Mouse];
1182        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1183        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1184
1185        // Handle input device requests.
1186        let mut count: i8 = 0;
1187        fasync::Task::local(async move {
1188            // Register a device.
1189            let _ = input_device_registry_proxy.register(input_device_client_end);
1190
1191            while count < 3 {
1192                if let Some(input_device_request) =
1193                    input_device_request_stream.try_next().await.unwrap()
1194                {
1195                    handle_input_device_request(input_device_request);
1196                    count += 1;
1197                }
1198            }
1199
1200            // End handle_input_device_registry_request_stream() by taking the event stream.
1201            input_device_registry_proxy.take_event_stream();
1202        })
1203        .detach();
1204
1205        let inspector = fuchsia_inspect::Inspector::default();
1206        let test_node = inspector.root().create_child("input_pipeline");
1207
1208        // Start listening for InputDeviceRegistryRequests.
1209        let bindings_clone = bindings.clone();
1210        let _ = InputPipeline::handle_input_device_registry_request_stream(
1211            input_device_registry_request_stream,
1212            &device_types,
1213            &input_event_sender,
1214            &bindings_clone,
1215            &test_node,
1216            metrics::MetricsLogger::default(),
1217        )
1218        .await;
1219
1220        // Assert that a device was registered.
1221        let bindings = bindings.lock().await;
1222        assert_eq!(bindings.len(), 1);
1223    }
1224
1225    // Tests that correct properties are added to inspect node when InputPipeline is created.
1226    #[fasync::run_singlethreaded(test)]
1227    async fn check_inspect_node_has_correct_properties() {
1228        let device_types = vec![
1229            input_device::InputDeviceType::Touch,
1230            input_device::InputDeviceType::ConsumerControls,
1231        ];
1232        let inspector = fuchsia_inspect::Inspector::default();
1233        let test_node = inspector.root().create_child("input_pipeline");
1234        // Create fake input handler for assembly
1235        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1236            futures::channel::mpsc::channel(100);
1237        let fake_input_handler =
1238            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1239                fake_handler_event_sender,
1240            );
1241        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1242            .add_handler(fake_input_handler);
1243        let _test_input_pipeline = InputPipeline::new(
1244            device_types,
1245            assembly,
1246            test_node,
1247            metrics::MetricsLogger::default(),
1248        );
1249        diagnostics_assertions::assert_data_tree!(inspector, root: {
1250            input_pipeline: {
1251                supported_input_devices: "Touch, ConsumerControls",
1252                handlers_registered: 1u64,
1253                handlers_healthy: 1u64,
1254                input_devices: {}
1255            }
1256        });
1257    }
1258
1259    struct SpecificInterestFakeHandler {
1260        interest_types: Vec<input_device::InputEventType>,
1261        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1262    }
1263
1264    impl SpecificInterestFakeHandler {
1265        pub fn new(
1266            interest_types: Vec<input_device::InputEventType>,
1267            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1268        ) -> Rc<Self> {
1269            Rc::new(SpecificInterestFakeHandler {
1270                interest_types,
1271                event_sender: std::cell::RefCell::new(event_sender),
1272            })
1273        }
1274    }
1275
1276    #[async_trait(?Send)]
1277    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1278        async fn handle_input_event(
1279            self: Rc<Self>,
1280            input_event: input_device::InputEvent,
1281        ) -> Vec<input_device::InputEvent> {
1282            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1283                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1284                Ok(_) => {}
1285            }
1286            vec![input_event]
1287        }
1288
1289        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1290        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1291        fn get_name(&self) -> &'static str {
1292            "SpecificInterestFakeHandler"
1293        }
1294
1295        fn interest(&self) -> Vec<input_device::InputEventType> {
1296            self.interest_types.clone()
1297        }
1298    }
1299
1300    #[fasync::run_singlethreaded(test)]
1301    async fn run_only_sends_events_to_interested_handlers() {
1302        // Mouse Handler (Specific Interest: Mouse)
1303        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1304        let mouse_handler =
1305            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1306
1307        // Fake Handler (Specific Interest: Fake)
1308        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1309        let fake_handler =
1310            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1311
1312        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1313            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1314                .add_handler(mouse_handler)
1315                .add_handler(fake_handler)
1316                .into_components();
1317
1318        // Run the pipeline logic
1319        InputPipeline::run(pipeline_receiver, handlers);
1320
1321        // Create a Fake event
1322        let fake_event = input_device::InputEvent {
1323            device_event: input_device::InputDeviceEvent::Fake,
1324            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1325            event_time: zx::MonotonicInstant::get(),
1326            handled: input_device::Handled::No,
1327            trace_id: None,
1328        };
1329
1330        // Send the Fake event
1331        pipeline_sender.unbounded_send(fake_event.clone()).expect("failed to send event");
1332
1333        // Verify Fake Handler received it
1334        let received_by_fake = fake_receiver.next().await;
1335        assert_eq!(received_by_fake, Some(fake_event));
1336
1337        // Verify Mouse Handler did NOT receive it
1338        assert!(mouse_receiver.try_next().is_err());
1339    }
1340}