settings_camera_watcher_agent/
lib.rs1use anyhow::{Context, Error};
6use fuchsia_async as fasync;
7use futures::channel::mpsc::UnboundedSender;
8use settings_camera::connect_to_camera;
9use settings_common::inspect::event::ExternalEventPublisher;
10use settings_common::service_context::ServiceContext;
11use settings_common::trace;
12
13pub struct CameraWatcherAgent {
14 muted_txs: Vec<UnboundedSender<bool>>,
17 external_publisher: ExternalEventPublisher,
18}
19
20impl CameraWatcherAgent {
21 pub fn new(
22 muted_txs: Vec<UnboundedSender<bool>>,
23 external_publisher: ExternalEventPublisher,
24 ) -> Self {
25 Self { muted_txs, external_publisher }
26 }
27
28 pub async fn spawn(self, service_context: &ServiceContext) -> Result<(), Error> {
29 let camera_device_client =
30 connect_to_camera(service_context, self.external_publisher.clone())
31 .await
32 .context("connecting to camera")?;
33 let mut event_handler = EventHandler { muted_txs: self.muted_txs.clone(), sw_muted: false };
34 fasync::Task::local(async move {
35 let id = fuchsia_trace::Id::new();
36 trace!(id, "camera_watcher_agent_handler");
41 while let Ok((sw_muted, _hw_muted)) = camera_device_client.watch_mute_state().await {
42 trace!(id, "event");
43 event_handler.handle_event(sw_muted);
44 }
45 })
46 .detach();
47
48 Ok(())
49 }
50}
51
52struct EventHandler {
53 muted_txs: Vec<UnboundedSender<bool>>,
54 sw_muted: bool,
55}
56
57impl EventHandler {
58 fn handle_event(&mut self, sw_muted: bool) {
59 if self.sw_muted != sw_muted {
60 self.sw_muted = sw_muted;
61 self.send_event(sw_muted);
62 }
63 }
64
65 fn send_event(&self, muted: bool) {
66 for muted_tx in &self.muted_txs {
67 let _ = muted_tx.unbounded_send(muted);
68 }
69 }
70}
71
72#[cfg(test)]
73mod tests {
74 use super::*;
75 use fuchsia_async::{MonotonicInstant, TestExecutor};
76 use futures::StreamExt;
77 use futures::channel::mpsc;
78 use futures::lock::Mutex;
79 use settings_camera::CAMERA_WATCHER_TIMEOUT;
80 use settings_test_common::fakes::camera3_service::Camera3Service;
81 use settings_test_common::fakes::service::ServiceRegistry;
82 use settings_test_common::helpers::move_executor_forward_and_get;
83 use std::rc::Rc;
84
85 #[fuchsia::test(allow_stalls = false)]
87 async fn when_camera3_inaccessible_returns_err() {
88 let (event_tx, _event_rx) = mpsc::unbounded();
89 let external_publisher = ExternalEventPublisher::new(event_tx);
90
91 let agent = CameraWatcherAgent { muted_txs: vec![], external_publisher };
92 let service_context =
93 ServiceContext::new(Some(ServiceRegistry::serve(ServiceRegistry::create())));
94
95 let result = agent.spawn(&service_context).await;
97 assert!(result.is_err());
98 }
99
100 #[fuchsia::test(allow_stalls = false)]
102 async fn event_handler_proxies_event() {
103 let (tx1, mut rx1) = mpsc::unbounded();
104 let (tx2, mut rx2) = mpsc::unbounded();
105
106 let mut event_handler =
107 EventHandler { muted_txs: vec![tx1.clone(), tx2.clone()], sw_muted: false };
108
109 event_handler.handle_event(true);
111
112 let mut channel_received = 0;
113
114 let mut next_rx1 = rx1.next();
115 let mut next_rx2 = rx2.next();
116
117 loop {
120 futures::select! {
121 event = next_rx1 => {
122 let Some(muted) = event else {
123 continue;
124 };
125 assert!(muted);
126 tx1.close_channel();
128 channel_received += 1;
129 }
130 event = next_rx2 => {
131 let Some(muted) = event else {
132 continue;
133 };
134 assert!(muted);
135 tx2.close_channel();
137 channel_received += 1;
138 }
139 complete => break,
140 }
141 }
142
143 assert_eq!(channel_received, 2);
144 }
145
146 struct FakeServices {
147 camera3_service: Rc<Mutex<Camera3Service>>,
148 }
149
150 #[derive(PartialEq)]
151 enum CameraDevice {
152 With,
153 Without,
154 }
155
156 #[derive(PartialEq)]
157 enum DelayCamera {
158 Yes,
159 No,
160 }
161
162 async fn create_services(
167 has_camera_device: CameraDevice,
168 delay_camera_device: DelayCamera,
169 ) -> (Rc<Mutex<ServiceRegistry>>, FakeServices) {
170 let service_registry = ServiceRegistry::create();
171
172 let camera3_service_handle =
173 Rc::new(Mutex::new(if DelayCamera::Yes == delay_camera_device {
174 Camera3Service::new_delayed_devices(delay_camera_device == DelayCamera::Yes)
175 } else {
176 Camera3Service::new(has_camera_device == CameraDevice::With)
177 }));
178 service_registry.lock().await.register_service(camera3_service_handle.clone());
179
180 (service_registry, FakeServices { camera3_service: camera3_service_handle })
181 }
182
183 #[fuchsia::test(allow_stalls = false)]
184 async fn test_camera_agent_proxy() {
185 let (service_registry, fake_services) =
187 create_services(CameraDevice::With, DelayCamera::No).await;
188
189 let expected_camera_state = true;
190 fake_services.camera3_service.lock().await.set_camera_sw_muted(expected_camera_state);
191 let (event_tx, _event_rx) = mpsc::unbounded();
192 let external_publisher = ExternalEventPublisher::new(event_tx);
193 let (tx, mut rx) = mpsc::unbounded();
194
195 let agent = CameraWatcherAgent::new(vec![tx], external_publisher);
196 let service_context = ServiceContext::new(Some(ServiceRegistry::serve(service_registry)));
197 let res = agent.spawn(&service_context).await;
198
199 assert!(matches!(res, Ok(())), "agent spawn failed");
201
202 let res = rx.next().await;
204 assert_eq!(res, Some(true));
205 }
206
207 #[fuchsia::test]
210 fn test_camera_devices_watcher_timeout() {
211 let mut executor = TestExecutor::new_with_fake_time();
214 executor.set_fake_time(MonotonicInstant::from_nanos(0));
215
216 let services_future = create_services(CameraDevice::Without, DelayCamera::No);
218 let (service_registry, fake_services) = move_executor_forward_and_get(
219 &mut executor,
220 services_future,
221 "Could not create services",
222 );
223
224 let camera_service_future = fake_services.camera3_service.lock();
226 move_executor_forward_and_get(
227 &mut executor,
228 camera_service_future,
229 "Unable to get camera service",
230 )
231 .set_camera_sw_muted(true);
232
233 let (event_tx, _event_rx) = mpsc::unbounded();
234 let external_publisher = ExternalEventPublisher::new(event_tx);
235 let agent = CameraWatcherAgent::new(vec![], external_publisher);
236 let service_context = ServiceContext::new(Some(ServiceRegistry::serve(service_registry)));
237
238 let spawn_future = agent.spawn(&service_context);
240
241 executor.set_fake_time(MonotonicInstant::from_nanos(CAMERA_WATCHER_TIMEOUT + 1));
243
244 let res =
245 move_executor_forward_and_get(&mut executor, spawn_future, "Could not complete spawn");
246
247 assert!(res.is_err(), "spawn did not hit timeout");
249 }
250
251 #[fuchsia_async::run_singlethreaded(test)]
254 async fn test_camera_agent_delayed_devices() {
255 let (service_registry, fake_services) =
256 create_services(CameraDevice::Without, DelayCamera::Yes).await;
257
258 let expected_camera_state = true;
259 fake_services.camera3_service.lock().await.set_camera_sw_muted(expected_camera_state);
260 let (tx, mut rx) = mpsc::unbounded();
261 let (event_tx, _event_rx) = mpsc::unbounded();
262 let external_publisher = ExternalEventPublisher::new(event_tx);
263 let agent = CameraWatcherAgent::new(vec![tx], external_publisher);
264 let service_context = ServiceContext::new(Some(ServiceRegistry::serve(service_registry)));
265 let res = agent.spawn(&service_context).await;
266
267 assert!(matches!(res, Ok(())), "spawn failed");
269 let muted = rx.next().await;
270 assert_eq!(muted, Some(true));
271 }
272}