Skip to main content

input_pipeline_dso/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use sorted_vec_map::SortedVecMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
32/// Use a self incremental u32 unique id for device_id.
33///
34/// device id start from 10 to avoid conflict with default devices in Starnix.
35/// Currently, Starnix using 0 and 1 as default devices' id. Starnix need to
36/// use default devices to deliver events from physical devices until we have
37/// API to expose device changes to UI clients.
38static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
39
40/// Each time this function is invoked, it returns the current value of its
41/// internal counter (serving as a unique id for device_id) and then increments
42/// that counter in preparation for the next call.
43fn get_next_device_id() -> u32 {
44    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
45}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49/// An [`InputDeviceBindingMap`] maps an input device to one or more InputDeviceBindings.
50/// It uses unique device id as key.
51pub type InputDeviceBindingMap = Arc<Mutex<SortedVecMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53/// An input pipeline assembly.
54///
55/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
56/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
57/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
58/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
59///
60/// # Implementation notes
61///
62/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
63/// handlers are connected together using async queues.  This allows fully streamed processing of
64/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
65/// without an external stimulus.
66pub struct InputPipelineAssembly {
67    /// The top-level sender: send into this queue to inject an event into the input
68    /// pipeline.
69    sender: UnboundedSender<Vec<input_device::InputEvent>>,
70    /// The bottom-level receiver: any events that fall through the entire pipeline can
71    /// be read from this receiver.
72    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74    /// The input handlers that comprise the input pipeline.
75    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77    /// The display ownership watcher task.
78    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80    /// The focus listener task.
81    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83    /// The metrics logger.
84    metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
89    /// to add new handlers to it.
90    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91        let (sender, receiver) = mpsc::unbounded();
92        InputPipelineAssembly {
93            sender,
94            receiver,
95            handlers: vec![],
96            metrics_logger,
97            display_ownership_fut: None,
98            focus_listener_fut: None,
99        }
100    }
101
102    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
103    /// are invoked in the order they are added. Returns `Self` for chaining.
104    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105        self.handlers.push(handler);
106        self
107    }
108
109    /// Adds all handlers into the assembly in the order they appear in `handlers`.
110    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112    }
113
114    pub fn add_display_ownership(
115        mut self,
116        display_ownership_event: zx::Event,
117        input_handlers_node: &fuchsia_inspect::Node,
118    ) -> InputPipelineAssembly {
119        let h = DisplayOwnership::new(
120            display_ownership_event,
121            input_handlers_node,
122            self.metrics_logger.clone(),
123        );
124        let metrics_logger_clone = self.metrics_logger.clone();
125        let h_clone = h.clone();
126        let sender_clone = self.sender.clone();
127        let display_ownership_fut = Box::pin(async move {
128            h_clone.clone().set_handler_healthy();
129            h_clone.clone()
130                .handle_ownership_change(sender_clone)
131                .await
132                .map_err(|e| {
133                    metrics_logger_clone.log_error(
134                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135                        std::format!(
136                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137                        })
138                        .unwrap();
139            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140        });
141        self.display_ownership_fut = Some(display_ownership_fut);
142        self.add_handler(h)
143    }
144
145    /// Deconstructs the assembly into constituent components, used when constructing
146    /// [InputPipeline].
147    ///
148    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
149    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
150    fn into_components(
151        self,
152    ) -> (
153        UnboundedSender<Vec<input_device::InputEvent>>,
154        UnboundedReceiver<Vec<input_device::InputEvent>>,
155        Vec<Rc<dyn input_handler::BatchInputHandler>>,
156        metrics::MetricsLogger,
157        Option<LocalBoxFuture<'static, ()>>,
158        Option<LocalBoxFuture<'static, ()>>,
159    ) {
160        (
161            self.sender,
162            self.receiver,
163            self.handlers,
164            self.metrics_logger,
165            self.display_ownership_fut,
166            self.focus_listener_fut,
167        )
168    }
169
170    pub fn add_focus_listener(
171        mut self,
172        incoming: &Incoming,
173        focus_chain_publisher: FocusChainProviderPublisher,
174    ) -> Self {
175        let metrics_logger_clone = self.metrics_logger.clone();
176        let incoming2 = incoming.clone();
177        let focus_listener_fut = Box::pin(async move {
178            if let Ok(mut focus_listener) = FocusListener::new(
179                &incoming2,
180                focus_chain_publisher,
181                metrics_logger_clone,
182            )
183            .map_err(|e| {
184                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185            }) {
186                // This will await indefinitely and process focus messages in a loop, unless there
187                // is a problem.
188                let _result = focus_listener
189                    .dispatch_focus_changes()
190                    .await
191                    .map(|_| {
192                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193                    })
194                    .map_err(|e| {
195                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196                    });
197            }
198        });
199        self.focus_listener_fut = Some(focus_listener_fut);
200        self
201    }
202}
203
204/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
205///
206/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
207/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
208///
209/// # Example
210/// ```
211/// let ime_handler =
212///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
213/// let touch_handler = TouchHandler::new(
214///     scene_manager.session.clone(),
215///     scene_manager.compositor_id,
216///     scene_manager.display_size
217/// ).await?;
218///
219/// let assembly = InputPipelineAssembly::new()
220///     .add_handler(Box::new(ime_handler)),
221///     .add_handler(Box::new(touch_handler)),
222/// let input_pipeline = InputPipeline::new(
223///     vec![
224///         input_device::InputDeviceType::Touch,
225///         input_device::InputDeviceType::Keyboard,
226///     ],
227///     assembly,
228/// );
229/// input_pipeline.handle_input_events().await;
230/// ```
231pub struct InputPipeline {
232    /// The entry point into the input handler pipeline. Incoming input events should
233    /// be inserted into this async queue, and the input pipeline will ensure that they
234    /// are propagated through all the input handlers in the appropriate sequence.
235    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
238    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
239    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
242    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244    /// The types of devices this pipeline supports.
245    input_device_types: Vec<input_device::InputDeviceType>,
246
247    /// The InputDeviceBindings bound to this pipeline.
248    input_device_bindings: InputDeviceBindingMap,
249
250    /// This node is bound to the lifetime of this InputPipeline.
251    /// Inspect data will be dumped for this pipeline as long as it exists.
252    inspect_node: fuchsia_inspect::Node,
253
254    /// The metrics logger.
255    metrics_logger: metrics::MetricsLogger,
256
257    /// The feature flags for the input pipeline.
258    pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262    fn new_common(
263        input_device_types: Vec<input_device::InputDeviceType>,
264        assembly: InputPipelineAssembly,
265        inspect_node: fuchsia_inspect::Node,
266        feature_flags: input_device::InputPipelineFeatureFlags,
267    ) -> Self {
268        let (
269            pipeline_sender,
270            receiver,
271            handlers,
272            metrics_logger,
273            display_ownership_fut,
274            focus_listener_fut,
275        ) = assembly.into_components();
276
277        let mut handlers_count = handlers.len();
278        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
279        if let Some(fut) = display_ownership_fut {
280            fasync::Task::local(fut).detach();
281            handlers_count += 1;
282        }
283
284        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
285        if let Some(fut) = focus_listener_fut {
286            fasync::Task::local(fut).detach();
287            handlers_count += 1;
288        }
289
290        // Add properties to inspect node
291        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
292        inspect_node.record_uint("handlers_registered", handlers_count as u64);
293        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
294
295        // Initializes all handlers and starts the input pipeline loop.
296        InputPipeline::run(receiver, handlers, metrics_logger.clone());
297
298        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
299        let input_device_bindings: InputDeviceBindingMap =
300            Arc::new(Mutex::new(SortedVecMap::new()));
301        InputPipeline {
302            pipeline_sender,
303            device_event_sender,
304            device_event_receiver,
305            input_device_types,
306            input_device_bindings,
307            inspect_node,
308            metrics_logger,
309            feature_flags,
310        }
311    }
312
313    /// Creates a new [`InputPipeline`] for integration testing.
314    /// Unlike a production input pipeline, this pipeline will not monitor
315    /// `/dev/class/input-report` for devices.
316    ///
317    /// # Parameters
318    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
319    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
320    pub fn new_for_test(
321        input_device_types: Vec<input_device::InputDeviceType>,
322        assembly: InputPipelineAssembly,
323    ) -> Self {
324        let inspector = fuchsia_inspect::Inspector::default();
325        let root = inspector.root();
326        let test_node = root.create_child("input_pipeline");
327        Self::new_common(
328            input_device_types,
329            assembly,
330            test_node,
331            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
332        )
333    }
334
335    /// Creates a new [`InputPipeline`] for production use.
336    ///
337    /// # Parameters
338    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
339    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
340    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
341    pub fn new(
342        incoming: &Incoming,
343        input_device_types: Vec<input_device::InputDeviceType>,
344        assembly: InputPipelineAssembly,
345        inspect_node: fuchsia_inspect::Node,
346        feature_flags: input_device::InputPipelineFeatureFlags,
347        metrics_logger: metrics::MetricsLogger,
348    ) -> Result<Self, Error> {
349        let input_pipeline =
350            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
351        let input_device_types = input_pipeline.input_device_types.clone();
352        let input_event_sender = input_pipeline.device_event_sender.clone();
353        let input_device_bindings = input_pipeline.input_device_bindings.clone();
354        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
355        let feature_flags = input_pipeline.feature_flags.clone();
356        let incoming = incoming.clone();
357        // This intentionally uses the [`fuchsia_async`] task dispatcher instead of
358        // [`crate::Dispatcher`] -- the directory watcher always uses the fuchsia-async dispatcher.
359        // This is fine for performance because the actual event dispatch is still configured to
360        // run on [`crate::Dispatcher`].
361        fasync::Task::local(async move {
362            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
363            // that send InputEvents to `input_event_receiver`.
364            match async {
365                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
366                incoming.as_ref_directory().open(
367                    input_device::INPUT_REPORT_PATH,
368                    fio::PERM_READABLE,
369                    server.into()
370                )
371                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
372                let device_watcher =
373                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
374                Self::watch_for_devices(
375                    device_watcher,
376                    dir_proxy,
377                    input_device_types,
378                    input_event_sender,
379                    input_device_bindings,
380                    &devices_node,
381                    false, /* break_on_idle */
382                    feature_flags,
383                    metrics_logger.clone(),
384                )
385                .await
386                .context("failed to watch for devices")
387            }
388            .await
389            {
390                Ok(()) => {}
391                Err(err) => {
392                    // This error is usually benign in tests: it means that the setup does not
393                    // support dynamic device discovery. Almost no tests support dynamic
394                    // device discovery, and they also do not need those.
395                    metrics_logger.log_warn(
396                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
397                        std::format!(
398                            "Input pipeline is unable to watch for new input devices: {:?}",
399                            err
400                        ));
401                }
402            }
403        }).detach();
404
405        Ok(input_pipeline)
406    }
407
408    /// Gets the input device bindings.
409    pub fn input_device_bindings(&self) -> &InputDeviceBindingMap {
410        &self.input_device_bindings
411    }
412
413    /// Gets the input device sender: this is the channel that should be cloned
414    /// and used for injecting events from the drivers into the input pipeline.
415    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
416        &self.device_event_sender
417    }
418
419    /// Gets a list of input device types supported by this input pipeline.
420    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
421        &self.input_device_types
422    }
423
424    /// Forwards all input events into the input pipeline.
425    pub async fn handle_input_events(mut self) {
426        let metrics_logger_clone = self.metrics_logger.clone();
427        while let Some(input_event) = self.device_event_receiver.next().await {
428            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
429                metrics_logger_clone.log_error(
430                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
431                    std::format!("could not forward event from driver: {:?}", &e));
432            }
433        }
434
435        metrics_logger_clone.log_error(
436            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
437            "Input pipeline stopped handling input events.".to_string(),
438        );
439    }
440
441    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
442    /// if new devices match a type in `device_types`.
443    ///
444    /// # Parameters
445    /// - `device_watcher`: Watches the input report directory for new devices.
446    /// - `dir_proxy`: The directory containing InputDevice connections.
447    /// - `device_types`: The types of devices to watch for.
448    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
449    /// - `bindings`: Holds all the InputDeviceBindings
450    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
451    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
452    /// - `metrics_logger`: The metrics logger.
453    ///
454    /// # Errors
455    /// If the input report directory or a file within it cannot be read.
456    async fn watch_for_devices(
457        mut device_watcher: Watcher,
458        dir_proxy: fio::DirectoryProxy,
459        device_types: Vec<input_device::InputDeviceType>,
460        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
461        bindings: InputDeviceBindingMap,
462        input_devices_node: &fuchsia_inspect::Node,
463        break_on_idle: bool,
464        feature_flags: input_device::InputPipelineFeatureFlags,
465        metrics_logger: metrics::MetricsLogger,
466    ) -> Result<(), Error> {
467        // Add non-static properties to inspect node.
468        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
469        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
470        while let Some(msg) = device_watcher.try_next().await? {
471            if let Ok(filename) = msg.filename.into_os_string().into_string() {
472                if filename == "." {
473                    continue;
474                }
475
476                let pathbuf = PathBuf::from(filename.clone());
477                match msg.event {
478                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
479                        log::info!("found input device {}", filename);
480                        devices_discovered.add(1);
481                        let device_proxy =
482                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
483                        add_device_bindings(
484                            &device_types,
485                            &filename,
486                            device_proxy,
487                            &input_event_sender,
488                            &bindings,
489                            get_next_device_id(),
490                            input_devices_node,
491                            Some(&devices_connected),
492                            feature_flags.clone(),
493                            metrics_logger.clone(),
494                        )
495                        .await;
496                    }
497                    WatchEvent::IDLE => {
498                        if break_on_idle {
499                            break;
500                        }
501                    }
502                    _ => (),
503                }
504            }
505        }
506        // Ensure inspect properties persist for debugging if device watch loop ends.
507        input_devices_node.record(devices_discovered);
508        input_devices_node.record(devices_connected);
509        Err(format_err!("Input pipeline stopped watching for new input devices."))
510    }
511
512    /// Handles the incoming InputDeviceRegistryRequestStream.
513    ///
514    /// This method will end when the request stream is closed. If the stream closes with an
515    /// error the error will be returned in the Result.
516    ///
517    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
518    ///
519    /// # Parameters
520    /// - `stream`: The stream of InputDeviceRegistryRequests.
521    /// - `device_types`: The types of devices to watch for.
522    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
523    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
524    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
525    /// - `metrics_logger`: The metrics logger.
526    pub async fn handle_input_device_registry_request_stream(
527        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
528        device_types: &Vec<input_device::InputDeviceType>,
529        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
530        bindings: &InputDeviceBindingMap,
531        input_devices_node: &fuchsia_inspect::Node,
532        feature_flags: input_device::InputPipelineFeatureFlags,
533        metrics_logger: metrics::MetricsLogger,
534    ) -> Result<(), Error> {
535        while let Some(request) = stream
536            .try_next()
537            .await
538            .context("Error handling input device registry request stream")?
539        {
540            match request {
541                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
542                    device,
543                    ..
544                } => {
545                    // Add a binding if the device is a type being tracked
546                    let device = fidl_next::ClientEnd::<
547                        fidl_next_fuchsia_input_report::InputDevice,
548                        zx::Channel,
549                    >::from_untyped(device.into_channel());
550                    let device = Dispatcher::client_from_zx_channel(device);
551                    let device = device.spawn();
552                    let device_id = get_next_device_id();
553
554                    add_device_bindings(
555                        device_types,
556                        &format!("input-device-registry-{}", device_id),
557                        device,
558                        input_event_sender,
559                        bindings,
560                        device_id,
561                        input_devices_node,
562                        None,
563                        feature_flags.clone(),
564                        metrics_logger.clone(),
565                    )
566                    .await;
567                }
568                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
569                    device,
570                    responder,
571                    .. } => {
572                    // Add a binding if the device is a type being tracked
573                    let device = fidl_next::ClientEnd::<
574                        fidl_next_fuchsia_input_report::InputDevice,
575                        zx::Channel,
576                    >::from_untyped(device.into_channel());
577                    let device = Dispatcher::client_from_zx_channel(device);
578                    let device = device.spawn();
579                    let device_id = get_next_device_id();
580
581                    add_device_bindings(
582                        device_types,
583                        &format!("input-device-registry-{}", device_id),
584                        device,
585                        input_event_sender,
586                        bindings,
587                        device_id,
588                        input_devices_node,
589                        None,
590                        feature_flags.clone(),
591                        metrics_logger.clone(),
592                    )
593                    .await;
594
595                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
596                        device_id: Some(device_id),
597                        ..Default::default()
598                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
599                }
600            }
601        }
602
603        Ok(())
604    }
605
606    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
607    fn run(
608        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
609        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
610        metrics_logger: metrics::MetricsLogger,
611    ) {
612        Dispatcher::spawn_local(async move {
613            for handler in &handlers {
614                handler.clone().set_handler_healthy();
615            }
616
617            use input_device::InputEventType;
618
619            let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
620
621            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
622            let event_types = vec![
623                InputEventType::Keyboard,
624                InputEventType::LightSensor,
625                InputEventType::ConsumerControls,
626                InputEventType::Mouse,
627                InputEventType::TouchScreen,
628                InputEventType::Touchpad,
629                #[cfg(test)]
630                InputEventType::Fake,
631            ];
632
633            for event_type in event_types {
634                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
635                    .iter()
636                    .filter(|h| h.interest().contains(&event_type))
637                    .cloned()
638                    .collect();
639                handlers_by_type[event_type as usize] = handlers_for_type;
640            }
641
642            while let Some(events) = receiver.next().await {
643                if events.is_empty() {
644                    continue;
645                }
646
647                let mut groups_seen = 0;
648                let events = events
649                    .into_iter()
650                    .chunk_by(|e| InputEventType::from(&e.device_event));
651                let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
652                for (event_type, event_group) in events {
653                    groups_seen += 1;
654                    if groups_seen == 2 {
655                        metrics_logger.log_error(
656                                InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
657                                "it is not recommended to contain multiple types of events in 1 send".to_string(),
658                            );
659                    }
660                    let mut events_in_group = event_group;
661
662                    // Get pre-computed handlers for this event type.
663                    let handlers = &handlers_by_type[event_type as usize];
664
665                    for handler in handlers {
666                        events_in_group =
667                            handler.clone().handle_input_events(events_in_group).await;
668                    }
669
670                    for event in events_in_group {
671                        if event.handled == input_device::Handled::No {
672                            log::warn!("unhandled input event: {:?}", &event);
673                        }
674                        if let Some(trace_id) = event.trace_id {
675                            fuchsia_trace::flow_end!(
676                                "input",
677                                "event_in_input_pipeline",
678                                trace_id.into()
679                            );
680                        }
681                    }
682                }
683            }
684            for handler in &handlers {
685                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
686            }
687            panic!("Runner task is not supposed to terminate.")
688        }).detach();
689    }
690}
691
692/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
693///
694/// # Parameters
695/// - `device_types`: The types of devices to watch for.
696/// - `device_proxy`: A proxy to the input device.
697/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
698/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
699/// - `device_id`: The device id of the associated bindings.
700/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
701///
702/// # Note
703/// This will create multiple bindings, in the case where
704/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
705///   with multiple table fields populated, and
706/// * multiple populated table fields correspond to device types present in `device_types`
707///
708/// This is used, for example, to support the Atlas touchpad. In that case, a single
709/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
710/// a `fuchsia.input.report.TouchDescriptor`.
711async fn add_device_bindings(
712    device_types: &Vec<input_device::InputDeviceType>,
713    filename: &String,
714    device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
715    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
716    bindings: &InputDeviceBindingMap,
717    device_id: u32,
718    input_devices_node: &fuchsia_inspect::Node,
719    devices_connected: Option<&fuchsia_inspect::UintProperty>,
720    feature_flags: InputPipelineFeatureFlags,
721    metrics_logger: metrics::MetricsLogger,
722) {
723    let mut matched_device_types = vec![];
724    if let Ok(res) = device_proxy.get_descriptor().await {
725        for device_type in device_types {
726            if input_device::is_device_type(&res.descriptor, *device_type).await {
727                matched_device_types.push(device_type);
728                match devices_connected {
729                    Some(dev_connected) => {
730                        let _ = dev_connected.add(1);
731                    }
732                    None => (),
733                };
734            }
735        }
736        if matched_device_types.is_empty() {
737            log::info!(
738                "device {} did not match any supported device types: {:?}",
739                filename,
740                device_types
741            );
742            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
743            let mut health = fuchsia_inspect::health::Node::new(&device_node);
744            health.set_unhealthy("Unsupported device type.");
745            device_node.record(health);
746            input_devices_node.record(device_node);
747            return;
748        }
749    } else {
750        metrics_logger.clone().log_error(
751            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
752            std::format!("cannot bind device {} without a device descriptor", filename),
753        );
754        return;
755    }
756
757    log::info!(
758        "binding {} to device types: {}",
759        filename,
760        matched_device_types
761            .iter()
762            .fold(String::new(), |device_types_string, device_type| device_types_string
763                + &format!("{:?}, ", device_type))
764    );
765
766    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
767    for device_type in matched_device_types {
768        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
769        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
770        //
771        // There's no conflict in having multiple bindings read from the same node,
772        // since:
773        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
774        // * the device driver will copy each incoming report to each connected reader.
775        //
776        // This does mean that reports from the Atlas touchpad device get read twice
777        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
778        // is operating in mouse mode or touchpad mode.
779        //
780        // This hasn't been an issue because:
781        // * Semantically: things are fine, because each binding discards irrelevant reports.
782        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
783        // * Performance wise: things are fine, because the data rate of the touchpad is low
784        //   (125 HZ).
785        //
786        // If we add additional cases where bindings share an underlying `input-report` node,
787        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
788        let proxy = device_proxy.clone();
789        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
790        match input_device::get_device_binding(
791            *device_type,
792            proxy,
793            device_id,
794            input_event_sender.clone(),
795            device_node,
796            feature_flags.clone(),
797            metrics_logger.clone(),
798        )
799        .await
800        {
801            Ok(binding) => new_bindings.push(binding),
802            Err(e) => {
803                metrics_logger.log_error(
804                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
805                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
806                );
807            }
808        }
809    }
810
811    if !new_bindings.is_empty() {
812        let mut bindings = bindings.lock().await;
813        if let Some(v) = bindings.get_mut(&device_id) {
814            v.extend(new_bindings);
815        } else {
816            bindings.insert(device_id, new_bindings);
817        }
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use super::*;
824    use crate::input_device::{InputDeviceBinding, InputEventType};
825    use crate::utils::Position;
826    use crate::{
827        fake_input_device_binding, mouse_binding, mouse_model_database,
828        observe_fake_events_input_handler,
829    };
830    use async_trait::async_trait;
831    use diagnostics_assertions::AnyProperty;
832    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
833    use fuchsia_async as fasync;
834    use futures::FutureExt;
835    use pretty_assertions::assert_eq;
836    use rand::Rng;
837    use sorted_vec_map::SortedVecSet;
838    use vfs::{pseudo_directory, service as pseudo_fs_service};
839
840    const COUNTS_PER_MM: u32 = 12;
841
842    /// Returns the InputEvent sent over `sender`.
843    ///
844    /// # Parameters
845    /// - `sender`: The channel to send the InputEvent over.
846    fn send_input_event(
847        sender: UnboundedSender<Vec<input_device::InputEvent>>,
848    ) -> Vec<input_device::InputEvent> {
849        let mut rng = rand::rng();
850        let offset =
851            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
852        let input_event = input_device::InputEvent {
853            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
854                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
855                    millimeters: Position {
856                        x: offset.x / COUNTS_PER_MM as f32,
857                        y: offset.y / COUNTS_PER_MM as f32,
858                    },
859                }),
860                None, /* wheel_delta_v */
861                None, /* wheel_delta_h */
862                mouse_binding::MousePhase::Move,
863                SortedVecSet::new(),
864                SortedVecSet::new(),
865                None, /* is_precision_scroll */
866                None, /* wake_lease */
867            )),
868            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
869                mouse_binding::MouseDeviceDescriptor {
870                    device_id: 1,
871                    absolute_x_range: None,
872                    absolute_y_range: None,
873                    wheel_v_range: None,
874                    wheel_h_range: None,
875                    buttons: None,
876                    counts_per_mm: COUNTS_PER_MM,
877                },
878            ),
879            event_time: zx::MonotonicInstant::get(),
880            handled: input_device::Handled::No,
881            trace_id: None,
882        };
883        match sender.unbounded_send(vec![input_event.clone()]) {
884            Err(_) => assert!(false),
885            _ => {}
886        }
887
888        vec![input_event]
889    }
890
891    /// Returns a MouseDescriptor on an InputDeviceRequest.
892    ///
893    /// # Parameters
894    /// - `input_device_request`: The request to handle.
895    fn handle_input_device_request(
896        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
897    ) {
898        match input_device_request {
899            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
900                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
901                    device_information: None,
902                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
903                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
904                            movement_x: None,
905                            movement_y: None,
906                            scroll_v: None,
907                            scroll_h: None,
908                            buttons: Some(vec![0]),
909                            position_x: None,
910                            position_y: None,
911                            ..Default::default()
912                        }),
913                        ..Default::default()
914                    }),
915                    sensor: None,
916                    touch: None,
917                    keyboard: None,
918                    consumer_control: None,
919                    ..Default::default()
920                });
921            }
922            _ => {}
923        }
924    }
925
926    /// Tests that an input pipeline handles events from multiple devices.
927    #[fasync::run_singlethreaded(test)]
928    async fn multiple_devices_single_handler() {
929        // Create two fake device bindings.
930        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
931        let first_device_binding =
932            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
933        let second_device_binding =
934            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
935
936        // Create a fake input handler.
937        let (handler_event_sender, mut handler_event_receiver) =
938            futures::channel::mpsc::channel(100);
939        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
940            handler_event_sender,
941        );
942
943        // Build the input pipeline.
944        let (sender, receiver, handlers, _, _, _) =
945            InputPipelineAssembly::new(metrics::MetricsLogger::default())
946                .add_handler(input_handler)
947                .into_components();
948        let inspector = fuchsia_inspect::Inspector::default();
949        let test_node = inspector.root().create_child("input_pipeline");
950        let input_pipeline = InputPipeline {
951            pipeline_sender: sender,
952            device_event_sender,
953            device_event_receiver,
954            input_device_types: vec![],
955            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
956            inspect_node: test_node,
957            metrics_logger: metrics::MetricsLogger::default(),
958            feature_flags: input_device::InputPipelineFeatureFlags::default(),
959        };
960        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
961
962        // Send an input event from each device.
963        let first_device_events = send_input_event(first_device_binding.input_event_sender());
964        let second_device_events = send_input_event(second_device_binding.input_event_sender());
965
966        // Run the pipeline.
967        fasync::Task::local(async {
968            input_pipeline.handle_input_events().await;
969        })
970        .detach();
971
972        // Assert the handler receives the events.
973        let first_handled_event = handler_event_receiver.next().await;
974        assert_eq!(first_handled_event, first_device_events.into_iter().next());
975
976        let second_handled_event = handler_event_receiver.next().await;
977        assert_eq!(second_handled_event, second_device_events.into_iter().next());
978    }
979
980    /// Tests that an input pipeline handles events through multiple input handlers.
981    #[fasync::run_singlethreaded(test)]
982    async fn single_device_multiple_handlers() {
983        // Create two fake device bindings.
984        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
985        let input_device_binding =
986            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
987
988        // Create two fake input handlers.
989        let (first_handler_event_sender, mut first_handler_event_receiver) =
990            futures::channel::mpsc::channel(100);
991        let first_input_handler =
992            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
993                first_handler_event_sender,
994            );
995        let (second_handler_event_sender, mut second_handler_event_receiver) =
996            futures::channel::mpsc::channel(100);
997        let second_input_handler =
998            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
999                second_handler_event_sender,
1000            );
1001
1002        // Build the input pipeline.
1003        let (sender, receiver, handlers, _, _, _) =
1004            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1005                .add_handler(first_input_handler)
1006                .add_handler(second_input_handler)
1007                .into_components();
1008        let inspector = fuchsia_inspect::Inspector::default();
1009        let test_node = inspector.root().create_child("input_pipeline");
1010        let input_pipeline = InputPipeline {
1011            pipeline_sender: sender,
1012            device_event_sender,
1013            device_event_receiver,
1014            input_device_types: vec![],
1015            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
1016            inspect_node: test_node,
1017            metrics_logger: metrics::MetricsLogger::default(),
1018            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1019        };
1020        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1021
1022        // Send an input event.
1023        let input_events = send_input_event(input_device_binding.input_event_sender());
1024
1025        // Run the pipeline.
1026        fasync::Task::local(async {
1027            input_pipeline.handle_input_events().await;
1028        })
1029        .detach();
1030
1031        // Assert both handlers receive the event.
1032        let expected_event = input_events.into_iter().next();
1033        let first_handler_event = first_handler_event_receiver.next().await;
1034        assert_eq!(first_handler_event, expected_event);
1035        let second_handler_event = second_handler_event_receiver.next().await;
1036        assert_eq!(second_handler_event, expected_event);
1037    }
1038
1039    /// Tests that a single mouse device binding is created for the one input device in the
1040    /// input report directory.
1041    #[fasync::run_singlethreaded(test)]
1042    async fn watch_devices_one_match_exists() {
1043        // Create a file in a pseudo directory that represents an input device.
1044        let mut count: i8 = 0;
1045        let dir = pseudo_directory! {
1046            "file_name" => pseudo_fs_service::host(
1047                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1048                    async move {
1049                        while count < 3 {
1050                            if let Some(input_device_request) =
1051                                request_stream.try_next().await.unwrap()
1052                            {
1053                                handle_input_device_request(input_device_request);
1054                                count += 1;
1055                            }
1056                        }
1057
1058                    }.boxed()
1059                },
1060            )
1061        };
1062
1063        // Create a Watcher on the pseudo directory.
1064        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1065        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1066        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1067        // proxy to get connections to input devices.
1068        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1069
1070        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1071        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1072        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1073
1074        let inspector = fuchsia_inspect::Inspector::default();
1075        let test_node = inspector.root().create_child("input_pipeline");
1076        test_node.record_string(
1077            "supported_input_devices",
1078            supported_device_types.clone().iter().join(", "),
1079        );
1080        let input_devices = test_node.create_child("input_devices");
1081        // Assert that inspect tree is initialized with no devices.
1082        diagnostics_assertions::assert_data_tree!(inspector, root: {
1083            input_pipeline: {
1084                supported_input_devices: "Mouse",
1085                input_devices: {}
1086            }
1087        });
1088
1089        let _ = InputPipeline::watch_for_devices(
1090            device_watcher,
1091            dir_proxy_for_pipeline,
1092            supported_device_types,
1093            input_event_sender,
1094            bindings.clone(),
1095            &input_devices,
1096            true, /* break_on_idle */
1097            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1098            metrics::MetricsLogger::default(),
1099        )
1100        .await;
1101
1102        // Assert that one mouse device with accurate device id was found.
1103        let bindings_map = bindings.lock().await;
1104        assert_eq!(bindings_map.len(), 1);
1105        let bindings_vector = bindings_map.get(&10);
1106        assert!(bindings_vector.is_some());
1107        assert_eq!(bindings_vector.unwrap().len(), 1);
1108        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1109        assert!(boxed_mouse_binding.is_some());
1110        assert_eq!(
1111            boxed_mouse_binding.unwrap().get_device_descriptor(),
1112            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1113                device_id: 10,
1114                absolute_x_range: None,
1115                absolute_y_range: None,
1116                wheel_v_range: None,
1117                wheel_h_range: None,
1118                buttons: Some(vec![0]),
1119                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1120            })
1121        );
1122
1123        // Assert that inspect tree reflects new device discovered and connected.
1124        diagnostics_assertions::assert_data_tree!(inspector, root: {
1125            input_pipeline: {
1126                supported_input_devices: "Mouse",
1127                input_devices: {
1128                    devices_discovered: 1u64,
1129                    devices_connected: 1u64,
1130                    "file_name_Mouse": contains {
1131                        reports_received_count: 0u64,
1132                        reports_filtered_count: 0u64,
1133                        events_generated: 0u64,
1134                        last_received_timestamp_ns: 0u64,
1135                        last_generated_timestamp_ns: 0u64,
1136                        "fuchsia.inspect.Health": {
1137                            status: "OK",
1138                            // Timestamp value is unpredictable and not relevant in this context,
1139                            // so we only assert that the property is present.
1140                            start_timestamp_nanos: AnyProperty
1141                        },
1142                    }
1143                }
1144            }
1145        });
1146    }
1147
1148    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1149    /// but only a mouse exists.
1150    #[fasync::run_singlethreaded(test)]
1151    async fn watch_devices_no_matches_exist() {
1152        // Create a file in a pseudo directory that represents an input device.
1153        let mut count: i8 = 0;
1154        let dir = pseudo_directory! {
1155            "file_name" => pseudo_fs_service::host(
1156                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1157                    async move {
1158                        while count < 1 {
1159                            if let Some(input_device_request) =
1160                                request_stream.try_next().await.unwrap()
1161                            {
1162                                handle_input_device_request(input_device_request);
1163                                count += 1;
1164                            }
1165                        }
1166
1167                    }.boxed()
1168                },
1169            )
1170        };
1171
1172        // Create a Watcher on the pseudo directory.
1173        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1174        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1175        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1176        // proxy to get connections to input devices.
1177        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1178
1179        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1180        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1181        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1182
1183        let inspector = fuchsia_inspect::Inspector::default();
1184        let test_node = inspector.root().create_child("input_pipeline");
1185        test_node.record_string(
1186            "supported_input_devices",
1187            supported_device_types.clone().iter().join(", "),
1188        );
1189        let input_devices = test_node.create_child("input_devices");
1190        // Assert that inspect tree is initialized with no devices.
1191        diagnostics_assertions::assert_data_tree!(inspector, root: {
1192            input_pipeline: {
1193                supported_input_devices: "Keyboard",
1194                input_devices: {}
1195            }
1196        });
1197
1198        let _ = InputPipeline::watch_for_devices(
1199            device_watcher,
1200            dir_proxy_for_pipeline,
1201            supported_device_types,
1202            input_event_sender,
1203            bindings.clone(),
1204            &input_devices,
1205            true, /* break_on_idle */
1206            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1207            metrics::MetricsLogger::default(),
1208        )
1209        .await;
1210
1211        // Assert that no devices were found.
1212        let bindings = bindings.lock().await;
1213        assert_eq!(bindings.len(), 0);
1214
1215        // Assert that inspect tree reflects new device discovered, but not connected.
1216        diagnostics_assertions::assert_data_tree!(inspector, root: {
1217            input_pipeline: {
1218                supported_input_devices: "Keyboard",
1219                input_devices: {
1220                    devices_discovered: 1u64,
1221                    devices_connected: 0u64,
1222                    "file_name_Unsupported": {
1223                        "fuchsia.inspect.Health": {
1224                            status: "UNHEALTHY",
1225                            message: "Unsupported device type.",
1226                            // Timestamp value is unpredictable and not relevant in this context,
1227                            // so we only assert that the property is present.
1228                            start_timestamp_nanos: AnyProperty
1229                        },
1230                    }
1231                }
1232            }
1233        });
1234    }
1235
1236    /// Tests that a single keyboard device binding is created for the input device registered
1237    /// through InputDeviceRegistry.
1238    #[fasync::run_singlethreaded(test)]
1239    async fn handle_input_device_registry_request_stream() {
1240        let (input_device_registry_proxy, input_device_registry_request_stream) =
1241            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1242        let (input_device_client_end, mut input_device_request_stream) =
1243            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1244
1245        let device_types = vec![input_device::InputDeviceType::Mouse];
1246        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1247        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1248
1249        // Handle input device requests.
1250        let mut count: i8 = 0;
1251        fasync::Task::local(async move {
1252            // Register a device.
1253            let _ = input_device_registry_proxy.register(input_device_client_end);
1254
1255            while count < 3 {
1256                if let Some(input_device_request) =
1257                    input_device_request_stream.try_next().await.unwrap()
1258                {
1259                    handle_input_device_request(input_device_request);
1260                    count += 1;
1261                }
1262            }
1263
1264            // End handle_input_device_registry_request_stream() by taking the event stream.
1265            input_device_registry_proxy.take_event_stream();
1266        })
1267        .detach();
1268
1269        let inspector = fuchsia_inspect::Inspector::default();
1270        let test_node = inspector.root().create_child("input_pipeline");
1271
1272        // Start listening for InputDeviceRegistryRequests.
1273        let bindings_clone = bindings.clone();
1274        let _ = InputPipeline::handle_input_device_registry_request_stream(
1275            input_device_registry_request_stream,
1276            &device_types,
1277            &input_event_sender,
1278            &bindings_clone,
1279            &test_node,
1280            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1281            metrics::MetricsLogger::default(),
1282        )
1283        .await;
1284
1285        // Assert that a device was registered.
1286        let bindings = bindings.lock().await;
1287        assert_eq!(bindings.len(), 1);
1288    }
1289
1290    // Tests that correct properties are added to inspect node when InputPipeline is created.
1291    #[fasync::run_singlethreaded(test)]
1292    async fn check_inspect_node_has_correct_properties() {
1293        let device_types = vec![
1294            input_device::InputDeviceType::Touch,
1295            input_device::InputDeviceType::ConsumerControls,
1296        ];
1297        let inspector = fuchsia_inspect::Inspector::default();
1298        let test_node = inspector.root().create_child("input_pipeline");
1299        // Create fake input handler for assembly
1300        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1301            futures::channel::mpsc::channel(100);
1302        let fake_input_handler =
1303            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1304                fake_handler_event_sender,
1305            );
1306        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1307            .add_handler(fake_input_handler);
1308        let _test_input_pipeline = InputPipeline::new(
1309            &Incoming::new(),
1310            device_types,
1311            assembly,
1312            test_node,
1313            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1314            metrics::MetricsLogger::default(),
1315        );
1316        diagnostics_assertions::assert_data_tree!(inspector, root: {
1317            input_pipeline: {
1318                supported_input_devices: "Touch, ConsumerControls",
1319                handlers_registered: 1u64,
1320                handlers_healthy: 1u64,
1321                input_devices: {}
1322            }
1323        });
1324    }
1325
1326    struct SpecificInterestFakeHandler {
1327        interest_types: Vec<input_device::InputEventType>,
1328        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1329    }
1330
1331    impl SpecificInterestFakeHandler {
1332        pub fn new(
1333            interest_types: Vec<input_device::InputEventType>,
1334            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1335        ) -> Rc<Self> {
1336            Rc::new(SpecificInterestFakeHandler {
1337                interest_types,
1338                event_sender: std::cell::RefCell::new(event_sender),
1339            })
1340        }
1341    }
1342
1343    impl Handler for SpecificInterestFakeHandler {
1344        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1345        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1346        fn get_name(&self) -> &'static str {
1347            "SpecificInterestFakeHandler"
1348        }
1349
1350        fn interest(&self) -> Vec<input_device::InputEventType> {
1351            self.interest_types.clone()
1352        }
1353    }
1354
1355    #[async_trait(?Send)]
1356    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1357        async fn handle_input_event(
1358            self: Rc<Self>,
1359            input_event: input_device::InputEvent,
1360        ) -> Vec<input_device::InputEvent> {
1361            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1362                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1363                Ok(_) => {}
1364            }
1365            vec![input_event]
1366        }
1367    }
1368
1369    #[fasync::run_singlethreaded(test)]
1370    async fn run_only_sends_events_to_interested_handlers() {
1371        // Mouse Handler (Specific Interest: Mouse)
1372        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1373        let mouse_handler =
1374            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1375
1376        // Fake Handler (Specific Interest: Fake)
1377        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1378        let fake_handler =
1379            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1380
1381        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1382            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1383                .add_handler(mouse_handler)
1384                .add_handler(fake_handler)
1385                .into_components();
1386
1387        // Run the pipeline logic
1388        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1389
1390        // Create a Fake event
1391        let fake_event = input_device::InputEvent {
1392            device_event: input_device::InputDeviceEvent::Fake,
1393            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1394            event_time: zx::MonotonicInstant::get(),
1395            handled: input_device::Handled::No,
1396            trace_id: None,
1397        };
1398
1399        // Send the Fake event
1400        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1401
1402        // Verify Fake Handler received it
1403        let received_by_fake = fake_receiver.next().await;
1404        assert_eq!(received_by_fake, Some(fake_event));
1405
1406        // Verify Mouse Handler did NOT receive it
1407        assert!(mouse_receiver.try_next().is_err());
1408    }
1409
1410    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1411        input_device::InputEvent {
1412            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1413                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1414                    millimeters: Position { x, y },
1415                }),
1416                None,
1417                None,
1418                mouse_binding::MousePhase::Move,
1419                SortedVecSet::new(),
1420                SortedVecSet::new(),
1421                None,
1422                None,
1423            )),
1424            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1425                mouse_binding::MouseDeviceDescriptor {
1426                    device_id: 1,
1427                    absolute_x_range: None,
1428                    absolute_y_range: None,
1429                    wheel_v_range: None,
1430                    wheel_h_range: None,
1431                    buttons: None,
1432                    counts_per_mm: 1,
1433                },
1434            ),
1435            event_time: zx::MonotonicInstant::get(),
1436            handled: input_device::Handled::No,
1437            trace_id: None,
1438        }
1439    }
1440
1441    #[fasync::run_singlethreaded(test)]
1442    async fn run_mixed_event_types_dispatched_correctly() {
1443        // Mouse Handler (Specific Interest: Mouse)
1444        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1445        let mouse_handler =
1446            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1447
1448        // Fake Handler (Specific Interest: Fake)
1449        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1450        let fake_handler =
1451            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1452
1453        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1454            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1455                .add_handler(mouse_handler)
1456                .add_handler(fake_handler)
1457                .into_components();
1458
1459        // Run the pipeline logic
1460        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1461
1462        // Create events
1463        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1464        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1465        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1466
1467        let fake_event_1 = input_device::InputEvent {
1468            device_event: input_device::InputDeviceEvent::Fake,
1469            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1470            event_time: zx::MonotonicInstant::get(),
1471            handled: input_device::Handled::No,
1472            trace_id: None,
1473        };
1474
1475        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1476        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1477        let mixed_batch = vec![
1478            mouse_event_1.clone(),
1479            mouse_event_2.clone(),
1480            fake_event_1.clone(),
1481            mouse_event_3.clone(),
1482        ];
1483        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1484
1485        // Verify Mouse Handler received M1, M2, and then M3
1486        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1487        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1488        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1489
1490        // Verify Fake Handler received F1
1491        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1492    }
1493}