input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_handler::Handler;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::future::LocalBoxFuture;
16use futures::lock::Mutex;
17use futures::{StreamExt, TryStreamExt};
18use itertools::Itertools;
19use metrics_registry::*;
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::rc::Rc;
23use std::sync::atomic::{AtomicU32, Ordering};
24use std::sync::{Arc, LazyLock};
25use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
26
27/// Use a self incremental u32 unique id for device_id.
28///
29/// device id start from 10 to avoid conflict with default devices in Starnix.
30/// Currently, Starnix using 0 and 1 as default devices' id. Starnix need to
31/// use default devices to deliver events from physical devices until we have
32/// API to expose device changes to UI clients.
33static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
34
35/// Each time this function is invoked, it returns the current value of its
36/// internal counter (serving as a unique id for device_id) and then increments
37/// that counter in preparation for the next call.
38fn get_next_device_id() -> u32 {
39    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
40}
41
42type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
43
44/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
45/// It uses unique device id as key.
46pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
47
48/// An input pipeline assembly.
49///
50/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
51/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
52/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
53/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
54///
55/// # Implementation notes
56///
57/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
58/// handlers are connected together using async queues.  This allows fully streamed processing of
59/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
60/// without an external stimulus.
61pub struct InputPipelineAssembly {
62    /// The top-level sender: send into this queue to inject an event into the input
63    /// pipeline.
64    sender: UnboundedSender<input_device::InputEvent>,
65    /// The bottom-level receiver: any events that fall through the entire pipeline can
66    /// be read from this receiver.
67    receiver: UnboundedReceiver<input_device::InputEvent>,
68
69    /// The input handlers that comprise the input pipeline.
70    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
71
72    /// The display ownership watcher task.
73    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
74
75    /// The focus listener task.
76    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
77
78    /// The metrics logger.
79    metrics_logger: metrics::MetricsLogger,
80}
81
82impl InputPipelineAssembly {
83    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
84    /// to add new handlers to it.
85    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
86        let (sender, receiver) = mpsc::unbounded();
87        InputPipelineAssembly {
88            sender,
89            receiver,
90            handlers: vec![],
91            metrics_logger,
92            display_ownership_fut: None,
93            focus_listener_fut: None,
94        }
95    }
96
97    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
98    /// are invoked in the order they are added. Returns `Self` for chaining.
99    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
100        self.handlers.push(handler);
101        self
102    }
103
104    /// Adds all handlers into the assembly in the order they appear in `handlers`.
105    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
106        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
107    }
108
109    pub fn add_display_ownership(
110        mut self,
111        display_ownership_event: zx::Event,
112        input_handlers_node: &fuchsia_inspect::Node,
113    ) -> InputPipelineAssembly {
114        let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
115        let metrics_logger_clone = self.metrics_logger.clone();
116        let h_clone = h.clone();
117        let sender_clone = self.sender.clone();
118        let display_ownership_fut = Box::pin(async move {
119            h_clone.clone().set_handler_healthy();
120            h_clone.clone()
121                .handle_ownership_change(sender_clone)
122                .await
123                .map_err(|e| {
124                    metrics_logger_clone.log_error(
125                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
126                        std::format!(
127                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
128                        })
129                        .unwrap();
130            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
131        });
132        self.display_ownership_fut = Some(display_ownership_fut);
133        self.add_handler(h)
134    }
135
136    /// Deconstructs the assembly into constituent components, used when constructing
137    /// [InputPipeline].
138    ///
139    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
140    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
141    fn into_components(
142        self,
143    ) -> (
144        UnboundedSender<input_device::InputEvent>,
145        UnboundedReceiver<input_device::InputEvent>,
146        Vec<Rc<dyn input_handler::BatchInputHandler>>,
147        metrics::MetricsLogger,
148        Option<LocalBoxFuture<'static, ()>>,
149        Option<LocalBoxFuture<'static, ()>>,
150    ) {
151        (
152            self.sender,
153            self.receiver,
154            self.handlers,
155            self.metrics_logger,
156            self.display_ownership_fut,
157            self.focus_listener_fut,
158        )
159    }
160
161    pub fn add_focus_listener(
162        mut self,
163        focus_chain_publisher: FocusChainProviderPublisher,
164    ) -> Self {
165        let metrics_logger_clone = self.metrics_logger.clone();
166        let focus_listener_fut = Box::pin(async move {
167            if let Ok(mut focus_listener) =
168                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
169                    log::warn!(
170                        "could not create focus listener, focus will not be dispatched: {:?}",
171                        e
172                    )
173                })
174            {
175                // This will await indefinitely and process focus messages in a loop, unless there
176                // is a problem.
177                let _result = focus_listener
178                    .dispatch_focus_changes()
179                    .await
180                    .map(|_| {
181                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
182                    })
183                    .map_err(|e| {
184                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
185                    });
186            }
187        });
188        self.focus_listener_fut = Some(focus_listener_fut);
189        self
190    }
191}
192
193/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
194///
195/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
196/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
197///
198/// # Example
199/// ```
200/// let ime_handler =
201///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
202/// let touch_handler = TouchHandler::new(
203///     scene_manager.session.clone(),
204///     scene_manager.compositor_id,
205///     scene_manager.display_size
206/// ).await?;
207///
208/// let assembly = InputPipelineAssembly::new()
209///     .add_handler(Box::new(ime_handler)),
210///     .add_handler(Box::new(touch_handler)),
211/// let input_pipeline = InputPipeline::new(
212///     vec![
213///         input_device::InputDeviceType::Touch,
214///         input_device::InputDeviceType::Keyboard,
215///     ],
216///     assembly,
217/// );
218/// input_pipeline.handle_input_events().await;
219/// ```
220pub struct InputPipeline {
221    /// The entry point into the input handler pipeline. Incoming input events should
222    /// be inserted into this async queue, and the input pipeline will ensure that they
223    /// are propagated through all the input handlers in the appropriate sequence.
224    pipeline_sender: UnboundedSender<input_device::InputEvent>,
225
226    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
227    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
228    device_event_sender: UnboundedSender<input_device::InputEvent>,
229
230    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
231    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,
232
233    /// The types of devices this pipeline supports.
234    input_device_types: Vec<input_device::InputDeviceType>,
235
236    /// The InputDeviceBindings bound to this pipeline.
237    input_device_bindings: InputDeviceBindingHashMap,
238
239    /// This node is bound to the lifetime of this InputPipeline.
240    /// Inspect data will be dumped for this pipeline as long as it exists.
241    inspect_node: fuchsia_inspect::Node,
242
243    /// The metrics logger.
244    metrics_logger: metrics::MetricsLogger,
245}
246
247impl InputPipeline {
248    fn new_common(
249        input_device_types: Vec<input_device::InputDeviceType>,
250        assembly: InputPipelineAssembly,
251        inspect_node: fuchsia_inspect::Node,
252    ) -> Self {
253        let (
254            pipeline_sender,
255            receiver,
256            handlers,
257            metrics_logger,
258            display_ownership_fut,
259            focus_listener_fut,
260        ) = assembly.into_components();
261
262        let mut handlers_count = handlers.len();
263        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
264        if let Some(fut) = display_ownership_fut {
265            fasync::Task::local(fut).detach();
266            handlers_count += 1;
267        }
268
269        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
270        if let Some(fut) = focus_listener_fut {
271            fasync::Task::local(fut).detach();
272            handlers_count += 1;
273        }
274
275        // Add properties to inspect node
276        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
277        inspect_node.record_uint("handlers_registered", handlers_count as u64);
278        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
279
280        // Initializes all handlers and starts the input pipeline loop.
281        InputPipeline::run(receiver, handlers);
282
283        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
284        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
285        InputPipeline {
286            pipeline_sender,
287            device_event_sender,
288            device_event_receiver,
289            input_device_types,
290            input_device_bindings,
291            inspect_node,
292            metrics_logger,
293        }
294    }
295
296    /// Creates a new [`InputPipeline`] for integration testing.
297    /// Unlike a production input pipeline, this pipeline will not monitor
298    /// `/dev/class/input-report` for devices.
299    ///
300    /// # Parameters
301    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
302    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
303    pub fn new_for_test(
304        input_device_types: Vec<input_device::InputDeviceType>,
305        assembly: InputPipelineAssembly,
306    ) -> Self {
307        let inspector = fuchsia_inspect::Inspector::default();
308        let root = inspector.root();
309        let test_node = root.create_child("input_pipeline");
310        Self::new_common(input_device_types, assembly, test_node)
311    }
312
313    /// Creates a new [`InputPipeline`] for production use.
314    ///
315    /// # Parameters
316    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
317    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
318    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
319    pub fn new(
320        input_device_types: Vec<input_device::InputDeviceType>,
321        assembly: InputPipelineAssembly,
322        inspect_node: fuchsia_inspect::Node,
323        metrics_logger: metrics::MetricsLogger,
324    ) -> Result<Self, Error> {
325        let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
326        let input_device_types = input_pipeline.input_device_types.clone();
327        let input_event_sender = input_pipeline.device_event_sender.clone();
328        let input_device_bindings = input_pipeline.input_device_bindings.clone();
329        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
330        fasync::Task::local(async move {
331            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
332            // that send InputEvents to `input_event_receiver`.
333            match async {
334                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
335                    input_device::INPUT_REPORT_PATH,
336                    fuchsia_fs::PERM_READABLE,
337                )
338                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
339                let device_watcher =
340                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
341                Self::watch_for_devices(
342                    device_watcher,
343                    dir_proxy,
344                    input_device_types,
345                    input_event_sender,
346                    input_device_bindings,
347                    &devices_node,
348                    false, /* break_on_idle */
349                    metrics_logger.clone(),
350                )
351                .await
352                .context("failed to watch for devices")
353            }
354            .await
355            {
356                Ok(()) => {}
357                Err(err) => {
358                    // This error is usually benign in tests: it means that the setup does not
359                    // support dynamic device discovery. Almost no tests support dynamic
360                    // device discovery, and they also do not need those.
361                    metrics_logger.log_warn(
362                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
363                        std::format!(
364                            "Input pipeline is unable to watch for new input devices: {:?}",
365                            err
366                        ));
367                }
368            }
369        })
370        .detach();
371
372        Ok(input_pipeline)
373    }
374
375    /// Gets the input device bindings.
376    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
377        &self.input_device_bindings
378    }
379
380    /// Gets the input device sender: this is the channel that should be cloned
381    /// and used for injecting events from the drivers into the input pipeline.
382    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
383        &self.device_event_sender
384    }
385
386    /// Gets a list of input device types supported by this input pipeline.
387    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
388        &self.input_device_types
389    }
390
391    /// Forwards all input events into the input pipeline.
392    pub async fn handle_input_events(mut self) {
393        let metrics_logger_clone = self.metrics_logger.clone();
394        while let Some(input_event) = self.device_event_receiver.next().await {
395            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
396                metrics_logger_clone.log_error(
397                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
398                    std::format!("could not forward event from driver: {:?}", &e));
399            }
400        }
401
402        metrics_logger_clone.log_error(
403            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
404            "Input pipeline stopped handling input events.".to_string(),
405        );
406    }
407
408    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
409    /// if new devices match a type in `device_types`.
410    ///
411    /// # Parameters
412    /// - `device_watcher`: Watches the input report directory for new devices.
413    /// - `dir_proxy`: The directory containing InputDevice connections.
414    /// - `device_types`: The types of devices to watch for.
415    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
416    /// - `bindings`: Holds all the InputDeviceBindings
417    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
418    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
419    /// - `metrics_logger`: The metrics logger.
420    ///
421    /// # Errors
422    /// If the input report directory or a file within it cannot be read.
423    async fn watch_for_devices(
424        mut device_watcher: Watcher,
425        dir_proxy: fio::DirectoryProxy,
426        device_types: Vec<input_device::InputDeviceType>,
427        input_event_sender: UnboundedSender<input_device::InputEvent>,
428        bindings: InputDeviceBindingHashMap,
429        input_devices_node: &fuchsia_inspect::Node,
430        break_on_idle: bool,
431        metrics_logger: metrics::MetricsLogger,
432    ) -> Result<(), Error> {
433        // Add non-static properties to inspect node.
434        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
435        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
436        while let Some(msg) = device_watcher.try_next().await? {
437            if let Ok(filename) = msg.filename.into_os_string().into_string() {
438                if filename == "." {
439                    continue;
440                }
441
442                let pathbuf = PathBuf::from(filename.clone());
443                match msg.event {
444                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
445                        log::info!("found input device {}", filename);
446                        devices_discovered.add(1);
447                        let device_proxy =
448                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
449                        add_device_bindings(
450                            &device_types,
451                            &filename,
452                            device_proxy,
453                            &input_event_sender,
454                            &bindings,
455                            get_next_device_id(),
456                            input_devices_node,
457                            Some(&devices_connected),
458                            metrics_logger.clone(),
459                        )
460                        .await;
461                    }
462                    WatchEvent::IDLE => {
463                        if break_on_idle {
464                            break;
465                        }
466                    }
467                    _ => (),
468                }
469            }
470        }
471        // Ensure inspect properties persist for debugging if device watch loop ends.
472        input_devices_node.record(devices_discovered);
473        input_devices_node.record(devices_connected);
474        Err(format_err!("Input pipeline stopped watching for new input devices."))
475    }
476
477    /// Handles the incoming InputDeviceRegistryRequestStream.
478    ///
479    /// This method will end when the request stream is closed. If the stream closes with an
480    /// error the error will be returned in the Result.
481    ///
482    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
483    ///
484    /// # Parameters
485    /// - `stream`: The stream of InputDeviceRegistryRequests.
486    /// - `device_types`: The types of devices to watch for.
487    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
488    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
489    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
490    /// - `metrics_logger`: The metrics logger.
491    pub async fn handle_input_device_registry_request_stream(
492        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
493        device_types: &Vec<input_device::InputDeviceType>,
494        input_event_sender: &UnboundedSender<input_device::InputEvent>,
495        bindings: &InputDeviceBindingHashMap,
496        input_devices_node: &fuchsia_inspect::Node,
497        metrics_logger: metrics::MetricsLogger,
498    ) -> Result<(), Error> {
499        while let Some(request) = stream
500            .try_next()
501            .await
502            .context("Error handling input device registry request stream")?
503        {
504            match request {
505                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
506                    device,
507                    ..
508                } => {
509                    // Add a binding if the device is a type being tracked
510                    let device_proxy = device.into_proxy();
511
512                    let device_id = get_next_device_id();
513
514                    add_device_bindings(
515                        device_types,
516                        &format!("input-device-registry-{}", device_id),
517                        device_proxy,
518                        input_event_sender,
519                        bindings,
520                        device_id,
521                        input_devices_node,
522                        None,
523                        metrics_logger.clone(),
524                    )
525                    .await;
526                }
527                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
528                    device,
529                    responder,
530                    .. } => {
531                    // Add a binding if the device is a type being tracked
532                    let device_proxy = device.into_proxy();
533
534                    let device_id = get_next_device_id();
535
536                    add_device_bindings(
537                        device_types,
538                        &format!("input-device-registry-{}", device_id),
539                        device_proxy,
540                        input_event_sender,
541                        bindings,
542                        device_id,
543                        input_devices_node,
544                        None,
545                        metrics_logger.clone(),
546                    )
547                    .await;
548
549                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
550                        device_id: Some(device_id),
551                        ..Default::default()
552                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
553                }
554            }
555        }
556
557        Ok(())
558    }
559
560    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
561    fn run(
562        mut receiver: UnboundedReceiver<input_device::InputEvent>,
563        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
564    ) {
565        fasync::Task::local(async move {
566            for handler in &handlers {
567                handler.clone().set_handler_healthy();
568            }
569
570            use input_device::InputEventType;
571            use std::collections::HashMap;
572
573            // Pre-compute handler lists for each event type.
574            let mut handlers_by_type: HashMap<
575                InputEventType,
576                Vec<Rc<dyn input_handler::BatchInputHandler>>,
577            > = HashMap::new();
578
579            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
580            let event_types = vec![
581                InputEventType::Keyboard,
582                InputEventType::LightSensor,
583                InputEventType::ConsumerControls,
584                InputEventType::Mouse,
585                InputEventType::TouchScreen,
586                InputEventType::Touchpad,
587                #[cfg(test)]
588                InputEventType::Fake,
589            ];
590
591            for event_type in event_types {
592                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
593                    .iter()
594                    .filter(|h| h.interest().contains(&event_type))
595                    .cloned()
596                    .collect();
597                handlers_by_type.insert(event_type, handlers_for_type);
598            }
599
600            while let Some(event) = receiver.next().await {
601                let event_type = InputEventType::from(&event.device_event);
602
603                // Get pre-computed handlers for this event type.
604                let handlers = handlers_by_type.get(&event_type).unwrap();
605
606                let mut events = vec![event];
607                for handler in handlers {
608                    let handler_name = handler.get_name();
609                    events = {
610                        let _async_trace = fuchsia_trace::async_enter!(
611                            fuchsia_trace::Id::random(),
612                            "input",
613                            "handle_input_events",
614                            "name" => handler_name
615                        );
616                        handler.clone().handle_input_events(events).await
617                    };
618                }
619
620                for event in events {
621                    if event.handled == input_device::Handled::No {
622                        log::warn!("unhandled input event: {:?}", &event);
623                    }
624                }
625            }
626            for handler in &handlers {
627                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
628            }
629            panic!("Runner task is not supposed to terminate.")
630        })
631        .detach();
632    }
633}
634
635/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
636///
637/// # Parameters
638/// - `device_types`: The types of devices to watch for.
639/// - `device_proxy`: A proxy to the input device.
640/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
641/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
642/// - `device_id`: The device id of the associated bindings.
643/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
644///
645/// # Note
646/// This will create multiple bindings, in the case where
647/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
648///   with multiple table fields populated, and
649/// * multiple populated table fields correspond to device types present in `device_types`
650///
651/// This is used, for example, to support the Atlas touchpad. In that case, a single
652/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
653/// a `fuchsia.input.report.TouchDescriptor`.
654async fn add_device_bindings(
655    device_types: &Vec<input_device::InputDeviceType>,
656    filename: &String,
657    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
658    input_event_sender: &UnboundedSender<input_device::InputEvent>,
659    bindings: &InputDeviceBindingHashMap,
660    device_id: u32,
661    input_devices_node: &fuchsia_inspect::Node,
662    devices_connected: Option<&fuchsia_inspect::UintProperty>,
663    metrics_logger: metrics::MetricsLogger,
664) {
665    let mut matched_device_types = vec![];
666    if let Ok(descriptor) = device_proxy.get_descriptor().await {
667        for device_type in device_types {
668            if input_device::is_device_type(&descriptor, *device_type).await {
669                matched_device_types.push(device_type);
670                match devices_connected {
671                    Some(dev_connected) => {
672                        let _ = dev_connected.add(1);
673                    }
674                    None => (),
675                };
676            }
677        }
678        if matched_device_types.is_empty() {
679            log::info!(
680                "device {} did not match any supported device types: {:?}",
681                filename,
682                device_types
683            );
684            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
685            let mut health = fuchsia_inspect::health::Node::new(&device_node);
686            health.set_unhealthy("Unsupported device type.");
687            device_node.record(health);
688            input_devices_node.record(device_node);
689            return;
690        }
691    } else {
692        metrics_logger.clone().log_error(
693            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
694            std::format!("cannot bind device {} without a device descriptor", filename),
695        );
696        return;
697    }
698
699    log::info!(
700        "binding {} to device types: {}",
701        filename,
702        matched_device_types
703            .iter()
704            .fold(String::new(), |device_types_string, device_type| device_types_string
705                + &format!("{:?}, ", device_type))
706    );
707
708    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
709    for device_type in matched_device_types {
710        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
711        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
712        //
713        // There's no conflict in having multiple bindings read from the same node,
714        // since:
715        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
716        // * the device driver will copy each incoming report to each connected reader.
717        //
718        // This does mean that reports from the Atlas touchpad device get read twice
719        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
720        // is operating in mouse mode or touchpad mode.
721        //
722        // This hasn't been an issue because:
723        // * Semantically: things are fine, because each binding discards irrelevant reports.
724        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
725        // * Performance wise: things are fine, because the data rate of the touchpad is low
726        //   (125 HZ).
727        //
728        // If we add additional cases where bindings share an underlying `input-report` node,
729        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
730        let proxy = device_proxy.clone();
731        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
732        match input_device::get_device_binding(
733            *device_type,
734            proxy,
735            device_id,
736            input_event_sender.clone(),
737            device_node,
738            metrics_logger.clone(),
739        )
740        .await
741        {
742            Ok(binding) => new_bindings.push(binding),
743            Err(e) => {
744                metrics_logger.log_error(
745                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
746                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
747                );
748            }
749        }
750    }
751
752    if !new_bindings.is_empty() {
753        let mut bindings = bindings.lock().await;
754        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
755    }
756}
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761    use crate::input_device::{InputDeviceBinding, InputEventType};
762    use crate::utils::Position;
763    use crate::{
764        fake_input_device_binding, mouse_binding, mouse_model_database,
765        observe_fake_events_input_handler,
766    };
767    use async_trait::async_trait;
768    use diagnostics_assertions::AnyProperty;
769    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
770    use fuchsia_async as fasync;
771    use futures::FutureExt;
772    use pretty_assertions::assert_eq;
773    use rand::Rng;
774    use std::collections::HashSet;
775    use vfs::{pseudo_directory, service as pseudo_fs_service};
776
777    const COUNTS_PER_MM: u32 = 12;
778
779    /// Returns the InputEvent sent over `sender`.
780    ///
781    /// # Parameters
782    /// - `sender`: The channel to send the InputEvent over.
783    fn send_input_event(
784        sender: UnboundedSender<input_device::InputEvent>,
785    ) -> input_device::InputEvent {
786        let mut rng = rand::rng();
787        let offset =
788            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
789        let input_event = input_device::InputEvent {
790            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
791                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
792                    millimeters: Position {
793                        x: offset.x / COUNTS_PER_MM as f32,
794                        y: offset.y / COUNTS_PER_MM as f32,
795                    },
796                }),
797                None, /* wheel_delta_v */
798                None, /* wheel_delta_h */
799                mouse_binding::MousePhase::Move,
800                HashSet::new(),
801                HashSet::new(),
802                None, /* is_precision_scroll */
803                None, /* wake_lease */
804            )),
805            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
806                mouse_binding::MouseDeviceDescriptor {
807                    device_id: 1,
808                    absolute_x_range: None,
809                    absolute_y_range: None,
810                    wheel_v_range: None,
811                    wheel_h_range: None,
812                    buttons: None,
813                    counts_per_mm: COUNTS_PER_MM,
814                },
815            ),
816            event_time: zx::MonotonicInstant::get(),
817            handled: input_device::Handled::No,
818            trace_id: None,
819        };
820        match sender.unbounded_send(input_event.clone()) {
821            Err(_) => assert!(false),
822            _ => {}
823        }
824
825        input_event
826    }
827
828    /// Returns a MouseDescriptor on an InputDeviceRequest.
829    ///
830    /// # Parameters
831    /// - `input_device_request`: The request to handle.
832    fn handle_input_device_request(
833        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
834    ) {
835        match input_device_request {
836            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
837                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
838                    device_information: None,
839                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
840                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
841                            movement_x: None,
842                            movement_y: None,
843                            scroll_v: None,
844                            scroll_h: None,
845                            buttons: Some(vec![0]),
846                            position_x: None,
847                            position_y: None,
848                            ..Default::default()
849                        }),
850                        ..Default::default()
851                    }),
852                    sensor: None,
853                    touch: None,
854                    keyboard: None,
855                    consumer_control: None,
856                    ..Default::default()
857                });
858            }
859            _ => {}
860        }
861    }
862
863    /// Tests that an input pipeline handles events from multiple devices.
864    #[fasync::run_singlethreaded(test)]
865    async fn multiple_devices_single_handler() {
866        // Create two fake device bindings.
867        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
868        let first_device_binding =
869            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
870        let second_device_binding =
871            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
872
873        // Create a fake input handler.
874        let (handler_event_sender, mut handler_event_receiver) =
875            futures::channel::mpsc::channel(100);
876        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
877            handler_event_sender,
878        );
879
880        // Build the input pipeline.
881        let (sender, receiver, handlers, _, _, _) =
882            InputPipelineAssembly::new(metrics::MetricsLogger::default())
883                .add_handler(input_handler)
884                .into_components();
885        let inspector = fuchsia_inspect::Inspector::default();
886        let test_node = inspector.root().create_child("input_pipeline");
887        let input_pipeline = InputPipeline {
888            pipeline_sender: sender,
889            device_event_sender,
890            device_event_receiver,
891            input_device_types: vec![],
892            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
893            inspect_node: test_node,
894            metrics_logger: metrics::MetricsLogger::default(),
895        };
896        InputPipeline::run(receiver, handlers);
897
898        // Send an input event from each device.
899        let first_device_event = send_input_event(first_device_binding.input_event_sender());
900        let second_device_event = send_input_event(second_device_binding.input_event_sender());
901
902        // Run the pipeline.
903        fasync::Task::local(async {
904            input_pipeline.handle_input_events().await;
905        })
906        .detach();
907
908        // Assert the handler receives the events.
909        let first_handled_event = handler_event_receiver.next().await;
910        assert_eq!(first_handled_event, Some(first_device_event));
911
912        let second_handled_event = handler_event_receiver.next().await;
913        assert_eq!(second_handled_event, Some(second_device_event));
914    }
915
916    /// Tests that an input pipeline handles events through multiple input handlers.
917    #[fasync::run_singlethreaded(test)]
918    async fn single_device_multiple_handlers() {
919        // Create two fake device bindings.
920        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
921        let input_device_binding =
922            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
923
924        // Create two fake input handlers.
925        let (first_handler_event_sender, mut first_handler_event_receiver) =
926            futures::channel::mpsc::channel(100);
927        let first_input_handler =
928            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
929                first_handler_event_sender,
930            );
931        let (second_handler_event_sender, mut second_handler_event_receiver) =
932            futures::channel::mpsc::channel(100);
933        let second_input_handler =
934            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
935                second_handler_event_sender,
936            );
937
938        // Build the input pipeline.
939        let (sender, receiver, handlers, _, _, _) =
940            InputPipelineAssembly::new(metrics::MetricsLogger::default())
941                .add_handler(first_input_handler)
942                .add_handler(second_input_handler)
943                .into_components();
944        let inspector = fuchsia_inspect::Inspector::default();
945        let test_node = inspector.root().create_child("input_pipeline");
946        let input_pipeline = InputPipeline {
947            pipeline_sender: sender,
948            device_event_sender,
949            device_event_receiver,
950            input_device_types: vec![],
951            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
952            inspect_node: test_node,
953            metrics_logger: metrics::MetricsLogger::default(),
954        };
955        InputPipeline::run(receiver, handlers);
956
957        // Send an input event.
958        let input_event = send_input_event(input_device_binding.input_event_sender());
959
960        // Run the pipeline.
961        fasync::Task::local(async {
962            input_pipeline.handle_input_events().await;
963        })
964        .detach();
965
966        // Assert both handlers receive the event.
967        let first_handler_event = first_handler_event_receiver.next().await;
968        assert_eq!(first_handler_event, Some(input_event.clone()));
969        let second_handler_event = second_handler_event_receiver.next().await;
970        assert_eq!(second_handler_event, Some(input_event));
971    }
972
973    /// Tests that a single mouse device binding is created for the one input device in the
974    /// input report directory.
975    #[fasync::run_singlethreaded(test)]
976    async fn watch_devices_one_match_exists() {
977        // Create a file in a pseudo directory that represents an input device.
978        let mut count: i8 = 0;
979        let dir = pseudo_directory! {
980            "file_name" => pseudo_fs_service::host(
981                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
982                    async move {
983                        while count < 3 {
984                            if let Some(input_device_request) =
985                                request_stream.try_next().await.unwrap()
986                            {
987                                handle_input_device_request(input_device_request);
988                                count += 1;
989                            }
990                        }
991
992                    }.boxed()
993                },
994            )
995        };
996
997        // Create a Watcher on the pseudo directory.
998        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
999        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1000        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1001        // proxy to get connections to input devices.
1002        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1003
1004        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1005        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1006        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1007
1008        let inspector = fuchsia_inspect::Inspector::default();
1009        let test_node = inspector.root().create_child("input_pipeline");
1010        test_node.record_string(
1011            "supported_input_devices",
1012            supported_device_types.clone().iter().join(", "),
1013        );
1014        let input_devices = test_node.create_child("input_devices");
1015        // Assert that inspect tree is initialized with no devices.
1016        diagnostics_assertions::assert_data_tree!(inspector, root: {
1017            input_pipeline: {
1018                supported_input_devices: "Mouse",
1019                input_devices: {}
1020            }
1021        });
1022
1023        let _ = InputPipeline::watch_for_devices(
1024            device_watcher,
1025            dir_proxy_for_pipeline,
1026            supported_device_types,
1027            input_event_sender,
1028            bindings.clone(),
1029            &input_devices,
1030            true, /* break_on_idle */
1031            metrics::MetricsLogger::default(),
1032        )
1033        .await;
1034
1035        // Assert that one mouse device with accurate device id was found.
1036        let bindings_hashmap = bindings.lock().await;
1037        assert_eq!(bindings_hashmap.len(), 1);
1038        let bindings_vector = bindings_hashmap.get(&10);
1039        assert!(bindings_vector.is_some());
1040        assert_eq!(bindings_vector.unwrap().len(), 1);
1041        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1042        assert!(boxed_mouse_binding.is_some());
1043        assert_eq!(
1044            boxed_mouse_binding.unwrap().get_device_descriptor(),
1045            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1046                device_id: 10,
1047                absolute_x_range: None,
1048                absolute_y_range: None,
1049                wheel_v_range: None,
1050                wheel_h_range: None,
1051                buttons: Some(vec![0]),
1052                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1053            })
1054        );
1055
1056        // Assert that inspect tree reflects new device discovered and connected.
1057        diagnostics_assertions::assert_data_tree!(inspector, root: {
1058            input_pipeline: {
1059                supported_input_devices: "Mouse",
1060                input_devices: {
1061                    devices_discovered: 1u64,
1062                    devices_connected: 1u64,
1063                    "file_name_Mouse": contains {
1064                        reports_received_count: 0u64,
1065                        reports_filtered_count: 0u64,
1066                        events_generated: 0u64,
1067                        last_received_timestamp_ns: 0u64,
1068                        last_generated_timestamp_ns: 0u64,
1069                        "fuchsia.inspect.Health": {
1070                            status: "OK",
1071                            // Timestamp value is unpredictable and not relevant in this context,
1072                            // so we only assert that the property is present.
1073                            start_timestamp_nanos: AnyProperty
1074                        },
1075                    }
1076                }
1077            }
1078        });
1079    }
1080
1081    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1082    /// but only a mouse exists.
1083    #[fasync::run_singlethreaded(test)]
1084    async fn watch_devices_no_matches_exist() {
1085        // Create a file in a pseudo directory that represents an input device.
1086        let mut count: i8 = 0;
1087        let dir = pseudo_directory! {
1088            "file_name" => pseudo_fs_service::host(
1089                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1090                    async move {
1091                        while count < 1 {
1092                            if let Some(input_device_request) =
1093                                request_stream.try_next().await.unwrap()
1094                            {
1095                                handle_input_device_request(input_device_request);
1096                                count += 1;
1097                            }
1098                        }
1099
1100                    }.boxed()
1101                },
1102            )
1103        };
1104
1105        // Create a Watcher on the pseudo directory.
1106        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1107        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1108        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1109        // proxy to get connections to input devices.
1110        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1111
1112        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1113        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1114        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1115
1116        let inspector = fuchsia_inspect::Inspector::default();
1117        let test_node = inspector.root().create_child("input_pipeline");
1118        test_node.record_string(
1119            "supported_input_devices",
1120            supported_device_types.clone().iter().join(", "),
1121        );
1122        let input_devices = test_node.create_child("input_devices");
1123        // Assert that inspect tree is initialized with no devices.
1124        diagnostics_assertions::assert_data_tree!(inspector, root: {
1125            input_pipeline: {
1126                supported_input_devices: "Keyboard",
1127                input_devices: {}
1128            }
1129        });
1130
1131        let _ = InputPipeline::watch_for_devices(
1132            device_watcher,
1133            dir_proxy_for_pipeline,
1134            supported_device_types,
1135            input_event_sender,
1136            bindings.clone(),
1137            &input_devices,
1138            true, /* break_on_idle */
1139            metrics::MetricsLogger::default(),
1140        )
1141        .await;
1142
1143        // Assert that no devices were found.
1144        let bindings = bindings.lock().await;
1145        assert_eq!(bindings.len(), 0);
1146
1147        // Assert that inspect tree reflects new device discovered, but not connected.
1148        diagnostics_assertions::assert_data_tree!(inspector, root: {
1149            input_pipeline: {
1150                supported_input_devices: "Keyboard",
1151                input_devices: {
1152                    devices_discovered: 1u64,
1153                    devices_connected: 0u64,
1154                    "file_name_Unsupported": {
1155                        "fuchsia.inspect.Health": {
1156                            status: "UNHEALTHY",
1157                            message: "Unsupported device type.",
1158                            // Timestamp value is unpredictable and not relevant in this context,
1159                            // so we only assert that the property is present.
1160                            start_timestamp_nanos: AnyProperty
1161                        },
1162                    }
1163                }
1164            }
1165        });
1166    }
1167
1168    /// Tests that a single keyboard device binding is created for the input device registered
1169    /// through InputDeviceRegistry.
1170    #[fasync::run_singlethreaded(test)]
1171    async fn handle_input_device_registry_request_stream() {
1172        let (input_device_registry_proxy, input_device_registry_request_stream) =
1173            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1174        let (input_device_client_end, mut input_device_request_stream) =
1175            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1176
1177        let device_types = vec![input_device::InputDeviceType::Mouse];
1178        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1179        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1180
1181        // Handle input device requests.
1182        let mut count: i8 = 0;
1183        fasync::Task::local(async move {
1184            // Register a device.
1185            let _ = input_device_registry_proxy.register(input_device_client_end);
1186
1187            while count < 3 {
1188                if let Some(input_device_request) =
1189                    input_device_request_stream.try_next().await.unwrap()
1190                {
1191                    handle_input_device_request(input_device_request);
1192                    count += 1;
1193                }
1194            }
1195
1196            // End handle_input_device_registry_request_stream() by taking the event stream.
1197            input_device_registry_proxy.take_event_stream();
1198        })
1199        .detach();
1200
1201        let inspector = fuchsia_inspect::Inspector::default();
1202        let test_node = inspector.root().create_child("input_pipeline");
1203
1204        // Start listening for InputDeviceRegistryRequests.
1205        let bindings_clone = bindings.clone();
1206        let _ = InputPipeline::handle_input_device_registry_request_stream(
1207            input_device_registry_request_stream,
1208            &device_types,
1209            &input_event_sender,
1210            &bindings_clone,
1211            &test_node,
1212            metrics::MetricsLogger::default(),
1213        )
1214        .await;
1215
1216        // Assert that a device was registered.
1217        let bindings = bindings.lock().await;
1218        assert_eq!(bindings.len(), 1);
1219    }
1220
1221    // Tests that correct properties are added to inspect node when InputPipeline is created.
1222    #[fasync::run_singlethreaded(test)]
1223    async fn check_inspect_node_has_correct_properties() {
1224        let device_types = vec![
1225            input_device::InputDeviceType::Touch,
1226            input_device::InputDeviceType::ConsumerControls,
1227        ];
1228        let inspector = fuchsia_inspect::Inspector::default();
1229        let test_node = inspector.root().create_child("input_pipeline");
1230        // Create fake input handler for assembly
1231        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1232            futures::channel::mpsc::channel(100);
1233        let fake_input_handler =
1234            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1235                fake_handler_event_sender,
1236            );
1237        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1238            .add_handler(fake_input_handler);
1239        let _test_input_pipeline = InputPipeline::new(
1240            device_types,
1241            assembly,
1242            test_node,
1243            metrics::MetricsLogger::default(),
1244        );
1245        diagnostics_assertions::assert_data_tree!(inspector, root: {
1246            input_pipeline: {
1247                supported_input_devices: "Touch, ConsumerControls",
1248                handlers_registered: 1u64,
1249                handlers_healthy: 1u64,
1250                input_devices: {}
1251            }
1252        });
1253    }
1254
1255    struct SpecificInterestFakeHandler {
1256        interest_types: Vec<input_device::InputEventType>,
1257        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1258    }
1259
1260    impl SpecificInterestFakeHandler {
1261        pub fn new(
1262            interest_types: Vec<input_device::InputEventType>,
1263            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1264        ) -> Rc<Self> {
1265            Rc::new(SpecificInterestFakeHandler {
1266                interest_types,
1267                event_sender: std::cell::RefCell::new(event_sender),
1268            })
1269        }
1270    }
1271
1272    impl Handler for SpecificInterestFakeHandler {
1273        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1274        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1275        fn get_name(&self) -> &'static str {
1276            "SpecificInterestFakeHandler"
1277        }
1278
1279        fn interest(&self) -> Vec<input_device::InputEventType> {
1280            self.interest_types.clone()
1281        }
1282    }
1283
1284    #[async_trait(?Send)]
1285    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1286        async fn handle_input_event(
1287            self: Rc<Self>,
1288            input_event: input_device::InputEvent,
1289        ) -> Vec<input_device::InputEvent> {
1290            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1291                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1292                Ok(_) => {}
1293            }
1294            vec![input_event]
1295        }
1296    }
1297
1298    #[fasync::run_singlethreaded(test)]
1299    async fn run_only_sends_events_to_interested_handlers() {
1300        // Mouse Handler (Specific Interest: Mouse)
1301        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1302        let mouse_handler =
1303            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1304
1305        // Fake Handler (Specific Interest: Fake)
1306        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1307        let fake_handler =
1308            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1309
1310        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1311            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1312                .add_handler(mouse_handler)
1313                .add_handler(fake_handler)
1314                .into_components();
1315
1316        // Run the pipeline logic
1317        InputPipeline::run(pipeline_receiver, handlers);
1318
1319        // Create a Fake event
1320        let fake_event = input_device::InputEvent {
1321            device_event: input_device::InputDeviceEvent::Fake,
1322            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1323            event_time: zx::MonotonicInstant::get(),
1324            handled: input_device::Handled::No,
1325            trace_id: None,
1326        };
1327
1328        // Send the Fake event
1329        pipeline_sender.unbounded_send(fake_event.clone()).expect("failed to send event");
1330
1331        // Verify Fake Handler received it
1332        let received_by_fake = fake_receiver.next().await;
1333        assert_eq!(received_by_fake, Some(fake_event));
1334
1335        // Verify Mouse Handler did NOT receive it
1336        assert!(mouse_receiver.try_next().is_err());
1337    }
1338}