Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use focus_chain_provider::FocusChainProviderPublisher;
12use fuchsia_fs::directory::{WatchEvent, Watcher};
13use fuchsia_inspect::NumericProperty;
14use fuchsia_inspect::health::Reporter;
15use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
16use futures::future::LocalBoxFuture;
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use itertools::Itertools;
20use metrics_registry::*;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::rc::Rc;
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, LazyLock};
26use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
27
28/// Use a self incremental u32 unique id for device_id.
29///
30/// device id start from 10 to avoid conflict with default devices in Starnix.
31/// Currently, Starnix using 0 and 1 as default devices' id. Starnix need to
32/// use default devices to deliver events from physical devices until we have
33/// API to expose device changes to UI clients.
34static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
35
36/// Each time this function is invoked, it returns the current value of its
37/// internal counter (serving as a unique id for device_id) and then increments
38/// that counter in preparation for the next call.
39fn get_next_device_id() -> u32 {
40    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
41}
42
43type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
44
45/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
46/// It uses unique device id as key.
47pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
48
49/// An input pipeline assembly.
50///
51/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
52/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
53/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
54/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
55///
56/// # Implementation notes
57///
58/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
59/// handlers are connected together using async queues.  This allows fully streamed processing of
60/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
61/// without an external stimulus.
62pub struct InputPipelineAssembly {
63    /// The top-level sender: send into this queue to inject an event into the input
64    /// pipeline.
65    sender: UnboundedSender<Vec<input_device::InputEvent>>,
66    /// The bottom-level receiver: any events that fall through the entire pipeline can
67    /// be read from this receiver.
68    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
69
70    /// The input handlers that comprise the input pipeline.
71    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
72
73    /// The display ownership watcher task.
74    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
75
76    /// The focus listener task.
77    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
78
79    /// The metrics logger.
80    metrics_logger: metrics::MetricsLogger,
81}
82
83impl InputPipelineAssembly {
84    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
85    /// to add new handlers to it.
86    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
87        let (sender, receiver) = mpsc::unbounded();
88        InputPipelineAssembly {
89            sender,
90            receiver,
91            handlers: vec![],
92            metrics_logger,
93            display_ownership_fut: None,
94            focus_listener_fut: None,
95        }
96    }
97
98    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
99    /// are invoked in the order they are added. Returns `Self` for chaining.
100    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
101        self.handlers.push(handler);
102        self
103    }
104
105    /// Adds all handlers into the assembly in the order they appear in `handlers`.
106    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
107        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
108    }
109
110    pub fn add_display_ownership(
111        mut self,
112        display_ownership_event: zx::Event,
113        input_handlers_node: &fuchsia_inspect::Node,
114    ) -> InputPipelineAssembly {
115        let h = DisplayOwnership::new(
116            display_ownership_event,
117            input_handlers_node,
118            self.metrics_logger.clone(),
119        );
120        let metrics_logger_clone = self.metrics_logger.clone();
121        let h_clone = h.clone();
122        let sender_clone = self.sender.clone();
123        let display_ownership_fut = Box::pin(async move {
124            h_clone.clone().set_handler_healthy();
125            h_clone.clone()
126                .handle_ownership_change(sender_clone)
127                .await
128                .map_err(|e| {
129                    metrics_logger_clone.log_error(
130                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
131                        std::format!(
132                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
133                        })
134                        .unwrap();
135            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
136        });
137        self.display_ownership_fut = Some(display_ownership_fut);
138        self.add_handler(h)
139    }
140
141    /// Deconstructs the assembly into constituent components, used when constructing
142    /// [InputPipeline].
143    ///
144    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
145    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
146    fn into_components(
147        self,
148    ) -> (
149        UnboundedSender<Vec<input_device::InputEvent>>,
150        UnboundedReceiver<Vec<input_device::InputEvent>>,
151        Vec<Rc<dyn input_handler::BatchInputHandler>>,
152        metrics::MetricsLogger,
153        Option<LocalBoxFuture<'static, ()>>,
154        Option<LocalBoxFuture<'static, ()>>,
155    ) {
156        (
157            self.sender,
158            self.receiver,
159            self.handlers,
160            self.metrics_logger,
161            self.display_ownership_fut,
162            self.focus_listener_fut,
163        )
164    }
165
166    pub fn add_focus_listener(
167        mut self,
168        focus_chain_publisher: FocusChainProviderPublisher,
169    ) -> Self {
170        let metrics_logger_clone = self.metrics_logger.clone();
171        let focus_listener_fut = Box::pin(async move {
172            if let Ok(mut focus_listener) =
173                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
174                    log::warn!(
175                        "could not create focus listener, focus will not be dispatched: {:?}",
176                        e
177                    )
178                })
179            {
180                // This will await indefinitely and process focus messages in a loop, unless there
181                // is a problem.
182                let _result = focus_listener
183                    .dispatch_focus_changes()
184                    .await
185                    .map(|_| {
186                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
187                    })
188                    .map_err(|e| {
189                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
190                    });
191            }
192        });
193        self.focus_listener_fut = Some(focus_listener_fut);
194        self
195    }
196}
197
198/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
199///
200/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
201/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
202///
203/// # Example
204/// ```
205/// let ime_handler =
206///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
207/// let touch_handler = TouchHandler::new(
208///     scene_manager.session.clone(),
209///     scene_manager.compositor_id,
210///     scene_manager.display_size
211/// ).await?;
212///
213/// let assembly = InputPipelineAssembly::new()
214///     .add_handler(Box::new(ime_handler)),
215///     .add_handler(Box::new(touch_handler)),
216/// let input_pipeline = InputPipeline::new(
217///     vec![
218///         input_device::InputDeviceType::Touch,
219///         input_device::InputDeviceType::Keyboard,
220///     ],
221///     assembly,
222/// );
223/// input_pipeline.handle_input_events().await;
224/// ```
225pub struct InputPipeline {
226    /// The entry point into the input handler pipeline. Incoming input events should
227    /// be inserted into this async queue, and the input pipeline will ensure that they
228    /// are propagated through all the input handlers in the appropriate sequence.
229    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
230
231    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
232    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
233    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
234
235    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
236    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
237
238    /// The types of devices this pipeline supports.
239    input_device_types: Vec<input_device::InputDeviceType>,
240
241    /// The InputDeviceBindings bound to this pipeline.
242    input_device_bindings: InputDeviceBindingHashMap,
243
244    /// This node is bound to the lifetime of this InputPipeline.
245    /// Inspect data will be dumped for this pipeline as long as it exists.
246    inspect_node: fuchsia_inspect::Node,
247
248    /// The metrics logger.
249    metrics_logger: metrics::MetricsLogger,
250
251    /// The feature flags for the input pipeline.
252    pub feature_flags: input_device::InputPipelineFeatureFlags,
253}
254
255impl InputPipeline {
256    fn new_common(
257        input_device_types: Vec<input_device::InputDeviceType>,
258        assembly: InputPipelineAssembly,
259        inspect_node: fuchsia_inspect::Node,
260        feature_flags: input_device::InputPipelineFeatureFlags,
261    ) -> Self {
262        let (
263            pipeline_sender,
264            receiver,
265            handlers,
266            metrics_logger,
267            display_ownership_fut,
268            focus_listener_fut,
269        ) = assembly.into_components();
270
271        let mut handlers_count = handlers.len();
272        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
273        if let Some(fut) = display_ownership_fut {
274            fasync::Task::local(fut).detach();
275            handlers_count += 1;
276        }
277
278        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
279        if let Some(fut) = focus_listener_fut {
280            fasync::Task::local(fut).detach();
281            handlers_count += 1;
282        }
283
284        // Add properties to inspect node
285        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
286        inspect_node.record_uint("handlers_registered", handlers_count as u64);
287        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
288
289        // Initializes all handlers and starts the input pipeline loop.
290        InputPipeline::run(receiver, handlers, metrics_logger.clone());
291
292        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
293        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
294        InputPipeline {
295            pipeline_sender,
296            device_event_sender,
297            device_event_receiver,
298            input_device_types,
299            input_device_bindings,
300            inspect_node,
301            metrics_logger,
302            feature_flags,
303        }
304    }
305
306    /// Creates a new [`InputPipeline`] for integration testing.
307    /// Unlike a production input pipeline, this pipeline will not monitor
308    /// `/dev/class/input-report` for devices.
309    ///
310    /// # Parameters
311    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
312    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
313    pub fn new_for_test(
314        input_device_types: Vec<input_device::InputDeviceType>,
315        assembly: InputPipelineAssembly,
316    ) -> Self {
317        let inspector = fuchsia_inspect::Inspector::default();
318        let root = inspector.root();
319        let test_node = root.create_child("input_pipeline");
320        Self::new_common(
321            input_device_types,
322            assembly,
323            test_node,
324            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
325        )
326    }
327
328    /// Creates a new [`InputPipeline`] for production use.
329    ///
330    /// # Parameters
331    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
332    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
333    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
334    pub fn new(
335        input_device_types: Vec<input_device::InputDeviceType>,
336        assembly: InputPipelineAssembly,
337        inspect_node: fuchsia_inspect::Node,
338        feature_flags: input_device::InputPipelineFeatureFlags,
339        metrics_logger: metrics::MetricsLogger,
340    ) -> Result<Self, Error> {
341        let input_pipeline =
342            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
343        let input_device_types = input_pipeline.input_device_types.clone();
344        let input_event_sender = input_pipeline.device_event_sender.clone();
345        let input_device_bindings = input_pipeline.input_device_bindings.clone();
346        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
347        let feature_flags = input_pipeline.feature_flags.clone();
348        fasync::Task::local(async move {
349            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
350            // that send InputEvents to `input_event_receiver`.
351            match async {
352                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
353                    input_device::INPUT_REPORT_PATH,
354                    fuchsia_fs::PERM_READABLE,
355                )
356                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
357                let device_watcher =
358                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
359                Self::watch_for_devices(
360                    device_watcher,
361                    dir_proxy,
362                    input_device_types,
363                    input_event_sender,
364                    input_device_bindings,
365                    &devices_node,
366                    false, /* break_on_idle */
367                    feature_flags,
368                    metrics_logger.clone(),
369                )
370                .await
371                .context("failed to watch for devices")
372            }
373            .await
374            {
375                Ok(()) => {}
376                Err(err) => {
377                    // This error is usually benign in tests: it means that the setup does not
378                    // support dynamic device discovery. Almost no tests support dynamic
379                    // device discovery, and they also do not need those.
380                    metrics_logger.log_warn(
381                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
382                        std::format!(
383                            "Input pipeline is unable to watch for new input devices: {:?}",
384                            err
385                        ));
386                }
387            }
388        })
389        .detach();
390
391        Ok(input_pipeline)
392    }
393
394    /// Gets the input device bindings.
395    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
396        &self.input_device_bindings
397    }
398
399    /// Gets the input device sender: this is the channel that should be cloned
400    /// and used for injecting events from the drivers into the input pipeline.
401    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
402        &self.device_event_sender
403    }
404
405    /// Gets a list of input device types supported by this input pipeline.
406    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
407        &self.input_device_types
408    }
409
410    /// Forwards all input events into the input pipeline.
411    pub async fn handle_input_events(mut self) {
412        let metrics_logger_clone = self.metrics_logger.clone();
413        while let Some(input_event) = self.device_event_receiver.next().await {
414            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
415                metrics_logger_clone.log_error(
416                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
417                    std::format!("could not forward event from driver: {:?}", &e));
418            }
419        }
420
421        metrics_logger_clone.log_error(
422            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
423            "Input pipeline stopped handling input events.".to_string(),
424        );
425    }
426
427    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
428    /// if new devices match a type in `device_types`.
429    ///
430    /// # Parameters
431    /// - `device_watcher`: Watches the input report directory for new devices.
432    /// - `dir_proxy`: The directory containing InputDevice connections.
433    /// - `device_types`: The types of devices to watch for.
434    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
435    /// - `bindings`: Holds all the InputDeviceBindings
436    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
437    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
438    /// - `metrics_logger`: The metrics logger.
439    ///
440    /// # Errors
441    /// If the input report directory or a file within it cannot be read.
442    async fn watch_for_devices(
443        mut device_watcher: Watcher,
444        dir_proxy: fio::DirectoryProxy,
445        device_types: Vec<input_device::InputDeviceType>,
446        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
447        bindings: InputDeviceBindingHashMap,
448        input_devices_node: &fuchsia_inspect::Node,
449        break_on_idle: bool,
450        feature_flags: input_device::InputPipelineFeatureFlags,
451        metrics_logger: metrics::MetricsLogger,
452    ) -> Result<(), Error> {
453        // Add non-static properties to inspect node.
454        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
455        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
456        while let Some(msg) = device_watcher.try_next().await? {
457            if let Ok(filename) = msg.filename.into_os_string().into_string() {
458                if filename == "." {
459                    continue;
460                }
461
462                let pathbuf = PathBuf::from(filename.clone());
463                match msg.event {
464                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
465                        log::info!("found input device {}", filename);
466                        devices_discovered.add(1);
467                        let device_proxy =
468                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
469                        add_device_bindings(
470                            &device_types,
471                            &filename,
472                            device_proxy,
473                            &input_event_sender,
474                            &bindings,
475                            get_next_device_id(),
476                            input_devices_node,
477                            Some(&devices_connected),
478                            feature_flags.clone(),
479                            metrics_logger.clone(),
480                        )
481                        .await;
482                    }
483                    WatchEvent::IDLE => {
484                        if break_on_idle {
485                            break;
486                        }
487                    }
488                    _ => (),
489                }
490            }
491        }
492        // Ensure inspect properties persist for debugging if device watch loop ends.
493        input_devices_node.record(devices_discovered);
494        input_devices_node.record(devices_connected);
495        Err(format_err!("Input pipeline stopped watching for new input devices."))
496    }
497
498    /// Handles the incoming InputDeviceRegistryRequestStream.
499    ///
500    /// This method will end when the request stream is closed. If the stream closes with an
501    /// error the error will be returned in the Result.
502    ///
503    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
504    ///
505    /// # Parameters
506    /// - `stream`: The stream of InputDeviceRegistryRequests.
507    /// - `device_types`: The types of devices to watch for.
508    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
509    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
510    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
511    /// - `metrics_logger`: The metrics logger.
512    pub async fn handle_input_device_registry_request_stream(
513        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
514        device_types: &Vec<input_device::InputDeviceType>,
515        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
516        bindings: &InputDeviceBindingHashMap,
517        input_devices_node: &fuchsia_inspect::Node,
518        feature_flags: input_device::InputPipelineFeatureFlags,
519        metrics_logger: metrics::MetricsLogger,
520    ) -> Result<(), Error> {
521        while let Some(request) = stream
522            .try_next()
523            .await
524            .context("Error handling input device registry request stream")?
525        {
526            match request {
527                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
528                    device,
529                    ..
530                } => {
531                    // Add a binding if the device is a type being tracked
532                    let device_proxy = device.into_proxy();
533
534                    let device_id = get_next_device_id();
535
536                    add_device_bindings(
537                        device_types,
538                        &format!("input-device-registry-{}", device_id),
539                        device_proxy,
540                        input_event_sender,
541                        bindings,
542                        device_id,
543                        input_devices_node,
544                        None,
545                        feature_flags.clone(),
546                        metrics_logger.clone(),
547                    )
548                    .await;
549                }
550                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
551                    device,
552                    responder,
553                    .. } => {
554                    // Add a binding if the device is a type being tracked
555                    let device_proxy = device.into_proxy();
556
557                    let device_id = get_next_device_id();
558
559                    add_device_bindings(
560                        device_types,
561                        &format!("input-device-registry-{}", device_id),
562                        device_proxy,
563                        input_event_sender,
564                        bindings,
565                        device_id,
566                        input_devices_node,
567                        None,
568                        feature_flags.clone(),
569                        metrics_logger.clone(),
570                    )
571                    .await;
572
573                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
574                        device_id: Some(device_id),
575                        ..Default::default()
576                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
577                }
578            }
579        }
580
581        Ok(())
582    }
583
584    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
585    fn run(
586        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
587        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
588        metrics_logger: metrics::MetricsLogger,
589    ) {
590        fasync::Task::local(async move {
591            for handler in &handlers {
592                handler.clone().set_handler_healthy();
593            }
594
595            use input_device::InputEventType;
596            use std::collections::HashMap;
597
598            // Pre-compute handler lists for each event type.
599            let mut handlers_by_type: HashMap<
600                InputEventType,
601                Vec<Rc<dyn input_handler::BatchInputHandler>>,
602            > = HashMap::new();
603
604            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
605            let event_types = vec![
606                InputEventType::Keyboard,
607                InputEventType::LightSensor,
608                InputEventType::ConsumerControls,
609                InputEventType::Mouse,
610                InputEventType::TouchScreen,
611                InputEventType::Touchpad,
612                #[cfg(test)]
613                InputEventType::Fake,
614            ];
615
616            for event_type in event_types {
617                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
618                    .iter()
619                    .filter(|h| h.interest().contains(&event_type))
620                    .cloned()
621                    .collect();
622                handlers_by_type.insert(event_type, handlers_for_type);
623            }
624
625            while let Some(events) = receiver.next().await {
626                if events.is_empty() {
627                    continue;
628                }
629
630                let mut groups_seen = 0;
631                for (event_type, event_group) in events
632                    .into_iter()
633                    .chunk_by(|e| InputEventType::from(&e.device_event))
634                    .into_iter()
635                {
636                    groups_seen += 1;
637                    if groups_seen == 2 {
638                        metrics_logger.log_error(
639                            InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
640                            "it is not recommended to contain multiple types of events in 1 send".to_string(),
641                        );
642                    }
643                    let mut events_in_group: Vec<_> = event_group.collect();
644
645                    // Get pre-computed handlers for this event type.
646                    let handlers = handlers_by_type.get(&event_type).unwrap();
647
648                    for handler in handlers {
649                        events_in_group =
650                            handler.clone().handle_input_events(events_in_group).await;
651                    }
652
653                    for event in events_in_group {
654                        if event.handled == input_device::Handled::No {
655                            log::warn!("unhandled input event: {:?}", &event);
656                        }
657                        if let Some(trace_id) = event.trace_id {
658                            fuchsia_trace::flow_end!(
659                                "input",
660                                "event_in_input_pipeline",
661                                trace_id.into()
662                            );
663                        }
664                    }
665                }
666            }
667            for handler in &handlers {
668                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
669            }
670            panic!("Runner task is not supposed to terminate.")
671        })
672        .detach();
673    }
674}
675
676/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
677///
678/// # Parameters
679/// - `device_types`: The types of devices to watch for.
680/// - `device_proxy`: A proxy to the input device.
681/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
682/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
683/// - `device_id`: The device id of the associated bindings.
684/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
685///
686/// # Note
687/// This will create multiple bindings, in the case where
688/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
689///   with multiple table fields populated, and
690/// * multiple populated table fields correspond to device types present in `device_types`
691///
692/// This is used, for example, to support the Atlas touchpad. In that case, a single
693/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
694/// a `fuchsia.input.report.TouchDescriptor`.
695async fn add_device_bindings(
696    device_types: &Vec<input_device::InputDeviceType>,
697    filename: &String,
698    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
699    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
700    bindings: &InputDeviceBindingHashMap,
701    device_id: u32,
702    input_devices_node: &fuchsia_inspect::Node,
703    devices_connected: Option<&fuchsia_inspect::UintProperty>,
704    feature_flags: InputPipelineFeatureFlags,
705    metrics_logger: metrics::MetricsLogger,
706) {
707    let mut matched_device_types = vec![];
708    if let Ok(descriptor) = device_proxy.get_descriptor().await {
709        for device_type in device_types {
710            if input_device::is_device_type(&descriptor, *device_type).await {
711                matched_device_types.push(device_type);
712                match devices_connected {
713                    Some(dev_connected) => {
714                        let _ = dev_connected.add(1);
715                    }
716                    None => (),
717                };
718            }
719        }
720        if matched_device_types.is_empty() {
721            log::info!(
722                "device {} did not match any supported device types: {:?}",
723                filename,
724                device_types
725            );
726            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
727            let mut health = fuchsia_inspect::health::Node::new(&device_node);
728            health.set_unhealthy("Unsupported device type.");
729            device_node.record(health);
730            input_devices_node.record(device_node);
731            return;
732        }
733    } else {
734        metrics_logger.clone().log_error(
735            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
736            std::format!("cannot bind device {} without a device descriptor", filename),
737        );
738        return;
739    }
740
741    log::info!(
742        "binding {} to device types: {}",
743        filename,
744        matched_device_types
745            .iter()
746            .fold(String::new(), |device_types_string, device_type| device_types_string
747                + &format!("{:?}, ", device_type))
748    );
749
750    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
751    for device_type in matched_device_types {
752        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
753        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
754        //
755        // There's no conflict in having multiple bindings read from the same node,
756        // since:
757        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
758        // * the device driver will copy each incoming report to each connected reader.
759        //
760        // This does mean that reports from the Atlas touchpad device get read twice
761        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
762        // is operating in mouse mode or touchpad mode.
763        //
764        // This hasn't been an issue because:
765        // * Semantically: things are fine, because each binding discards irrelevant reports.
766        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
767        // * Performance wise: things are fine, because the data rate of the touchpad is low
768        //   (125 HZ).
769        //
770        // If we add additional cases where bindings share an underlying `input-report` node,
771        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
772        let proxy = device_proxy.clone();
773        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
774        match input_device::get_device_binding(
775            *device_type,
776            proxy,
777            device_id,
778            input_event_sender.clone(),
779            device_node,
780            feature_flags.clone(),
781            metrics_logger.clone(),
782        )
783        .await
784        {
785            Ok(binding) => new_bindings.push(binding),
786            Err(e) => {
787                metrics_logger.log_error(
788                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
789                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
790                );
791            }
792        }
793    }
794
795    if !new_bindings.is_empty() {
796        let mut bindings = bindings.lock().await;
797        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
798    }
799}
800
801#[cfg(test)]
802mod tests {
803    use super::*;
804    use crate::input_device::{InputDeviceBinding, InputEventType};
805    use crate::utils::Position;
806    use crate::{
807        fake_input_device_binding, mouse_binding, mouse_model_database,
808        observe_fake_events_input_handler,
809    };
810    use async_trait::async_trait;
811    use diagnostics_assertions::AnyProperty;
812    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
813    use fuchsia_async as fasync;
814    use futures::FutureExt;
815    use pretty_assertions::assert_eq;
816    use rand::Rng;
817    use std::collections::HashSet;
818    use vfs::{pseudo_directory, service as pseudo_fs_service};
819
820    const COUNTS_PER_MM: u32 = 12;
821
822    /// Returns the InputEvent sent over `sender`.
823    ///
824    /// # Parameters
825    /// - `sender`: The channel to send the InputEvent over.
826    fn send_input_event(
827        sender: UnboundedSender<Vec<input_device::InputEvent>>,
828    ) -> Vec<input_device::InputEvent> {
829        let mut rng = rand::rng();
830        let offset =
831            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
832        let input_event = input_device::InputEvent {
833            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
834                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
835                    millimeters: Position {
836                        x: offset.x / COUNTS_PER_MM as f32,
837                        y: offset.y / COUNTS_PER_MM as f32,
838                    },
839                }),
840                None, /* wheel_delta_v */
841                None, /* wheel_delta_h */
842                mouse_binding::MousePhase::Move,
843                HashSet::new(),
844                HashSet::new(),
845                None, /* is_precision_scroll */
846                None, /* wake_lease */
847            )),
848            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
849                mouse_binding::MouseDeviceDescriptor {
850                    device_id: 1,
851                    absolute_x_range: None,
852                    absolute_y_range: None,
853                    wheel_v_range: None,
854                    wheel_h_range: None,
855                    buttons: None,
856                    counts_per_mm: COUNTS_PER_MM,
857                },
858            ),
859            event_time: zx::MonotonicInstant::get(),
860            handled: input_device::Handled::No,
861            trace_id: None,
862        };
863        match sender.unbounded_send(vec![input_event.clone()]) {
864            Err(_) => assert!(false),
865            _ => {}
866        }
867
868        vec![input_event]
869    }
870
871    /// Returns a MouseDescriptor on an InputDeviceRequest.
872    ///
873    /// # Parameters
874    /// - `input_device_request`: The request to handle.
875    fn handle_input_device_request(
876        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
877    ) {
878        match input_device_request {
879            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
880                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
881                    device_information: None,
882                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
883                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
884                            movement_x: None,
885                            movement_y: None,
886                            scroll_v: None,
887                            scroll_h: None,
888                            buttons: Some(vec![0]),
889                            position_x: None,
890                            position_y: None,
891                            ..Default::default()
892                        }),
893                        ..Default::default()
894                    }),
895                    sensor: None,
896                    touch: None,
897                    keyboard: None,
898                    consumer_control: None,
899                    ..Default::default()
900                });
901            }
902            _ => {}
903        }
904    }
905
906    /// Tests that an input pipeline handles events from multiple devices.
907    #[fasync::run_singlethreaded(test)]
908    async fn multiple_devices_single_handler() {
909        // Create two fake device bindings.
910        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
911        let first_device_binding =
912            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
913        let second_device_binding =
914            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
915
916        // Create a fake input handler.
917        let (handler_event_sender, mut handler_event_receiver) =
918            futures::channel::mpsc::channel(100);
919        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
920            handler_event_sender,
921        );
922
923        // Build the input pipeline.
924        let (sender, receiver, handlers, _, _, _) =
925            InputPipelineAssembly::new(metrics::MetricsLogger::default())
926                .add_handler(input_handler)
927                .into_components();
928        let inspector = fuchsia_inspect::Inspector::default();
929        let test_node = inspector.root().create_child("input_pipeline");
930        let input_pipeline = InputPipeline {
931            pipeline_sender: sender,
932            device_event_sender,
933            device_event_receiver,
934            input_device_types: vec![],
935            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
936            inspect_node: test_node,
937            metrics_logger: metrics::MetricsLogger::default(),
938            feature_flags: input_device::InputPipelineFeatureFlags::default(),
939        };
940        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
941
942        // Send an input event from each device.
943        let first_device_events = send_input_event(first_device_binding.input_event_sender());
944        let second_device_events = send_input_event(second_device_binding.input_event_sender());
945
946        // Run the pipeline.
947        fasync::Task::local(async {
948            input_pipeline.handle_input_events().await;
949        })
950        .detach();
951
952        // Assert the handler receives the events.
953        let first_handled_event = handler_event_receiver.next().await;
954        assert_eq!(first_handled_event, first_device_events.into_iter().next());
955
956        let second_handled_event = handler_event_receiver.next().await;
957        assert_eq!(second_handled_event, second_device_events.into_iter().next());
958    }
959
960    /// Tests that an input pipeline handles events through multiple input handlers.
961    #[fasync::run_singlethreaded(test)]
962    async fn single_device_multiple_handlers() {
963        // Create two fake device bindings.
964        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
965        let input_device_binding =
966            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
967
968        // Create two fake input handlers.
969        let (first_handler_event_sender, mut first_handler_event_receiver) =
970            futures::channel::mpsc::channel(100);
971        let first_input_handler =
972            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
973                first_handler_event_sender,
974            );
975        let (second_handler_event_sender, mut second_handler_event_receiver) =
976            futures::channel::mpsc::channel(100);
977        let second_input_handler =
978            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
979                second_handler_event_sender,
980            );
981
982        // Build the input pipeline.
983        let (sender, receiver, handlers, _, _, _) =
984            InputPipelineAssembly::new(metrics::MetricsLogger::default())
985                .add_handler(first_input_handler)
986                .add_handler(second_input_handler)
987                .into_components();
988        let inspector = fuchsia_inspect::Inspector::default();
989        let test_node = inspector.root().create_child("input_pipeline");
990        let input_pipeline = InputPipeline {
991            pipeline_sender: sender,
992            device_event_sender,
993            device_event_receiver,
994            input_device_types: vec![],
995            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
996            inspect_node: test_node,
997            metrics_logger: metrics::MetricsLogger::default(),
998            feature_flags: input_device::InputPipelineFeatureFlags::default(),
999        };
1000        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1001
1002        // Send an input event.
1003        let input_events = send_input_event(input_device_binding.input_event_sender());
1004
1005        // Run the pipeline.
1006        fasync::Task::local(async {
1007            input_pipeline.handle_input_events().await;
1008        })
1009        .detach();
1010
1011        // Assert both handlers receive the event.
1012        let expected_event = input_events.into_iter().next();
1013        let first_handler_event = first_handler_event_receiver.next().await;
1014        assert_eq!(first_handler_event, expected_event);
1015        let second_handler_event = second_handler_event_receiver.next().await;
1016        assert_eq!(second_handler_event, expected_event);
1017    }
1018
1019    /// Tests that a single mouse device binding is created for the one input device in the
1020    /// input report directory.
1021    #[fasync::run_singlethreaded(test)]
1022    async fn watch_devices_one_match_exists() {
1023        // Create a file in a pseudo directory that represents an input device.
1024        let mut count: i8 = 0;
1025        let dir = pseudo_directory! {
1026            "file_name" => pseudo_fs_service::host(
1027                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1028                    async move {
1029                        while count < 3 {
1030                            if let Some(input_device_request) =
1031                                request_stream.try_next().await.unwrap()
1032                            {
1033                                handle_input_device_request(input_device_request);
1034                                count += 1;
1035                            }
1036                        }
1037
1038                    }.boxed()
1039                },
1040            )
1041        };
1042
1043        // Create a Watcher on the pseudo directory.
1044        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1045        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1046        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1047        // proxy to get connections to input devices.
1048        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1049
1050        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1051        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1052        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1053
1054        let inspector = fuchsia_inspect::Inspector::default();
1055        let test_node = inspector.root().create_child("input_pipeline");
1056        test_node.record_string(
1057            "supported_input_devices",
1058            supported_device_types.clone().iter().join(", "),
1059        );
1060        let input_devices = test_node.create_child("input_devices");
1061        // Assert that inspect tree is initialized with no devices.
1062        diagnostics_assertions::assert_data_tree!(inspector, root: {
1063            input_pipeline: {
1064                supported_input_devices: "Mouse",
1065                input_devices: {}
1066            }
1067        });
1068
1069        let _ = InputPipeline::watch_for_devices(
1070            device_watcher,
1071            dir_proxy_for_pipeline,
1072            supported_device_types,
1073            input_event_sender,
1074            bindings.clone(),
1075            &input_devices,
1076            true, /* break_on_idle */
1077            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1078            metrics::MetricsLogger::default(),
1079        )
1080        .await;
1081
1082        // Assert that one mouse device with accurate device id was found.
1083        let bindings_hashmap = bindings.lock().await;
1084        assert_eq!(bindings_hashmap.len(), 1);
1085        let bindings_vector = bindings_hashmap.get(&10);
1086        assert!(bindings_vector.is_some());
1087        assert_eq!(bindings_vector.unwrap().len(), 1);
1088        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1089        assert!(boxed_mouse_binding.is_some());
1090        assert_eq!(
1091            boxed_mouse_binding.unwrap().get_device_descriptor(),
1092            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1093                device_id: 10,
1094                absolute_x_range: None,
1095                absolute_y_range: None,
1096                wheel_v_range: None,
1097                wheel_h_range: None,
1098                buttons: Some(vec![0]),
1099                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1100            })
1101        );
1102
1103        // Assert that inspect tree reflects new device discovered and connected.
1104        diagnostics_assertions::assert_data_tree!(inspector, root: {
1105            input_pipeline: {
1106                supported_input_devices: "Mouse",
1107                input_devices: {
1108                    devices_discovered: 1u64,
1109                    devices_connected: 1u64,
1110                    "file_name_Mouse": contains {
1111                        reports_received_count: 0u64,
1112                        reports_filtered_count: 0u64,
1113                        events_generated: 0u64,
1114                        last_received_timestamp_ns: 0u64,
1115                        last_generated_timestamp_ns: 0u64,
1116                        "fuchsia.inspect.Health": {
1117                            status: "OK",
1118                            // Timestamp value is unpredictable and not relevant in this context,
1119                            // so we only assert that the property is present.
1120                            start_timestamp_nanos: AnyProperty
1121                        },
1122                    }
1123                }
1124            }
1125        });
1126    }
1127
1128    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1129    /// but only a mouse exists.
1130    #[fasync::run_singlethreaded(test)]
1131    async fn watch_devices_no_matches_exist() {
1132        // Create a file in a pseudo directory that represents an input device.
1133        let mut count: i8 = 0;
1134        let dir = pseudo_directory! {
1135            "file_name" => pseudo_fs_service::host(
1136                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1137                    async move {
1138                        while count < 1 {
1139                            if let Some(input_device_request) =
1140                                request_stream.try_next().await.unwrap()
1141                            {
1142                                handle_input_device_request(input_device_request);
1143                                count += 1;
1144                            }
1145                        }
1146
1147                    }.boxed()
1148                },
1149            )
1150        };
1151
1152        // Create a Watcher on the pseudo directory.
1153        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1154        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1155        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1156        // proxy to get connections to input devices.
1157        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1158
1159        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1160        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1161        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1162
1163        let inspector = fuchsia_inspect::Inspector::default();
1164        let test_node = inspector.root().create_child("input_pipeline");
1165        test_node.record_string(
1166            "supported_input_devices",
1167            supported_device_types.clone().iter().join(", "),
1168        );
1169        let input_devices = test_node.create_child("input_devices");
1170        // Assert that inspect tree is initialized with no devices.
1171        diagnostics_assertions::assert_data_tree!(inspector, root: {
1172            input_pipeline: {
1173                supported_input_devices: "Keyboard",
1174                input_devices: {}
1175            }
1176        });
1177
1178        let _ = InputPipeline::watch_for_devices(
1179            device_watcher,
1180            dir_proxy_for_pipeline,
1181            supported_device_types,
1182            input_event_sender,
1183            bindings.clone(),
1184            &input_devices,
1185            true, /* break_on_idle */
1186            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1187            metrics::MetricsLogger::default(),
1188        )
1189        .await;
1190
1191        // Assert that no devices were found.
1192        let bindings = bindings.lock().await;
1193        assert_eq!(bindings.len(), 0);
1194
1195        // Assert that inspect tree reflects new device discovered, but not connected.
1196        diagnostics_assertions::assert_data_tree!(inspector, root: {
1197            input_pipeline: {
1198                supported_input_devices: "Keyboard",
1199                input_devices: {
1200                    devices_discovered: 1u64,
1201                    devices_connected: 0u64,
1202                    "file_name_Unsupported": {
1203                        "fuchsia.inspect.Health": {
1204                            status: "UNHEALTHY",
1205                            message: "Unsupported device type.",
1206                            // Timestamp value is unpredictable and not relevant in this context,
1207                            // so we only assert that the property is present.
1208                            start_timestamp_nanos: AnyProperty
1209                        },
1210                    }
1211                }
1212            }
1213        });
1214    }
1215
1216    /// Tests that a single keyboard device binding is created for the input device registered
1217    /// through InputDeviceRegistry.
1218    #[fasync::run_singlethreaded(test)]
1219    async fn handle_input_device_registry_request_stream() {
1220        let (input_device_registry_proxy, input_device_registry_request_stream) =
1221            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1222        let (input_device_client_end, mut input_device_request_stream) =
1223            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1224
1225        let device_types = vec![input_device::InputDeviceType::Mouse];
1226        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1227        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1228
1229        // Handle input device requests.
1230        let mut count: i8 = 0;
1231        fasync::Task::local(async move {
1232            // Register a device.
1233            let _ = input_device_registry_proxy.register(input_device_client_end);
1234
1235            while count < 3 {
1236                if let Some(input_device_request) =
1237                    input_device_request_stream.try_next().await.unwrap()
1238                {
1239                    handle_input_device_request(input_device_request);
1240                    count += 1;
1241                }
1242            }
1243
1244            // End handle_input_device_registry_request_stream() by taking the event stream.
1245            input_device_registry_proxy.take_event_stream();
1246        })
1247        .detach();
1248
1249        let inspector = fuchsia_inspect::Inspector::default();
1250        let test_node = inspector.root().create_child("input_pipeline");
1251
1252        // Start listening for InputDeviceRegistryRequests.
1253        let bindings_clone = bindings.clone();
1254        let _ = InputPipeline::handle_input_device_registry_request_stream(
1255            input_device_registry_request_stream,
1256            &device_types,
1257            &input_event_sender,
1258            &bindings_clone,
1259            &test_node,
1260            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1261            metrics::MetricsLogger::default(),
1262        )
1263        .await;
1264
1265        // Assert that a device was registered.
1266        let bindings = bindings.lock().await;
1267        assert_eq!(bindings.len(), 1);
1268    }
1269
1270    // Tests that correct properties are added to inspect node when InputPipeline is created.
1271    #[fasync::run_singlethreaded(test)]
1272    async fn check_inspect_node_has_correct_properties() {
1273        let device_types = vec![
1274            input_device::InputDeviceType::Touch,
1275            input_device::InputDeviceType::ConsumerControls,
1276        ];
1277        let inspector = fuchsia_inspect::Inspector::default();
1278        let test_node = inspector.root().create_child("input_pipeline");
1279        // Create fake input handler for assembly
1280        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1281            futures::channel::mpsc::channel(100);
1282        let fake_input_handler =
1283            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1284                fake_handler_event_sender,
1285            );
1286        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1287            .add_handler(fake_input_handler);
1288        let _test_input_pipeline = InputPipeline::new(
1289            device_types,
1290            assembly,
1291            test_node,
1292            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1293            metrics::MetricsLogger::default(),
1294        );
1295        diagnostics_assertions::assert_data_tree!(inspector, root: {
1296            input_pipeline: {
1297                supported_input_devices: "Touch, ConsumerControls",
1298                handlers_registered: 1u64,
1299                handlers_healthy: 1u64,
1300                input_devices: {}
1301            }
1302        });
1303    }
1304
1305    struct SpecificInterestFakeHandler {
1306        interest_types: Vec<input_device::InputEventType>,
1307        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1308    }
1309
1310    impl SpecificInterestFakeHandler {
1311        pub fn new(
1312            interest_types: Vec<input_device::InputEventType>,
1313            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1314        ) -> Rc<Self> {
1315            Rc::new(SpecificInterestFakeHandler {
1316                interest_types,
1317                event_sender: std::cell::RefCell::new(event_sender),
1318            })
1319        }
1320    }
1321
1322    impl Handler for SpecificInterestFakeHandler {
1323        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1324        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1325        fn get_name(&self) -> &'static str {
1326            "SpecificInterestFakeHandler"
1327        }
1328
1329        fn interest(&self) -> Vec<input_device::InputEventType> {
1330            self.interest_types.clone()
1331        }
1332    }
1333
1334    #[async_trait(?Send)]
1335    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1336        async fn handle_input_event(
1337            self: Rc<Self>,
1338            input_event: input_device::InputEvent,
1339        ) -> Vec<input_device::InputEvent> {
1340            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1341                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1342                Ok(_) => {}
1343            }
1344            vec![input_event]
1345        }
1346    }
1347
1348    #[fasync::run_singlethreaded(test)]
1349    async fn run_only_sends_events_to_interested_handlers() {
1350        // Mouse Handler (Specific Interest: Mouse)
1351        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1352        let mouse_handler =
1353            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1354
1355        // Fake Handler (Specific Interest: Fake)
1356        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1357        let fake_handler =
1358            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1359
1360        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1361            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1362                .add_handler(mouse_handler)
1363                .add_handler(fake_handler)
1364                .into_components();
1365
1366        // Run the pipeline logic
1367        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1368
1369        // Create a Fake event
1370        let fake_event = input_device::InputEvent {
1371            device_event: input_device::InputDeviceEvent::Fake,
1372            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1373            event_time: zx::MonotonicInstant::get(),
1374            handled: input_device::Handled::No,
1375            trace_id: None,
1376        };
1377
1378        // Send the Fake event
1379        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1380
1381        // Verify Fake Handler received it
1382        let received_by_fake = fake_receiver.next().await;
1383        assert_eq!(received_by_fake, Some(fake_event));
1384
1385        // Verify Mouse Handler did NOT receive it
1386        assert!(mouse_receiver.try_next().is_err());
1387    }
1388
1389    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1390        input_device::InputEvent {
1391            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1392                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1393                    millimeters: Position { x, y },
1394                }),
1395                None,
1396                None,
1397                mouse_binding::MousePhase::Move,
1398                HashSet::new(),
1399                HashSet::new(),
1400                None,
1401                None,
1402            )),
1403            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1404                mouse_binding::MouseDeviceDescriptor {
1405                    device_id: 1,
1406                    absolute_x_range: None,
1407                    absolute_y_range: None,
1408                    wheel_v_range: None,
1409                    wheel_h_range: None,
1410                    buttons: None,
1411                    counts_per_mm: 1,
1412                },
1413            ),
1414            event_time: zx::MonotonicInstant::get(),
1415            handled: input_device::Handled::No,
1416            trace_id: None,
1417        }
1418    }
1419
1420    #[fasync::run_singlethreaded(test)]
1421    async fn run_mixed_event_types_dispatched_correctly() {
1422        // Mouse Handler (Specific Interest: Mouse)
1423        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1424        let mouse_handler =
1425            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1426
1427        // Fake Handler (Specific Interest: Fake)
1428        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1429        let fake_handler =
1430            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1431
1432        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1433            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1434                .add_handler(mouse_handler)
1435                .add_handler(fake_handler)
1436                .into_components();
1437
1438        // Run the pipeline logic
1439        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1440
1441        // Create events
1442        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1443        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1444        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1445
1446        let fake_event_1 = input_device::InputEvent {
1447            device_event: input_device::InputDeviceEvent::Fake,
1448            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1449            event_time: zx::MonotonicInstant::get(),
1450            handled: input_device::Handled::No,
1451            trace_id: None,
1452        };
1453
1454        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1455        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1456        let mixed_batch = vec![
1457            mouse_event_1.clone(),
1458            mouse_event_2.clone(),
1459            fake_event_1.clone(),
1460            mouse_event_3.clone(),
1461        ];
1462        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1463
1464        // Verify Mouse Handler received M1, M2, and then M3
1465        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1466        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1467        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1468
1469        // Verify Fake Handler received F1
1470        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1471    }
1472}