Skip to main content

input_pipeline_dso/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30
31/// Use a self incremental u32 unique id for device_id.
32///
33/// device id start from 10 to avoid conflict with default devices in Starnix.
34/// Currently, Starnix using 0 and 1 as default devices' id. Starnix need to
35/// use default devices to deliver events from physical devices until we have
36/// API to expose device changes to UI clients.
37static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
38
39/// Each time this function is invoked, it returns the current value of its
40/// internal counter (serving as a unique id for device_id) and then increments
41/// that counter in preparation for the next call.
42fn get_next_device_id() -> u32 {
43    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
44}
45
46type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
47
48/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
49/// It uses unique device id as key.
50pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
51
52/// An input pipeline assembly.
53///
54/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
55/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
56/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
57/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
58///
59/// # Implementation notes
60///
61/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
62/// handlers are connected together using async queues.  This allows fully streamed processing of
63/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
64/// without an external stimulus.
65pub struct InputPipelineAssembly {
66    /// The top-level sender: send into this queue to inject an event into the input
67    /// pipeline.
68    sender: UnboundedSender<Vec<input_device::InputEvent>>,
69    /// The bottom-level receiver: any events that fall through the entire pipeline can
70    /// be read from this receiver.
71    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
72
73    /// The input handlers that comprise the input pipeline.
74    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
75
76    /// The display ownership watcher task.
77    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
78
79    /// The focus listener task.
80    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
81
82    /// The metrics logger.
83    metrics_logger: metrics::MetricsLogger,
84}
85
86impl InputPipelineAssembly {
87    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
88    /// to add new handlers to it.
89    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
90        let (sender, receiver) = mpsc::unbounded();
91        InputPipelineAssembly {
92            sender,
93            receiver,
94            handlers: vec![],
95            metrics_logger,
96            display_ownership_fut: None,
97            focus_listener_fut: None,
98        }
99    }
100
101    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
102    /// are invoked in the order they are added. Returns `Self` for chaining.
103    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
104        self.handlers.push(handler);
105        self
106    }
107
108    /// Adds all handlers into the assembly in the order they appear in `handlers`.
109    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
110        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
111    }
112
113    pub fn add_display_ownership(
114        mut self,
115        display_ownership_event: zx::Event,
116        input_handlers_node: &fuchsia_inspect::Node,
117    ) -> InputPipelineAssembly {
118        let h = DisplayOwnership::new(
119            display_ownership_event,
120            input_handlers_node,
121            self.metrics_logger.clone(),
122        );
123        let metrics_logger_clone = self.metrics_logger.clone();
124        let h_clone = h.clone();
125        let sender_clone = self.sender.clone();
126        let display_ownership_fut = Box::pin(async move {
127            h_clone.clone().set_handler_healthy();
128            h_clone.clone()
129                .handle_ownership_change(sender_clone)
130                .await
131                .map_err(|e| {
132                    metrics_logger_clone.log_error(
133                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
134                        std::format!(
135                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
136                        })
137                        .unwrap();
138            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
139        });
140        self.display_ownership_fut = Some(display_ownership_fut);
141        self.add_handler(h)
142    }
143
144    /// Deconstructs the assembly into constituent components, used when constructing
145    /// [InputPipeline].
146    ///
147    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
148    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
149    fn into_components(
150        self,
151    ) -> (
152        UnboundedSender<Vec<input_device::InputEvent>>,
153        UnboundedReceiver<Vec<input_device::InputEvent>>,
154        Vec<Rc<dyn input_handler::BatchInputHandler>>,
155        metrics::MetricsLogger,
156        Option<LocalBoxFuture<'static, ()>>,
157        Option<LocalBoxFuture<'static, ()>>,
158    ) {
159        (
160            self.sender,
161            self.receiver,
162            self.handlers,
163            self.metrics_logger,
164            self.display_ownership_fut,
165            self.focus_listener_fut,
166        )
167    }
168
169    pub fn add_focus_listener(
170        mut self,
171        incoming: &Incoming,
172        focus_chain_publisher: FocusChainProviderPublisher,
173    ) -> Self {
174        let metrics_logger_clone = self.metrics_logger.clone();
175        let incoming2 = incoming.clone();
176        let focus_listener_fut = Box::pin(async move {
177            if let Ok(mut focus_listener) = FocusListener::new(
178                &incoming2,
179                focus_chain_publisher,
180                metrics_logger_clone,
181            )
182            .map_err(|e| {
183                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
184            }) {
185                // This will await indefinitely and process focus messages in a loop, unless there
186                // is a problem.
187                let _result = focus_listener
188                    .dispatch_focus_changes()
189                    .await
190                    .map(|_| {
191                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
192                    })
193                    .map_err(|e| {
194                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
195                    });
196            }
197        });
198        self.focus_listener_fut = Some(focus_listener_fut);
199        self
200    }
201}
202
203/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
204///
205/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
206/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
207///
208/// # Example
209/// ```
210/// let ime_handler =
211///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
212/// let touch_handler = TouchHandler::new(
213///     scene_manager.session.clone(),
214///     scene_manager.compositor_id,
215///     scene_manager.display_size
216/// ).await?;
217///
218/// let assembly = InputPipelineAssembly::new()
219///     .add_handler(Box::new(ime_handler)),
220///     .add_handler(Box::new(touch_handler)),
221/// let input_pipeline = InputPipeline::new(
222///     vec![
223///         input_device::InputDeviceType::Touch,
224///         input_device::InputDeviceType::Keyboard,
225///     ],
226///     assembly,
227/// );
228/// input_pipeline.handle_input_events().await;
229/// ```
230pub struct InputPipeline {
231    /// The entry point into the input handler pipeline. Incoming input events should
232    /// be inserted into this async queue, and the input pipeline will ensure that they
233    /// are propagated through all the input handlers in the appropriate sequence.
234    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
235
236    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
237    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
238    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
239
240    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
241    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
242
243    /// The types of devices this pipeline supports.
244    input_device_types: Vec<input_device::InputDeviceType>,
245
246    /// The InputDeviceBindings bound to this pipeline.
247    input_device_bindings: InputDeviceBindingHashMap,
248
249    /// This node is bound to the lifetime of this InputPipeline.
250    /// Inspect data will be dumped for this pipeline as long as it exists.
251    inspect_node: fuchsia_inspect::Node,
252
253    /// The metrics logger.
254    metrics_logger: metrics::MetricsLogger,
255
256    /// The feature flags for the input pipeline.
257    pub feature_flags: input_device::InputPipelineFeatureFlags,
258}
259
260impl InputPipeline {
261    fn new_common(
262        input_device_types: Vec<input_device::InputDeviceType>,
263        assembly: InputPipelineAssembly,
264        inspect_node: fuchsia_inspect::Node,
265        feature_flags: input_device::InputPipelineFeatureFlags,
266    ) -> Self {
267        let (
268            pipeline_sender,
269            receiver,
270            handlers,
271            metrics_logger,
272            display_ownership_fut,
273            focus_listener_fut,
274        ) = assembly.into_components();
275
276        let mut handlers_count = handlers.len();
277        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
278        if let Some(fut) = display_ownership_fut {
279            fasync::Task::local(fut).detach();
280            handlers_count += 1;
281        }
282
283        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
284        if let Some(fut) = focus_listener_fut {
285            fasync::Task::local(fut).detach();
286            handlers_count += 1;
287        }
288
289        // Add properties to inspect node
290        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
291        inspect_node.record_uint("handlers_registered", handlers_count as u64);
292        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
293
294        // Initializes all handlers and starts the input pipeline loop.
295        InputPipeline::run(receiver, handlers, metrics_logger.clone());
296
297        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
298        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
299        InputPipeline {
300            pipeline_sender,
301            device_event_sender,
302            device_event_receiver,
303            input_device_types,
304            input_device_bindings,
305            inspect_node,
306            metrics_logger,
307            feature_flags,
308        }
309    }
310
311    /// Creates a new [`InputPipeline`] for integration testing.
312    /// Unlike a production input pipeline, this pipeline will not monitor
313    /// `/dev/class/input-report` for devices.
314    ///
315    /// # Parameters
316    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
317    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
318    pub fn new_for_test(
319        input_device_types: Vec<input_device::InputDeviceType>,
320        assembly: InputPipelineAssembly,
321    ) -> Self {
322        let inspector = fuchsia_inspect::Inspector::default();
323        let root = inspector.root();
324        let test_node = root.create_child("input_pipeline");
325        Self::new_common(
326            input_device_types,
327            assembly,
328            test_node,
329            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
330        )
331    }
332
333    /// Creates a new [`InputPipeline`] for production use.
334    ///
335    /// # Parameters
336    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
337    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
338    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
339    pub fn new(
340        incoming: &Incoming,
341        input_device_types: Vec<input_device::InputDeviceType>,
342        assembly: InputPipelineAssembly,
343        inspect_node: fuchsia_inspect::Node,
344        feature_flags: input_device::InputPipelineFeatureFlags,
345        metrics_logger: metrics::MetricsLogger,
346    ) -> Result<Self, Error> {
347        let input_pipeline =
348            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
349        let input_device_types = input_pipeline.input_device_types.clone();
350        let input_event_sender = input_pipeline.device_event_sender.clone();
351        let input_device_bindings = input_pipeline.input_device_bindings.clone();
352        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
353        let feature_flags = input_pipeline.feature_flags.clone();
354        let incoming = incoming.clone();
355        fasync::Task::local(async move {
356            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
357            // that send InputEvents to `input_event_receiver`.
358            match async {
359                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
360                incoming.as_ref_directory().open(
361                    input_device::INPUT_REPORT_PATH,
362                    fio::PERM_READABLE,
363                    server.into()
364                )
365                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
366                let device_watcher =
367                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
368                Self::watch_for_devices(
369                    device_watcher,
370                    dir_proxy,
371                    input_device_types,
372                    input_event_sender,
373                    input_device_bindings,
374                    &devices_node,
375                    false, /* break_on_idle */
376                    feature_flags,
377                    metrics_logger.clone(),
378                )
379                .await
380                .context("failed to watch for devices")
381            }
382            .await
383            {
384                Ok(()) => {}
385                Err(err) => {
386                    // This error is usually benign in tests: it means that the setup does not
387                    // support dynamic device discovery. Almost no tests support dynamic
388                    // device discovery, and they also do not need those.
389                    metrics_logger.log_warn(
390                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
391                        std::format!(
392                            "Input pipeline is unable to watch for new input devices: {:?}",
393                            err
394                        ));
395                }
396            }
397        }).detach();
398
399        Ok(input_pipeline)
400    }
401
402    /// Gets the input device bindings.
403    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
404        &self.input_device_bindings
405    }
406
407    /// Gets the input device sender: this is the channel that should be cloned
408    /// and used for injecting events from the drivers into the input pipeline.
409    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
410        &self.device_event_sender
411    }
412
413    /// Gets a list of input device types supported by this input pipeline.
414    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
415        &self.input_device_types
416    }
417
418    /// Forwards all input events into the input pipeline.
419    pub async fn handle_input_events(mut self) {
420        let metrics_logger_clone = self.metrics_logger.clone();
421        while let Some(input_event) = self.device_event_receiver.next().await {
422            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
423                metrics_logger_clone.log_error(
424                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
425                    std::format!("could not forward event from driver: {:?}", &e));
426            }
427        }
428
429        metrics_logger_clone.log_error(
430            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
431            "Input pipeline stopped handling input events.".to_string(),
432        );
433    }
434
435    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
436    /// if new devices match a type in `device_types`.
437    ///
438    /// # Parameters
439    /// - `device_watcher`: Watches the input report directory for new devices.
440    /// - `dir_proxy`: The directory containing InputDevice connections.
441    /// - `device_types`: The types of devices to watch for.
442    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
443    /// - `bindings`: Holds all the InputDeviceBindings
444    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
445    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
446    /// - `metrics_logger`: The metrics logger.
447    ///
448    /// # Errors
449    /// If the input report directory or a file within it cannot be read.
450    async fn watch_for_devices(
451        mut device_watcher: Watcher,
452        dir_proxy: fio::DirectoryProxy,
453        device_types: Vec<input_device::InputDeviceType>,
454        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
455        bindings: InputDeviceBindingHashMap,
456        input_devices_node: &fuchsia_inspect::Node,
457        break_on_idle: bool,
458        feature_flags: input_device::InputPipelineFeatureFlags,
459        metrics_logger: metrics::MetricsLogger,
460    ) -> Result<(), Error> {
461        // Add non-static properties to inspect node.
462        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
463        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
464        while let Some(msg) = device_watcher.try_next().await? {
465            if let Ok(filename) = msg.filename.into_os_string().into_string() {
466                if filename == "." {
467                    continue;
468                }
469
470                let pathbuf = PathBuf::from(filename.clone());
471                match msg.event {
472                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
473                        log::info!("found input device {}", filename);
474                        devices_discovered.add(1);
475                        let device_proxy =
476                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
477                        add_device_bindings(
478                            &device_types,
479                            &filename,
480                            device_proxy,
481                            &input_event_sender,
482                            &bindings,
483                            get_next_device_id(),
484                            input_devices_node,
485                            Some(&devices_connected),
486                            feature_flags.clone(),
487                            metrics_logger.clone(),
488                        )
489                        .await;
490                    }
491                    WatchEvent::IDLE => {
492                        if break_on_idle {
493                            break;
494                        }
495                    }
496                    _ => (),
497                }
498            }
499        }
500        // Ensure inspect properties persist for debugging if device watch loop ends.
501        input_devices_node.record(devices_discovered);
502        input_devices_node.record(devices_connected);
503        Err(format_err!("Input pipeline stopped watching for new input devices."))
504    }
505
506    /// Handles the incoming InputDeviceRegistryRequestStream.
507    ///
508    /// This method will end when the request stream is closed. If the stream closes with an
509    /// error the error will be returned in the Result.
510    ///
511    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
512    ///
513    /// # Parameters
514    /// - `stream`: The stream of InputDeviceRegistryRequests.
515    /// - `device_types`: The types of devices to watch for.
516    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
517    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
518    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
519    /// - `metrics_logger`: The metrics logger.
520    pub async fn handle_input_device_registry_request_stream(
521        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
522        device_types: &Vec<input_device::InputDeviceType>,
523        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
524        bindings: &InputDeviceBindingHashMap,
525        input_devices_node: &fuchsia_inspect::Node,
526        feature_flags: input_device::InputPipelineFeatureFlags,
527        metrics_logger: metrics::MetricsLogger,
528    ) -> Result<(), Error> {
529        while let Some(request) = stream
530            .try_next()
531            .await
532            .context("Error handling input device registry request stream")?
533        {
534            match request {
535                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
536                    device,
537                    ..
538                } => {
539                    // Add a binding if the device is a type being tracked
540                    let device_proxy = device.into_proxy();
541
542                    let device_id = get_next_device_id();
543
544                    add_device_bindings(
545                        device_types,
546                        &format!("input-device-registry-{}", device_id),
547                        device_proxy,
548                        input_event_sender,
549                        bindings,
550                        device_id,
551                        input_devices_node,
552                        None,
553                        feature_flags.clone(),
554                        metrics_logger.clone(),
555                    )
556                    .await;
557                }
558                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
559                    device,
560                    responder,
561                    .. } => {
562                    // Add a binding if the device is a type being tracked
563                    let device_proxy = device.into_proxy();
564
565                    let device_id = get_next_device_id();
566
567                    add_device_bindings(
568                        device_types,
569                        &format!("input-device-registry-{}", device_id),
570                        device_proxy,
571                        input_event_sender,
572                        bindings,
573                        device_id,
574                        input_devices_node,
575                        None,
576                        feature_flags.clone(),
577                        metrics_logger.clone(),
578                    )
579                    .await;
580
581                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
582                        device_id: Some(device_id),
583                        ..Default::default()
584                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
585                }
586            }
587        }
588
589        Ok(())
590    }
591
592    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
593    fn run(
594        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
595        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
596        metrics_logger: metrics::MetricsLogger,
597    ) {
598        Dispatcher::spawn_local(async move {
599            for handler in &handlers {
600                handler.clone().set_handler_healthy();
601            }
602
603            use input_device::InputEventType;
604            use std::collections::HashMap;
605
606            // Pre-compute handler lists for each event type.
607            let mut handlers_by_type: HashMap<
608                InputEventType,
609                Vec<Rc<dyn input_handler::BatchInputHandler>>,
610            > = HashMap::new();
611
612            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
613            let event_types = vec![
614                InputEventType::Keyboard,
615                InputEventType::LightSensor,
616                InputEventType::ConsumerControls,
617                InputEventType::Mouse,
618                InputEventType::TouchScreen,
619                InputEventType::Touchpad,
620                #[cfg(test)]
621                InputEventType::Fake,
622            ];
623
624            for event_type in event_types {
625                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
626                    .iter()
627                    .filter(|h| h.interest().contains(&event_type))
628                    .cloned()
629                    .collect();
630                handlers_by_type.insert(event_type, handlers_for_type);
631            }
632
633            while let Some(events) = receiver.next().await {
634                if events.is_empty() {
635                    continue;
636                }
637
638                let mut groups_seen = 0;
639                for (event_type, event_group) in events
640                    .into_iter()
641                    .chunk_by(|e| InputEventType::from(&e.device_event))
642                    .into_iter()
643                {
644                    groups_seen += 1;
645                    if groups_seen == 2 {
646                        metrics_logger.log_error(
647                            InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
648                            "it is not recommended to contain multiple types of events in 1 send".to_string(),
649                        );
650                    }
651                    let mut events_in_group: Vec<_> = event_group.collect();
652
653                    // Get pre-computed handlers for this event type.
654                    let handlers = handlers_by_type.get(&event_type).unwrap();
655
656                    for handler in handlers {
657                        events_in_group =
658                            handler.clone().handle_input_events(events_in_group).await;
659                    }
660
661                    for event in events_in_group {
662                        if event.handled == input_device::Handled::No {
663                            log::warn!("unhandled input event: {:?}", &event);
664                        }
665                        if let Some(trace_id) = event.trace_id {
666                            fuchsia_trace::flow_end!(
667                                "input",
668                                "event_in_input_pipeline",
669                                trace_id.into()
670                            );
671                        }
672                    }
673                }
674            }
675            for handler in &handlers {
676                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
677            }
678            panic!("Runner task is not supposed to terminate.")
679        }).detach();
680    }
681}
682
683/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
684///
685/// # Parameters
686/// - `device_types`: The types of devices to watch for.
687/// - `device_proxy`: A proxy to the input device.
688/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
689/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
690/// - `device_id`: The device id of the associated bindings.
691/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
692///
693/// # Note
694/// This will create multiple bindings, in the case where
695/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
696///   with multiple table fields populated, and
697/// * multiple populated table fields correspond to device types present in `device_types`
698///
699/// This is used, for example, to support the Atlas touchpad. In that case, a single
700/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
701/// a `fuchsia.input.report.TouchDescriptor`.
702async fn add_device_bindings(
703    device_types: &Vec<input_device::InputDeviceType>,
704    filename: &String,
705    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
706    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
707    bindings: &InputDeviceBindingHashMap,
708    device_id: u32,
709    input_devices_node: &fuchsia_inspect::Node,
710    devices_connected: Option<&fuchsia_inspect::UintProperty>,
711    feature_flags: InputPipelineFeatureFlags,
712    metrics_logger: metrics::MetricsLogger,
713) {
714    let mut matched_device_types = vec![];
715    if let Ok(descriptor) = device_proxy.get_descriptor().await {
716        for device_type in device_types {
717            if input_device::is_device_type(&descriptor, *device_type).await {
718                matched_device_types.push(device_type);
719                match devices_connected {
720                    Some(dev_connected) => {
721                        let _ = dev_connected.add(1);
722                    }
723                    None => (),
724                };
725            }
726        }
727        if matched_device_types.is_empty() {
728            log::info!(
729                "device {} did not match any supported device types: {:?}",
730                filename,
731                device_types
732            );
733            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
734            let mut health = fuchsia_inspect::health::Node::new(&device_node);
735            health.set_unhealthy("Unsupported device type.");
736            device_node.record(health);
737            input_devices_node.record(device_node);
738            return;
739        }
740    } else {
741        metrics_logger.clone().log_error(
742            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
743            std::format!("cannot bind device {} without a device descriptor", filename),
744        );
745        return;
746    }
747
748    log::info!(
749        "binding {} to device types: {}",
750        filename,
751        matched_device_types
752            .iter()
753            .fold(String::new(), |device_types_string, device_type| device_types_string
754                + &format!("{:?}, ", device_type))
755    );
756
757    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
758    for device_type in matched_device_types {
759        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
760        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
761        //
762        // There's no conflict in having multiple bindings read from the same node,
763        // since:
764        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
765        // * the device driver will copy each incoming report to each connected reader.
766        //
767        // This does mean that reports from the Atlas touchpad device get read twice
768        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
769        // is operating in mouse mode or touchpad mode.
770        //
771        // This hasn't been an issue because:
772        // * Semantically: things are fine, because each binding discards irrelevant reports.
773        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
774        // * Performance wise: things are fine, because the data rate of the touchpad is low
775        //   (125 HZ).
776        //
777        // If we add additional cases where bindings share an underlying `input-report` node,
778        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
779        let proxy = device_proxy.clone();
780        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
781        match input_device::get_device_binding(
782            *device_type,
783            proxy,
784            device_id,
785            input_event_sender.clone(),
786            device_node,
787            feature_flags.clone(),
788            metrics_logger.clone(),
789        )
790        .await
791        {
792            Ok(binding) => new_bindings.push(binding),
793            Err(e) => {
794                metrics_logger.log_error(
795                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
796                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
797                );
798            }
799        }
800    }
801
802    if !new_bindings.is_empty() {
803        let mut bindings = bindings.lock().await;
804        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
805    }
806}
807
808#[cfg(test)]
809mod tests {
810    use super::*;
811    use crate::input_device::{InputDeviceBinding, InputEventType};
812    use crate::utils::Position;
813    use crate::{
814        fake_input_device_binding, mouse_binding, mouse_model_database,
815        observe_fake_events_input_handler,
816    };
817    use async_trait::async_trait;
818    use diagnostics_assertions::AnyProperty;
819    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
820    use fuchsia_async as fasync;
821    use futures::FutureExt;
822    use pretty_assertions::assert_eq;
823    use rand::Rng;
824    use std::collections::HashSet;
825    use vfs::{pseudo_directory, service as pseudo_fs_service};
826
827    const COUNTS_PER_MM: u32 = 12;
828
829    /// Returns the InputEvent sent over `sender`.
830    ///
831    /// # Parameters
832    /// - `sender`: The channel to send the InputEvent over.
833    fn send_input_event(
834        sender: UnboundedSender<Vec<input_device::InputEvent>>,
835    ) -> Vec<input_device::InputEvent> {
836        let mut rng = rand::rng();
837        let offset =
838            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
839        let input_event = input_device::InputEvent {
840            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
841                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
842                    millimeters: Position {
843                        x: offset.x / COUNTS_PER_MM as f32,
844                        y: offset.y / COUNTS_PER_MM as f32,
845                    },
846                }),
847                None, /* wheel_delta_v */
848                None, /* wheel_delta_h */
849                mouse_binding::MousePhase::Move,
850                HashSet::new(),
851                HashSet::new(),
852                None, /* is_precision_scroll */
853                None, /* wake_lease */
854            )),
855            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
856                mouse_binding::MouseDeviceDescriptor {
857                    device_id: 1,
858                    absolute_x_range: None,
859                    absolute_y_range: None,
860                    wheel_v_range: None,
861                    wheel_h_range: None,
862                    buttons: None,
863                    counts_per_mm: COUNTS_PER_MM,
864                },
865            ),
866            event_time: zx::MonotonicInstant::get(),
867            handled: input_device::Handled::No,
868            trace_id: None,
869        };
870        match sender.unbounded_send(vec![input_event.clone()]) {
871            Err(_) => assert!(false),
872            _ => {}
873        }
874
875        vec![input_event]
876    }
877
878    /// Returns a MouseDescriptor on an InputDeviceRequest.
879    ///
880    /// # Parameters
881    /// - `input_device_request`: The request to handle.
882    fn handle_input_device_request(
883        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
884    ) {
885        match input_device_request {
886            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
887                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
888                    device_information: None,
889                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
890                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
891                            movement_x: None,
892                            movement_y: None,
893                            scroll_v: None,
894                            scroll_h: None,
895                            buttons: Some(vec![0]),
896                            position_x: None,
897                            position_y: None,
898                            ..Default::default()
899                        }),
900                        ..Default::default()
901                    }),
902                    sensor: None,
903                    touch: None,
904                    keyboard: None,
905                    consumer_control: None,
906                    ..Default::default()
907                });
908            }
909            _ => {}
910        }
911    }
912
913    /// Tests that an input pipeline handles events from multiple devices.
914    #[fasync::run_singlethreaded(test)]
915    async fn multiple_devices_single_handler() {
916        // Create two fake device bindings.
917        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
918        let first_device_binding =
919            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
920        let second_device_binding =
921            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
922
923        // Create a fake input handler.
924        let (handler_event_sender, mut handler_event_receiver) =
925            futures::channel::mpsc::channel(100);
926        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
927            handler_event_sender,
928        );
929
930        // Build the input pipeline.
931        let (sender, receiver, handlers, _, _, _) =
932            InputPipelineAssembly::new(metrics::MetricsLogger::default())
933                .add_handler(input_handler)
934                .into_components();
935        let inspector = fuchsia_inspect::Inspector::default();
936        let test_node = inspector.root().create_child("input_pipeline");
937        let input_pipeline = InputPipeline {
938            pipeline_sender: sender,
939            device_event_sender,
940            device_event_receiver,
941            input_device_types: vec![],
942            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
943            inspect_node: test_node,
944            metrics_logger: metrics::MetricsLogger::default(),
945            feature_flags: input_device::InputPipelineFeatureFlags::default(),
946        };
947        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
948
949        // Send an input event from each device.
950        let first_device_events = send_input_event(first_device_binding.input_event_sender());
951        let second_device_events = send_input_event(second_device_binding.input_event_sender());
952
953        // Run the pipeline.
954        fasync::Task::local(async {
955            input_pipeline.handle_input_events().await;
956        })
957        .detach();
958
959        // Assert the handler receives the events.
960        let first_handled_event = handler_event_receiver.next().await;
961        assert_eq!(first_handled_event, first_device_events.into_iter().next());
962
963        let second_handled_event = handler_event_receiver.next().await;
964        assert_eq!(second_handled_event, second_device_events.into_iter().next());
965    }
966
967    /// Tests that an input pipeline handles events through multiple input handlers.
968    #[fasync::run_singlethreaded(test)]
969    async fn single_device_multiple_handlers() {
970        // Create two fake device bindings.
971        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
972        let input_device_binding =
973            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
974
975        // Create two fake input handlers.
976        let (first_handler_event_sender, mut first_handler_event_receiver) =
977            futures::channel::mpsc::channel(100);
978        let first_input_handler =
979            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
980                first_handler_event_sender,
981            );
982        let (second_handler_event_sender, mut second_handler_event_receiver) =
983            futures::channel::mpsc::channel(100);
984        let second_input_handler =
985            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
986                second_handler_event_sender,
987            );
988
989        // Build the input pipeline.
990        let (sender, receiver, handlers, _, _, _) =
991            InputPipelineAssembly::new(metrics::MetricsLogger::default())
992                .add_handler(first_input_handler)
993                .add_handler(second_input_handler)
994                .into_components();
995        let inspector = fuchsia_inspect::Inspector::default();
996        let test_node = inspector.root().create_child("input_pipeline");
997        let input_pipeline = InputPipeline {
998            pipeline_sender: sender,
999            device_event_sender,
1000            device_event_receiver,
1001            input_device_types: vec![],
1002            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1003            inspect_node: test_node,
1004            metrics_logger: metrics::MetricsLogger::default(),
1005            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1006        };
1007        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1008
1009        // Send an input event.
1010        let input_events = send_input_event(input_device_binding.input_event_sender());
1011
1012        // Run the pipeline.
1013        fasync::Task::local(async {
1014            input_pipeline.handle_input_events().await;
1015        })
1016        .detach();
1017
1018        // Assert both handlers receive the event.
1019        let expected_event = input_events.into_iter().next();
1020        let first_handler_event = first_handler_event_receiver.next().await;
1021        assert_eq!(first_handler_event, expected_event);
1022        let second_handler_event = second_handler_event_receiver.next().await;
1023        assert_eq!(second_handler_event, expected_event);
1024    }
1025
1026    /// Tests that a single mouse device binding is created for the one input device in the
1027    /// input report directory.
1028    #[fasync::run_singlethreaded(test)]
1029    async fn watch_devices_one_match_exists() {
1030        // Create a file in a pseudo directory that represents an input device.
1031        let mut count: i8 = 0;
1032        let dir = pseudo_directory! {
1033            "file_name" => pseudo_fs_service::host(
1034                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1035                    async move {
1036                        while count < 3 {
1037                            if let Some(input_device_request) =
1038                                request_stream.try_next().await.unwrap()
1039                            {
1040                                handle_input_device_request(input_device_request);
1041                                count += 1;
1042                            }
1043                        }
1044
1045                    }.boxed()
1046                },
1047            )
1048        };
1049
1050        // Create a Watcher on the pseudo directory.
1051        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1052        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1053        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1054        // proxy to get connections to input devices.
1055        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1056
1057        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1058        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1059        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1060
1061        let inspector = fuchsia_inspect::Inspector::default();
1062        let test_node = inspector.root().create_child("input_pipeline");
1063        test_node.record_string(
1064            "supported_input_devices",
1065            supported_device_types.clone().iter().join(", "),
1066        );
1067        let input_devices = test_node.create_child("input_devices");
1068        // Assert that inspect tree is initialized with no devices.
1069        diagnostics_assertions::assert_data_tree!(inspector, root: {
1070            input_pipeline: {
1071                supported_input_devices: "Mouse",
1072                input_devices: {}
1073            }
1074        });
1075
1076        let _ = InputPipeline::watch_for_devices(
1077            device_watcher,
1078            dir_proxy_for_pipeline,
1079            supported_device_types,
1080            input_event_sender,
1081            bindings.clone(),
1082            &input_devices,
1083            true, /* break_on_idle */
1084            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1085            metrics::MetricsLogger::default(),
1086        )
1087        .await;
1088
1089        // Assert that one mouse device with accurate device id was found.
1090        let bindings_hashmap = bindings.lock().await;
1091        assert_eq!(bindings_hashmap.len(), 1);
1092        let bindings_vector = bindings_hashmap.get(&10);
1093        assert!(bindings_vector.is_some());
1094        assert_eq!(bindings_vector.unwrap().len(), 1);
1095        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1096        assert!(boxed_mouse_binding.is_some());
1097        assert_eq!(
1098            boxed_mouse_binding.unwrap().get_device_descriptor(),
1099            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1100                device_id: 10,
1101                absolute_x_range: None,
1102                absolute_y_range: None,
1103                wheel_v_range: None,
1104                wheel_h_range: None,
1105                buttons: Some(vec![0]),
1106                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1107            })
1108        );
1109
1110        // Assert that inspect tree reflects new device discovered and connected.
1111        diagnostics_assertions::assert_data_tree!(inspector, root: {
1112            input_pipeline: {
1113                supported_input_devices: "Mouse",
1114                input_devices: {
1115                    devices_discovered: 1u64,
1116                    devices_connected: 1u64,
1117                    "file_name_Mouse": contains {
1118                        reports_received_count: 0u64,
1119                        reports_filtered_count: 0u64,
1120                        events_generated: 0u64,
1121                        last_received_timestamp_ns: 0u64,
1122                        last_generated_timestamp_ns: 0u64,
1123                        "fuchsia.inspect.Health": {
1124                            status: "OK",
1125                            // Timestamp value is unpredictable and not relevant in this context,
1126                            // so we only assert that the property is present.
1127                            start_timestamp_nanos: AnyProperty
1128                        },
1129                    }
1130                }
1131            }
1132        });
1133    }
1134
1135    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1136    /// but only a mouse exists.
1137    #[fasync::run_singlethreaded(test)]
1138    async fn watch_devices_no_matches_exist() {
1139        // Create a file in a pseudo directory that represents an input device.
1140        let mut count: i8 = 0;
1141        let dir = pseudo_directory! {
1142            "file_name" => pseudo_fs_service::host(
1143                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1144                    async move {
1145                        while count < 1 {
1146                            if let Some(input_device_request) =
1147                                request_stream.try_next().await.unwrap()
1148                            {
1149                                handle_input_device_request(input_device_request);
1150                                count += 1;
1151                            }
1152                        }
1153
1154                    }.boxed()
1155                },
1156            )
1157        };
1158
1159        // Create a Watcher on the pseudo directory.
1160        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1161        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1162        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1163        // proxy to get connections to input devices.
1164        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1165
1166        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1167        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1168        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1169
1170        let inspector = fuchsia_inspect::Inspector::default();
1171        let test_node = inspector.root().create_child("input_pipeline");
1172        test_node.record_string(
1173            "supported_input_devices",
1174            supported_device_types.clone().iter().join(", "),
1175        );
1176        let input_devices = test_node.create_child("input_devices");
1177        // Assert that inspect tree is initialized with no devices.
1178        diagnostics_assertions::assert_data_tree!(inspector, root: {
1179            input_pipeline: {
1180                supported_input_devices: "Keyboard",
1181                input_devices: {}
1182            }
1183        });
1184
1185        let _ = InputPipeline::watch_for_devices(
1186            device_watcher,
1187            dir_proxy_for_pipeline,
1188            supported_device_types,
1189            input_event_sender,
1190            bindings.clone(),
1191            &input_devices,
1192            true, /* break_on_idle */
1193            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1194            metrics::MetricsLogger::default(),
1195        )
1196        .await;
1197
1198        // Assert that no devices were found.
1199        let bindings = bindings.lock().await;
1200        assert_eq!(bindings.len(), 0);
1201
1202        // Assert that inspect tree reflects new device discovered, but not connected.
1203        diagnostics_assertions::assert_data_tree!(inspector, root: {
1204            input_pipeline: {
1205                supported_input_devices: "Keyboard",
1206                input_devices: {
1207                    devices_discovered: 1u64,
1208                    devices_connected: 0u64,
1209                    "file_name_Unsupported": {
1210                        "fuchsia.inspect.Health": {
1211                            status: "UNHEALTHY",
1212                            message: "Unsupported device type.",
1213                            // Timestamp value is unpredictable and not relevant in this context,
1214                            // so we only assert that the property is present.
1215                            start_timestamp_nanos: AnyProperty
1216                        },
1217                    }
1218                }
1219            }
1220        });
1221    }
1222
1223    /// Tests that a single keyboard device binding is created for the input device registered
1224    /// through InputDeviceRegistry.
1225    #[fasync::run_singlethreaded(test)]
1226    async fn handle_input_device_registry_request_stream() {
1227        let (input_device_registry_proxy, input_device_registry_request_stream) =
1228            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1229        let (input_device_client_end, mut input_device_request_stream) =
1230            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1231
1232        let device_types = vec![input_device::InputDeviceType::Mouse];
1233        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1234        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1235
1236        // Handle input device requests.
1237        let mut count: i8 = 0;
1238        fasync::Task::local(async move {
1239            // Register a device.
1240            let _ = input_device_registry_proxy.register(input_device_client_end);
1241
1242            while count < 3 {
1243                if let Some(input_device_request) =
1244                    input_device_request_stream.try_next().await.unwrap()
1245                {
1246                    handle_input_device_request(input_device_request);
1247                    count += 1;
1248                }
1249            }
1250
1251            // End handle_input_device_registry_request_stream() by taking the event stream.
1252            input_device_registry_proxy.take_event_stream();
1253        })
1254        .detach();
1255
1256        let inspector = fuchsia_inspect::Inspector::default();
1257        let test_node = inspector.root().create_child("input_pipeline");
1258
1259        // Start listening for InputDeviceRegistryRequests.
1260        let bindings_clone = bindings.clone();
1261        let _ = InputPipeline::handle_input_device_registry_request_stream(
1262            input_device_registry_request_stream,
1263            &device_types,
1264            &input_event_sender,
1265            &bindings_clone,
1266            &test_node,
1267            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1268            metrics::MetricsLogger::default(),
1269        )
1270        .await;
1271
1272        // Assert that a device was registered.
1273        let bindings = bindings.lock().await;
1274        assert_eq!(bindings.len(), 1);
1275    }
1276
1277    // Tests that correct properties are added to inspect node when InputPipeline is created.
1278    #[fasync::run_singlethreaded(test)]
1279    async fn check_inspect_node_has_correct_properties() {
1280        let device_types = vec![
1281            input_device::InputDeviceType::Touch,
1282            input_device::InputDeviceType::ConsumerControls,
1283        ];
1284        let inspector = fuchsia_inspect::Inspector::default();
1285        let test_node = inspector.root().create_child("input_pipeline");
1286        // Create fake input handler for assembly
1287        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1288            futures::channel::mpsc::channel(100);
1289        let fake_input_handler =
1290            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1291                fake_handler_event_sender,
1292            );
1293        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1294            .add_handler(fake_input_handler);
1295        let _test_input_pipeline = InputPipeline::new(
1296            &Incoming::new(),
1297            device_types,
1298            assembly,
1299            test_node,
1300            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1301            metrics::MetricsLogger::default(),
1302        );
1303        diagnostics_assertions::assert_data_tree!(inspector, root: {
1304            input_pipeline: {
1305                supported_input_devices: "Touch, ConsumerControls",
1306                handlers_registered: 1u64,
1307                handlers_healthy: 1u64,
1308                input_devices: {}
1309            }
1310        });
1311    }
1312
1313    struct SpecificInterestFakeHandler {
1314        interest_types: Vec<input_device::InputEventType>,
1315        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1316    }
1317
1318    impl SpecificInterestFakeHandler {
1319        pub fn new(
1320            interest_types: Vec<input_device::InputEventType>,
1321            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1322        ) -> Rc<Self> {
1323            Rc::new(SpecificInterestFakeHandler {
1324                interest_types,
1325                event_sender: std::cell::RefCell::new(event_sender),
1326            })
1327        }
1328    }
1329
1330    impl Handler for SpecificInterestFakeHandler {
1331        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1332        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1333        fn get_name(&self) -> &'static str {
1334            "SpecificInterestFakeHandler"
1335        }
1336
1337        fn interest(&self) -> Vec<input_device::InputEventType> {
1338            self.interest_types.clone()
1339        }
1340    }
1341
1342    #[async_trait(?Send)]
1343    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1344        async fn handle_input_event(
1345            self: Rc<Self>,
1346            input_event: input_device::InputEvent,
1347        ) -> Vec<input_device::InputEvent> {
1348            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1349                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1350                Ok(_) => {}
1351            }
1352            vec![input_event]
1353        }
1354    }
1355
1356    #[fasync::run_singlethreaded(test)]
1357    async fn run_only_sends_events_to_interested_handlers() {
1358        // Mouse Handler (Specific Interest: Mouse)
1359        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1360        let mouse_handler =
1361            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1362
1363        // Fake Handler (Specific Interest: Fake)
1364        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1365        let fake_handler =
1366            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1367
1368        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1369            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1370                .add_handler(mouse_handler)
1371                .add_handler(fake_handler)
1372                .into_components();
1373
1374        // Run the pipeline logic
1375        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1376
1377        // Create a Fake event
1378        let fake_event = input_device::InputEvent {
1379            device_event: input_device::InputDeviceEvent::Fake,
1380            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1381            event_time: zx::MonotonicInstant::get(),
1382            handled: input_device::Handled::No,
1383            trace_id: None,
1384        };
1385
1386        // Send the Fake event
1387        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1388
1389        // Verify Fake Handler received it
1390        let received_by_fake = fake_receiver.next().await;
1391        assert_eq!(received_by_fake, Some(fake_event));
1392
1393        // Verify Mouse Handler did NOT receive it
1394        assert!(mouse_receiver.try_next().is_err());
1395    }
1396
1397    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1398        input_device::InputEvent {
1399            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1400                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1401                    millimeters: Position { x, y },
1402                }),
1403                None,
1404                None,
1405                mouse_binding::MousePhase::Move,
1406                HashSet::new(),
1407                HashSet::new(),
1408                None,
1409                None,
1410            )),
1411            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1412                mouse_binding::MouseDeviceDescriptor {
1413                    device_id: 1,
1414                    absolute_x_range: None,
1415                    absolute_y_range: None,
1416                    wheel_v_range: None,
1417                    wheel_h_range: None,
1418                    buttons: None,
1419                    counts_per_mm: 1,
1420                },
1421            ),
1422            event_time: zx::MonotonicInstant::get(),
1423            handled: input_device::Handled::No,
1424            trace_id: None,
1425        }
1426    }
1427
1428    #[fasync::run_singlethreaded(test)]
1429    async fn run_mixed_event_types_dispatched_correctly() {
1430        // Mouse Handler (Specific Interest: Mouse)
1431        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1432        let mouse_handler =
1433            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1434
1435        // Fake Handler (Specific Interest: Fake)
1436        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1437        let fake_handler =
1438            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1439
1440        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1441            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1442                .add_handler(mouse_handler)
1443                .add_handler(fake_handler)
1444                .into_components();
1445
1446        // Run the pipeline logic
1447        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1448
1449        // Create events
1450        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1451        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1452        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1453
1454        let fake_event_1 = input_device::InputEvent {
1455            device_event: input_device::InputDeviceEvent::Fake,
1456            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1457            event_time: zx::MonotonicInstant::get(),
1458            handled: input_device::Handled::No,
1459            trace_id: None,
1460        };
1461
1462        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1463        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1464        let mixed_batch = vec![
1465            mouse_event_1.clone(),
1466            mouse_event_2.clone(),
1467            fake_event_1.clone(),
1468            mouse_event_3.clone(),
1469        ];
1470        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1471
1472        // Verify Mouse Handler received M1, M2, and then M3
1473        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1474        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1475        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1476
1477        // Verify Fake Handler received F1
1478        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1479    }
1480}