1use anyhow::Result;
9use async_trait::async_trait;
10use fidl::prelude::*;
11use fidl_fuchsia_time_external::{
12 Properties, PushSourceRequest, PushSourceRequestStream, PushSourceWatchSampleResponder,
13 PushSourceWatchStatusResponder, Status, TimeSample,
14};
15
16use futures::channel::mpsc::{channel, Receiver, Sender};
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use log::warn;
20use std::sync::{Arc, Weak};
21use watch_handler::{Sender as WatchSender, WatchHandler};
22
/// A message produced by an [`UpdateAlgorithm`] and fanned out to clients by a [`PushSource`].
#[derive(Clone, PartialEq, Debug)]
pub enum Update {
    /// A new time sample. It is held behind an `Arc`, so cloning the update shares the sample
    /// rather than copying it.
    Sample(Arc<TimeSample>),
    /// A new status for the time source.
    Status(Status),
}
31
32impl From<Status> for Update {
33 fn from(status: Status) -> Self {
34 Update::Status(status)
35 }
36}
37
38impl From<TimeSample> for Update {
39 fn from(sample: TimeSample) -> Self {
40 Update::Sample(Arc::new(sample))
41 }
42}
43
44impl Update {
45 pub fn is_status(&self) -> bool {
47 match self {
48 Update::Sample(_) => false,
49 Update::Status(_) => true,
50 }
51 }
52}
53
/// The strategy a [`PushSource`] delegates to: it produces the stream of [`Update`]s and is
/// told about device property changes requested by clients.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Receives the `properties` carried by a client's `UpdateDeviceProperties` request.
    async fn update_device_properties(&self, properties: Properties);

    /// Produces updates and writes them into `sink`.
    ///
    /// `PushSource::poll_updates` awaits this future and returns its result to the caller.
    async fn generate_updates(&self, sink: Sender<Update>) -> Result<()>;
}
64
/// Serves `PushSource` request streams, distributing the updates generated by an
/// [`UpdateAlgorithm`] to every registered client.
pub struct PushSource<UA: UpdateAlgorithm> {
    /// Client registry plus the most recent sample and status, behind an async mutex.
    internal: Mutex<PushSourceInternal>,
    /// The algorithm that generates updates and receives device property changes.
    update_algorithm: UA,
}
74
75impl<UA: UpdateAlgorithm> PushSource<UA> {
76 pub fn new(update_algorithm: UA, initial_status: Status) -> Result<Self> {
79 Ok(Self { internal: Mutex::new(PushSourceInternal::new(initial_status)), update_algorithm })
80 }
81
82 pub async fn poll_updates(&self) -> Result<()> {
84 let (sender, mut receiver) = channel(0);
86 let updater_fut = self.update_algorithm.generate_updates(sender);
87 let consumer_fut = async move {
88 while let Some(update) = receiver.next().await {
89 self.internal.lock().await.push_update(update).await;
90 }
91 };
92 let (update_res, _) = futures::future::join(updater_fut, consumer_fut).await;
93 update_res
94 }
95
96 pub async fn handle_requests_for_stream(
98 &self,
99 mut request_stream: PushSourceRequestStream,
100 ) -> Result<()> {
101 log::debug!("handle_requests_for_stream: ");
102 let client_context = self.internal.lock().await.register_client();
103 while let Some(request) = request_stream.try_next().await? {
104 client_context.lock().await.handle_request(request, &self.update_algorithm).await?;
105 }
106 Ok(())
107 }
108}
109
/// Mutable state shared by all client connections of a `PushSource`.
struct PushSourceInternal {
    /// Weak references to the per-client handlers. Entries whose handler has been dropped are
    /// pruned in `push_update`.
    clients: Vec<Weak<Mutex<PushSourceClientHandler>>>,
    /// The most recently pushed sample, or `None` if no sample has been pushed yet.
    latest_sample: Option<Arc<TimeSample>>,
    /// The most recently pushed status, starting from the initial status given to `new`.
    latest_status: Status,
}
119
120impl PushSourceInternal {
121 pub fn new(initial_status: Status) -> Self {
123 PushSourceInternal { clients: vec![], latest_sample: None, latest_status: initial_status }
124 }
125
126 pub fn register_client(&mut self) -> Arc<Mutex<PushSourceClientHandler>> {
129 log::debug!("PushSourceInternal: register_client");
130 let client = Arc::new(Mutex::new(PushSourceClientHandler {
131 sample_watcher: WatchHandler::create(self.latest_sample.clone()),
132 status_watcher: WatchHandler::create(Some(self.latest_status)),
133 }));
134 self.clients.push(Arc::downgrade(&client));
135 client
136 }
137
138 pub async fn push_update(&mut self, update: Update) {
140 log::debug!("push_update: received update: {:?}", &update);
141 match &update {
142 Update::Sample(sample) => self.latest_sample = Some(Arc::clone(&sample)),
143 Update::Status(status) => self.latest_status = *status,
144 }
145 let mut client_arcs = vec![];
147 self.clients.retain(|client_weak| match client_weak.upgrade() {
148 Some(client_arc) => {
149 client_arcs.push(client_arc);
150 true
151 }
152 None => false,
153 });
154 log::debug!("push_update: clients to update: {}", client_arcs.len());
160 for client in client_arcs {
161 client.lock().await.handle_update(update.clone());
162 }
163 }
164}
165
/// Per-connection state: one watcher for samples and one for status.
struct PushSourceClientHandler {
    /// Tracks the sample value for this client and any pending `WatchSample` responder.
    sample_watcher: WatchHandler<Arc<TimeSample>, WatchSampleResponder>,
    /// Tracks the status value for this client and any pending `WatchStatus` responder.
    status_watcher: WatchHandler<Status, WatchStatusResponder>,
}
173
174impl PushSourceClientHandler {
175 async fn handle_request(
177 &mut self,
178 request: PushSourceRequest,
179 update_algorithm: &impl UpdateAlgorithm,
180 ) -> Result<()> {
181 match request {
182 PushSourceRequest::WatchSample { responder } => {
183 self.sample_watcher.watch(WatchSampleResponder(responder)).map_err(|e| {
184 e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
185 e
186 })?;
187 }
188 PushSourceRequest::WatchStatus { responder } => {
189 self.status_watcher.watch(WatchStatusResponder(responder)).map_err(|e| {
190 e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
191 e
192 })?;
193 }
194 PushSourceRequest::UpdateDeviceProperties { properties, .. } => {
195 update_algorithm.update_device_properties(properties).await;
196 }
197 }
198 Ok(())
199 }
200
201 fn handle_update(&mut self, update: Update) {
203 match update {
204 Update::Sample(sample) => self.sample_watcher.set_value(sample),
205 Update::Status(status) => self.status_watcher.set_value(status),
206 }
207 }
208}
209
/// Wrapper around the FIDL `WatchSample` responder; implements `WatchSender<Arc<TimeSample>>`.
struct WatchSampleResponder(PushSourceWatchSampleResponder);
/// Wrapper around the FIDL `WatchStatus` responder; implements `WatchSender<Status>`.
struct WatchStatusResponder(PushSourceWatchStatusResponder);
212
213impl WatchSender<Arc<TimeSample>> for WatchSampleResponder {
214 fn send_response(self, data: Arc<TimeSample>) {
215 let time_sample = TimeSample {
216 utc: data.utc.clone(),
217 reference: data.reference.clone(),
218 standard_deviation: data.standard_deviation.clone(),
219 ..Default::default()
220 };
221 self.0.send(&time_sample).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
222 }
223}
224
225impl WatchSender<Status> for WatchStatusResponder {
226 fn send_response(self, data: Status) {
227 self.0.send(data).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
228 }
229}
230
/// An [`UpdateAlgorithm`] for tests: it relays updates supplied through a channel and records
/// every device property update it receives.
pub struct TestUpdateAlgorithm {
    /// Receiving end of the channel whose sender is returned by `new`. It is taken out of the
    /// `Option` by `generate_updates`.
    receiver: Mutex<Option<Receiver<Update>>>,
    /// Every `Properties` value passed to `update_device_properties`, in arrival order.
    device_property_updates: Mutex<Vec<Properties>>,
}
238
239impl TestUpdateAlgorithm {
240 pub fn new() -> (Self, Sender<Update>) {
241 let (sender, receiver) = channel(0);
249 (
250 TestUpdateAlgorithm {
251 receiver: Mutex::new(Some(receiver)),
252 device_property_updates: Mutex::new(vec![]),
253 },
254 sender,
255 )
256 }
257}
258
259#[async_trait]
260impl UpdateAlgorithm for TestUpdateAlgorithm {
261 async fn update_device_properties(&self, properties: Properties) {
262 self.device_property_updates.lock().await.push(properties);
263 }
264
265 async fn generate_updates(&self, sink: Sender<Update>) -> Result<()> {
266 let receiver = self.receiver.lock().await.take().unwrap();
267 receiver.map(Ok).forward(sink).await?;
268 Ok(())
269 }
270}
271
272#[cfg(test)]
273mod test {
274 use super::*;
275 use assert_matches::assert_matches;
276 use fidl::endpoints::create_proxy_and_stream;
277 use fidl::Error as FidlError;
278 use fidl_fuchsia_time_external::{PushSourceMarker, PushSourceProxy};
279 use fuchsia_async as fasync;
280 use futures::{FutureExt, SinkExt};
281
    /// Test fixture that owns a `PushSource` driven by a `TestUpdateAlgorithm`.
    struct TestHarness {
        /// The `PushSource` under test, shared with the spawned tasks.
        test_source: Arc<PushSource<TestUpdateAlgorithm>>,
        /// The spawned update-polling and request-serving tasks, stored here so they stay owned
        /// by the harness.
        tasks: Vec<fasync::Task<Result<()>>>,
        /// Sender used to inject updates into the `TestUpdateAlgorithm`.
        update_sender: Sender<Update>,
    }
290
291 impl TestHarness {
292 fn new() -> Self {
294 let (update_algorithm, update_sender) = TestUpdateAlgorithm::new();
295 let test_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
296 let source_clone = Arc::clone(&test_source);
297 let update_task = fasync::Task::spawn(async move { source_clone.poll_updates().await });
298
299 TestHarness { test_source, tasks: vec![update_task], update_sender }
300 }
301
302 fn new_proxy(&mut self) -> PushSourceProxy {
304 let source_clone = Arc::clone(&self.test_source);
305 let (proxy, stream) = create_proxy_and_stream::<PushSourceMarker>();
306 let server_task = fasync::Task::spawn(async move {
307 source_clone.handle_requests_for_stream(stream).await
308 });
309 self.tasks.push(server_task);
310 proxy
311 }
312
313 async fn push_update(&mut self, update: Update) {
315 self.update_sender.send(update).await.unwrap();
316 }
317
318 async fn assert_device_properties(&self, properties: &[Properties]) {
320 assert_eq!(
321 self.test_source.update_algorithm.device_property_updates.lock().await.as_slice(),
322 properties
323 );
324 }
325 }
326
    /// Two overlapping `WatchSample` calls on one connection must close the channel with a
    /// `BAD_STATE` epitaph, failing both calls.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_sample_closes_on_multiple_watches() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();

        // No sample has been pushed, so this first watch is left outstanding.
        let first_watch_fut = proxy.watch_sample();
        // A second watch is issued while the first is still outstanding.
        assert_matches!(
            proxy.watch_sample().await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
        // The first watch observes the same channel closure.
        assert_matches!(
            first_watch_fut.await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
    }
348
    /// Two overlapping `WatchStatus` calls on one connection must close the channel with a
    /// `BAD_STATE` epitaph, failing both calls.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_status_closes_on_multiple_watches() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();

        // The first watch returns the initial status, so it cannot be the outstanding call.
        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
        // With no status change pushed, this watch is left outstanding.
        let second_watch_fut = proxy.watch_status();
        // A further watch is issued while the previous one is still outstanding.
        assert_matches!(
            proxy.watch_status().await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
        assert_matches!(
            second_watch_fut.await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
    }
368
    /// A single client receives each pushed sample once, and a further watch stays pending
    /// until another sample arrives.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_sample() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();

        // Start the watch before any sample exists, then push the first sample.
        let sample_fut = proxy.watch_sample();
        harness
            .push_update(Update::Sample(Arc::new(TimeSample {
                reference: Some(zx::BootInstant::from_nanos(23)),
                utc: Some(24),
                standard_deviation: None,
                ..Default::default()
            })))
            .await;
        assert_eq!(
            sample_fut.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(23)),
                utc: Some(24),
                standard_deviation: None,
                ..Default::default()
            }
        );

        // A second watch resolves with the next pushed sample.
        let sample_fut = proxy.watch_sample();
        harness
            .push_update(Update::Sample(Arc::new(TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            })))
            .await;
        assert_eq!(
            sample_fut.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            }
        );

        // With no new sample, a watch must not complete on its first poll.
        assert!(proxy.watch_sample().now_or_never().is_none());
    }
417
    /// Every connected client receives each pushed sample, and a client that connects later
    /// immediately receives the most recent one.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_sample_sent_to_all_clients() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();
        let proxy_2 = harness.new_proxy();

        // Both clients start watching before the first sample is pushed.
        let sample_fut = proxy.watch_sample();
        let sample_fut_2 = proxy_2.watch_sample();
        harness
            .push_update(Update::Sample(Arc::new(TimeSample {
                reference: Some(zx::BootInstant::from_nanos(23)),
                utc: Some(24),
                standard_deviation: None,
                ..Default::default()
            })))
            .await;
        assert_eq!(
            sample_fut.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(23)),
                utc: Some(24),
                standard_deviation: None,
                ..Default::default()
            }
        );
        assert_eq!(
            sample_fut_2.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(23)),
                utc: Some(24),
                standard_deviation: None,
                ..Default::default()
            }
        );

        // Same again for a second sample.
        let sample_fut = proxy.watch_sample();
        let sample_fut_2 = proxy_2.watch_sample();
        harness
            .push_update(Update::Sample(Arc::new(TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            })))
            .await;
        assert_eq!(
            sample_fut.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            }
        );
        assert_eq!(
            sample_fut_2.await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            }
        );

        // A client registered after the updates gets the latest sample on its first watch.
        let proxy_3 = harness.new_proxy();
        assert_eq!(
            proxy_3.watch_sample().await.unwrap(),
            TimeSample {
                reference: Some(zx::BootInstant::from_nanos(25)),
                utc: Some(26),
                standard_deviation: None,
                ..Default::default()
            }
        );
    }
496
    /// A single client sees the initial status, then each pushed status change, and a further
    /// watch stays pending until the status changes again.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_status() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();

        // The first watch returns the initial status without any update being pushed.
        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);

        let status_fut = proxy.watch_status();
        // The pending watch resolves once a new status is pushed.
        harness.push_update(Update::Status(Status::Hardware)).await;
        assert_eq!(status_fut.await.unwrap(), Status::Hardware);

        // With no further change, a watch must not complete on its first poll.
        assert!(proxy.watch_status().now_or_never().is_none());
    }
513
    /// Every connected client receives a pushed status change, and a client that connects
    /// later immediately receives the most recent status.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_status_sent_to_all_clients() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();
        let proxy_2 = harness.new_proxy();

        // Each client's first watch returns the initial status.
        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
        assert_eq!(proxy_2.watch_status().await.unwrap(), Status::Ok);

        // Both clients start watching before the status change is pushed.
        let status_fut = proxy.watch_status();
        let status_fut_2 = proxy_2.watch_status();
        harness.push_update(Update::Status(Status::Hardware)).await;
        assert_eq!(status_fut.await.unwrap(), Status::Hardware);
        assert_eq!(status_fut_2.await.unwrap(), Status::Hardware);

        // A client registered after the change sees the latest status on its first watch.
        let proxy_3 = harness.new_proxy();
        assert_eq!(proxy_3.watch_status().await.unwrap(), Status::Hardware);
    }
535
536 #[fuchsia::test(allow_stalls = false)]
537 async fn test_property_updates_sent_to_update_algorithm() {
538 let mut harness = TestHarness::new();
539 let proxy = harness.new_proxy();
540 let proxy_2 = harness.new_proxy();
541
542 proxy.update_device_properties(&Properties::default()).unwrap();
543 proxy_2.update_device_properties(&Properties::default()).unwrap();
544
545 let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
547
548 harness.assert_device_properties(&vec![Properties::default(), Properties::default()]).await;
549 }
550}