sensors_lib/
sensor_update_sender.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::client::Client;
use crate::sensor_manager::*;
use fidl::endpoints::ControlHandle;
use fidl_fuchsia_hardware_sensors::{DriverEvent, DriverEventStream};
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::select;
use futures::stream::{FuturesUnordered, StreamFuture};
use futures_util::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct SensorUpdateSender {
    sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>,
}

pub enum SensorUpdate {
    SensorMap(HashMap<SensorId, Sensor>),
    EventStream(DriverEventStream),
}

// Sends all clients of a sensor an event if event contains a DriverEvent. If the channel to the
// client has closed, it will remove that client from the list of subscribers for that particular
// sensor.
//
// Returns whether or not to continue polling the event stream that generated this sensor event.
fn handle_sensor_event(
    sensors: &mut HashMap<SensorId, Sensor>,
    sensor_event: Option<Result<DriverEvent, fidl::Error>>,
) -> bool {
    let mut should_send_more_events = true;
    match sensor_event {
        Some(Ok(DriverEvent::OnSensorEvent { event })) => {
            if let Some(sensor) = sensors.get_mut(&event.sensor_id) {
                let mut clients_to_remove: Vec<Client> = Vec::new();
                for client in &sensor.clients {
                    if !client.control_handle.is_closed() {
                        if let Err(e) = client.control_handle.send_on_sensor_event(&event) {
                            log::warn!("Failed to send sensor event: {:#?}", e);
                        }
                    } else {
                        log::error!("Client was PEER_CLOSED! Removing from clients list");
                        clients_to_remove.push(client.clone());
                    }
                }
                for client in clients_to_remove {
                    sensor.clients.remove(&client);
                }
            }
        }
        Some(Ok(DriverEvent::_UnknownEvent { ordinal, .. })) => {
            log::warn!("SensorManager received an UnknownEvent with ordinal: {:#?}", ordinal);
        }
        Some(Err(e)) => {
            log::error!("Received an error from sensor driver: {:#?}", e);
            should_send_more_events = false;
        }
        None => {
            log::error!("Got None from driver");
            should_send_more_events = false;
        }
    }

    should_send_more_events
}

// Handles the stream of sensor events from all drivers. Receives updates about sensor information
// and new sensors from a channel to the SensorManager.
pub async fn handle_sensor_event_streams(mut update_receiver: mpsc::Receiver<SensorUpdate>) {
    let mut event_streams: FuturesUnordered<StreamFuture<DriverEventStream>> =
        FuturesUnordered::new();
    let mut sensors: HashMap<SensorId, Sensor> = HashMap::new();
    loop {
        select! {
            sensor_event = event_streams.next() => {
                if let Some((event, stream)) = sensor_event {
                    if handle_sensor_event(&mut sensors, event) {
                        // Once the future has resolved, the rest of the events need to be
                        // placed back onto the list of futures.
                        event_streams.push(stream.into_future());
                    }
                }
            },
            sensor_update = update_receiver.next() => {
                match sensor_update {
                    Some(SensorUpdate::SensorMap(updated_sensors)) => {
                        sensors = updated_sensors;
                    },
                    Some(SensorUpdate::EventStream(stream)) => {
                        event_streams.push(stream.into_future());
                    }
                    None => {
                        log::error!("Channel has hung up! Will no longer receive sensor updates.");
                    }
                }
            },
        }
    }
}

impl SensorUpdateSender {
    pub fn new(sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>) -> Self {
        SensorUpdateSender { sender }
    }

    async fn send_update(&self, update: SensorUpdate) {
        if let Err(e) = self.sender.lock().await.try_send(update) {
            log::warn!("Failed to send sensor update! {:#?}", e);
        }
    }

    pub(crate) async fn update_sensor_map(&self, sensors: HashMap<SensorId, Sensor>) {
        self.send_update(SensorUpdate::SensorMap(sensors)).await
    }

    pub(crate) async fn add_event_stream(&self, event_stream: DriverEventStream) {
        self.send_update(SensorUpdate::EventStream(event_stream)).await
    }
}