// push_source/lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The `push_source` library defines an implementation of the `PushSource` API and traits to hook
6//! in an algorithm that produces time updates.
7
8use anyhow::Result;
9use async_trait::async_trait;
10use fidl::prelude::*;
11use fidl_fuchsia_time_external::{
12    Properties, PushSourceRequest, PushSourceRequestStream, PushSourceWatchSampleResponder,
13    PushSourceWatchStatusResponder, Status, TimeSample,
14};
15
16use futures::channel::mpsc::{channel, Receiver, Sender};
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use log::warn;
20use std::sync::{Arc, Weak};
21use watch_handler::{Sender as WatchSender, WatchHandler};
22
/// A time update generated by an |UpdateAlgorithm|.
#[derive(Clone, PartialEq, Debug)]
pub enum Update {
    /// A new TimeSample. Held behind an Arc so `Update` can be cloned cheaply when
    /// fanned out to multiple clients; the Arc may be removed once fidl tables
    /// support clone.
    Sample(Arc<TimeSample>),
    /// A new Status.
    Status(Status),
}
31
32impl From<Status> for Update {
33    fn from(status: Status) -> Self {
34        Update::Status(status)
35    }
36}
37
38impl From<TimeSample> for Update {
39    fn from(sample: TimeSample) -> Self {
40        Update::Sample(Arc::new(sample))
41    }
42}
43
44impl Update {
45    /// Returns true iff the update contained is a status.
46    pub fn is_status(&self) -> bool {
47        match self {
48            Update::Sample(_) => false,
49            Update::Status(_) => true,
50        }
51    }
52}
53
/// An |UpdateAlgorithm| asynchronously produces Updates.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Update the algorithm's knowledge of device properties.
    async fn update_device_properties(&self, properties: Properties);

    /// Generate updates asynchronously and push them to |sink|. This method may run
    /// indefinitely. This method may generate duplicate updates.
    async fn generate_updates(&self, sink: Sender<Update>) -> Result<()>;
}
64
/// An implementation of |fuchsia.time.external.PushSource| that routes time updates from an
/// |UpdateAlgorithm| to clients of the fidl protocol and routes device property updates from fidl
/// clients to the |UpdateAlgorithm|.
pub struct PushSource<UA: UpdateAlgorithm> {
    /// Internal state of the push source (clients plus latest sample/status),
    /// guarded by an async mutex so it can be updated atomically.
    internal: Mutex<PushSourceInternal>,
    /// The algorithm used to obtain new updates.
    update_algorithm: UA,
}
74
75impl<UA: UpdateAlgorithm> PushSource<UA> {
76    /// Create a new |PushSource| that polls |update_algorithm| for time updates and starts in the
77    /// |initial_status| status.
78    pub fn new(update_algorithm: UA, initial_status: Status) -> Result<Self> {
79        Ok(Self { internal: Mutex::new(PushSourceInternal::new(initial_status)), update_algorithm })
80    }
81
82    /// Polls updates received on |update_algorithm| and pushes them to bound clients.
83    pub async fn poll_updates(&self) -> Result<()> {
84        // Updates should be processed immediately so add no extra buffer space.
85        let (sender, mut receiver) = channel(0);
86        let updater_fut = self.update_algorithm.generate_updates(sender);
87        let consumer_fut = async move {
88            while let Some(update) = receiver.next().await {
89                self.internal.lock().await.push_update(update).await;
90            }
91        };
92        let (update_res, _) = futures::future::join(updater_fut, consumer_fut).await;
93        update_res
94    }
95
96    /// Handle a single client's requests received on the given |request_stream|.
97    pub async fn handle_requests_for_stream(
98        &self,
99        mut request_stream: PushSourceRequestStream,
100    ) -> Result<()> {
101        log::debug!("handle_requests_for_stream: ");
102        let client_context = self.internal.lock().await.register_client();
103        while let Some(request) = request_stream.try_next().await? {
104            client_context.lock().await.handle_request(request, &self.update_algorithm).await?;
105        }
106        Ok(())
107    }
108}
109
/// Contains internal state for |PushSource| that must be updated atomically.
struct PushSourceInternal {
    /// A set of weak pointers to registered clients. Weak references let a client
    /// be dropped when its request stream ends; dead entries are pruned during
    /// push_update.
    clients: Vec<Weak<Mutex<PushSourceClientHandler>>>,
    /// The last known sample, replayed to newly registered clients.
    latest_sample: Option<Arc<TimeSample>>,
    /// The last known status, replayed to newly registered clients.
    latest_status: Status,
}
119
120impl PushSourceInternal {
121    /// Create a new |PushSourceInternal|.
122    pub fn new(initial_status: Status) -> Self {
123        PushSourceInternal { clients: vec![], latest_sample: None, latest_status: initial_status }
124    }
125
126    /// Create a new client handler registered to receive asynchonous updates
127    /// for the duration of its lifetime.
128    pub fn register_client(&mut self) -> Arc<Mutex<PushSourceClientHandler>> {
129        log::debug!("PushSourceInternal: register_client");
130        let client = Arc::new(Mutex::new(PushSourceClientHandler {
131            sample_watcher: WatchHandler::create(self.latest_sample.clone()),
132            status_watcher: WatchHandler::create(Some(self.latest_status)),
133        }));
134        self.clients.push(Arc::downgrade(&client));
135        client
136    }
137
138    /// Push a new update to all existing clients.
139    pub async fn push_update(&mut self, update: Update) {
140        log::debug!("push_update: received update: {:?}", &update);
141        match &update {
142            Update::Sample(sample) => self.latest_sample = Some(Arc::clone(&sample)),
143            Update::Status(status) => self.latest_status = *status,
144        }
145        // Discard any references to clients that no longer exist.
146        let mut client_arcs = vec![];
147        self.clients.retain(|client_weak| match client_weak.upgrade() {
148            Some(client_arc) => {
149                client_arcs.push(client_arc);
150                true
151            }
152            None => false,
153        });
154        // Note well that the number of clients here matches the number of
155        // clients you expect. If your setup makes it so that an update comes
156        // before any clients are connected, your clients will never see that
157        // update. And if your clients depend on receiving that update, then
158        // you may have a bug on your hands.
159        log::debug!("push_update: clients to update: {}", client_arcs.len());
160        for client in client_arcs {
161            client.lock().await.handle_update(update.clone());
162        }
163    }
164}
165
/// Per-client state for handling requests.
struct PushSourceClientHandler {
    /// Watcher for parking `WatchSample` requests until a new sample arrives.
    sample_watcher: WatchHandler<Arc<TimeSample>, WatchSampleResponder>,
    /// Watcher for parking `WatchStatus` requests until a new status arrives.
    status_watcher: WatchHandler<Status, WatchStatusResponder>,
}
173
174impl PushSourceClientHandler {
175    /// Handle a fidl request received from the client.
176    async fn handle_request(
177        &mut self,
178        request: PushSourceRequest,
179        update_algorithm: &impl UpdateAlgorithm,
180    ) -> Result<()> {
181        match request {
182            PushSourceRequest::WatchSample { responder } => {
183                self.sample_watcher.watch(WatchSampleResponder(responder)).map_err(|e| {
184                    e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
185                    e
186                })?;
187            }
188            PushSourceRequest::WatchStatus { responder } => {
189                self.status_watcher.watch(WatchStatusResponder(responder)).map_err(|e| {
190                    e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
191                    e
192                })?;
193            }
194            PushSourceRequest::UpdateDeviceProperties { properties, .. } => {
195                update_algorithm.update_device_properties(properties).await;
196            }
197        }
198        Ok(())
199    }
200
201    /// Push an internal update to any hanging gets parked by the client.
202    fn handle_update(&mut self, update: Update) {
203        match update {
204            Update::Sample(sample) => self.sample_watcher.set_value(sample),
205            Update::Status(status) => self.status_watcher.set_value(status),
206        }
207    }
208}
209
/// Newtype over the generated fidl responder so |WatchSender| can be implemented for it.
struct WatchSampleResponder(PushSourceWatchSampleResponder);
/// Newtype over the generated fidl responder so |WatchSender| can be implemented for it.
struct WatchStatusResponder(PushSourceWatchStatusResponder);
212
213impl WatchSender<Arc<TimeSample>> for WatchSampleResponder {
214    fn send_response(self, data: Arc<TimeSample>) {
215        let time_sample = TimeSample {
216            utc: data.utc.clone(),
217            reference: data.reference.clone(),
218            standard_deviation: data.standard_deviation.clone(),
219            ..Default::default()
220        };
221        self.0.send(&time_sample).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
222    }
223}
224
225impl WatchSender<Status> for WatchStatusResponder {
226    fn send_response(self, data: Status) {
227        self.0.send(data).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
228    }
229}
230
/// An UpdateAlgorithm that forwards updates produced by a test.
pub struct TestUpdateAlgorithm {
    /// Receiver that accepts updates pushed by a test. Held in an Option so that
    /// generate_updates can take ownership of it (it may only be taken once).
    receiver: Mutex<Option<Receiver<Update>>>,
    /// List of received device property updates, in arrival order.
    device_property_updates: Mutex<Vec<Properties>>,
}
238
239impl TestUpdateAlgorithm {
240    pub fn new() -> (Self, Sender<Update>) {
241        // It is important that the capacity of this channel is 0. This
242        // capacity will *block* the sender until there is someone to receive
243        // the message. This isn't normally an issue when the distribution
244        // of the channel objects is linear. However, the sequence of receivers
245        // bottoms out at a publish-subscribe hub, which may drop an event if it
246        // arrives before there are any subscribers. This will confuse the tests.
247        // The zero capacity channel prevents that sequence of events.
248        let (sender, receiver) = channel(0);
249        (
250            TestUpdateAlgorithm {
251                receiver: Mutex::new(Some(receiver)),
252                device_property_updates: Mutex::new(vec![]),
253            },
254            sender,
255        )
256    }
257}
258
259#[async_trait]
260impl UpdateAlgorithm for TestUpdateAlgorithm {
261    async fn update_device_properties(&self, properties: Properties) {
262        self.device_property_updates.lock().await.push(properties);
263    }
264
265    async fn generate_updates(&self, sink: Sender<Update>) -> Result<()> {
266        let receiver = self.receiver.lock().await.take().unwrap();
267        receiver.map(Ok).forward(sink).await?;
268        Ok(())
269    }
270}
271
272#[cfg(test)]
273mod test {
274    use super::*;
275    use assert_matches::assert_matches;
276    use fidl::endpoints::create_proxy_and_stream;
277    use fidl::Error as FidlError;
278    use fidl_fuchsia_time_external::{PushSourceMarker, PushSourceProxy};
279    use fuchsia_async as fasync;
280    use futures::{FutureExt, SinkExt};
281
    /// Bundles a PushSource under test with the plumbing tests need to drive it.
    struct TestHarness {
        /// The PushSource under test.
        test_source: Arc<PushSource<TestUpdateAlgorithm>>,
        /// Tasks spawned for the test; held so they keep running for the
        /// harness's lifetime.
        tasks: Vec<fasync::Task<Result<()>>>,
        /// Sender that injects updates to test_source.
        update_sender: Sender<Update>,
    }
290
291    impl TestHarness {
292        /// Create a new TestHarness.
293        fn new() -> Self {
294            let (update_algorithm, update_sender) = TestUpdateAlgorithm::new();
295            let test_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
296            let source_clone = Arc::clone(&test_source);
297            let update_task = fasync::Task::spawn(async move { source_clone.poll_updates().await });
298
299            TestHarness { test_source, tasks: vec![update_task], update_sender }
300        }
301
302        /// Return a new proxy connected to the test PushSource.
303        fn new_proxy(&mut self) -> PushSourceProxy {
304            let source_clone = Arc::clone(&self.test_source);
305            let (proxy, stream) = create_proxy_and_stream::<PushSourceMarker>();
306            let server_task = fasync::Task::spawn(async move {
307                source_clone.handle_requests_for_stream(stream).await
308            });
309            self.tasks.push(server_task);
310            proxy
311        }
312
313        /// Push a new update to the PushSource.
314        async fn push_update(&mut self, update: Update) {
315            self.update_sender.send(update).await.unwrap();
316        }
317
318        /// Assert that the TestUpdateAlgorithm received the property updates.
319        async fn assert_device_properties(&self, properties: &[Properties]) {
320            assert_eq!(
321                self.test_source.update_algorithm.device_property_updates.lock().await.as_slice(),
322                properties
323            );
324        }
325    }
326
327    // Since we rely on WatchHandler to achieve most of the hanging get behavior, these tests
328    // focus primarily on behavior specific to PushSource and ensuring multiple clients are
329    // supported.
330
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watch_sample_closes_on_multiple_watches() {
        let mut harness = TestHarness::new();
        let proxy = harness.new_proxy();

        // Since there aren't any samples yet the first call should hang
        let first_watch_fut = proxy.watch_sample();
        // Calling again while the first watch is still parked should close the
        // channel with a BAD_STATE epitaph.
        assert_matches!(
            proxy.watch_sample().await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
        assert_matches!(
            first_watch_fut.await.unwrap_err(),
            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
        );
    }
348
349    #[fuchsia::test(allow_stalls = false)]
350    async fn test_watch_status_closes_on_multiple_watches() {
351        let mut harness = TestHarness::new();
352        let proxy = harness.new_proxy();
353
354        // First watch always immediately returns Ok
355        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
356        // In absence of updates second watch does not finish
357        let second_watch_fut = proxy.watch_status();
358        // Calling again while second watch is active should close the channel.
359        assert_matches!(
360            proxy.watch_status().await.unwrap_err(),
361            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
362        );
363        assert_matches!(
364            second_watch_fut.await.unwrap_err(),
365            FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
366        );
367    }
368
369    #[fuchsia::test(allow_stalls = false)]
370    async fn test_watch_sample() {
371        let mut harness = TestHarness::new();
372        let proxy = harness.new_proxy();
373
374        // The first watch completes only after update is produced.
375        let sample_fut = proxy.watch_sample();
376        harness
377            .push_update(Update::Sample(Arc::new(TimeSample {
378                reference: Some(zx::BootInstant::from_nanos(23)),
379                utc: Some(24),
380                standard_deviation: None,
381                ..Default::default()
382            })))
383            .await;
384        assert_eq!(
385            sample_fut.await.unwrap(),
386            TimeSample {
387                reference: Some(zx::BootInstant::from_nanos(23)),
388                utc: Some(24),
389                standard_deviation: None,
390                ..Default::default()
391            }
392        );
393
394        // Subsequent watches complete only after a new update is produced.
395        let sample_fut = proxy.watch_sample();
396        harness
397            .push_update(Update::Sample(Arc::new(TimeSample {
398                reference: Some(zx::BootInstant::from_nanos(25)),
399                utc: Some(26),
400                standard_deviation: None,
401                ..Default::default()
402            })))
403            .await;
404        assert_eq!(
405            sample_fut.await.unwrap(),
406            TimeSample {
407                reference: Some(zx::BootInstant::from_nanos(25)),
408                utc: Some(26),
409                standard_deviation: None,
410                ..Default::default()
411            }
412        );
413
414        // Watches hangs in absence of new update.
415        assert!(proxy.watch_sample().now_or_never().is_none());
416    }
417
418    #[fuchsia::test(allow_stalls = false)]
419    async fn test_watch_sample_sent_to_all_clients() {
420        let mut harness = TestHarness::new();
421        let proxy = harness.new_proxy();
422        let proxy_2 = harness.new_proxy();
423
424        // The first watch completes only after update is produced.
425        let sample_fut = proxy.watch_sample();
426        let sample_fut_2 = proxy_2.watch_sample();
427        harness
428            .push_update(Update::Sample(Arc::new(TimeSample {
429                reference: Some(zx::BootInstant::from_nanos(23)),
430                utc: Some(24),
431                standard_deviation: None,
432                ..Default::default()
433            })))
434            .await;
435        assert_eq!(
436            sample_fut.await.unwrap(),
437            TimeSample {
438                reference: Some(zx::BootInstant::from_nanos(23)),
439                utc: Some(24),
440                standard_deviation: None,
441                ..Default::default()
442            }
443        );
444        assert_eq!(
445            sample_fut_2.await.unwrap(),
446            TimeSample {
447                reference: Some(zx::BootInstant::from_nanos(23)),
448                utc: Some(24),
449                standard_deviation: None,
450                ..Default::default()
451            }
452        );
453
454        // Subsequent watches complete only after a new update is produced.
455        let sample_fut = proxy.watch_sample();
456        let sample_fut_2 = proxy_2.watch_sample();
457        harness
458            .push_update(Update::Sample(Arc::new(TimeSample {
459                reference: Some(zx::BootInstant::from_nanos(25)),
460                utc: Some(26),
461                standard_deviation: None,
462                ..Default::default()
463            })))
464            .await;
465        assert_eq!(
466            sample_fut.await.unwrap(),
467            TimeSample {
468                reference: Some(zx::BootInstant::from_nanos(25)),
469                utc: Some(26),
470                standard_deviation: None,
471                ..Default::default()
472            }
473        );
474        assert_eq!(
475            sample_fut_2.await.unwrap(),
476            TimeSample {
477                reference: Some(zx::BootInstant::from_nanos(25)),
478                utc: Some(26),
479                standard_deviation: None,
480                ..Default::default()
481            }
482        );
483
484        // A client that connects later gets the latest update.
485        let proxy_3 = harness.new_proxy();
486        assert_eq!(
487            proxy_3.watch_sample().await.unwrap(),
488            TimeSample {
489                reference: Some(zx::BootInstant::from_nanos(25)),
490                utc: Some(26),
491                standard_deviation: None,
492                ..Default::default()
493            }
494        );
495    }
496
497    #[fuchsia::test(allow_stalls = false)]
498    async fn test_watch_status() {
499        let mut harness = TestHarness::new();
500        let proxy = harness.new_proxy();
501
502        // The first watch completes immediately.
503        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
504
505        // Subsequent watches complete only after an update is produced.
506        let status_fut = proxy.watch_status();
507        harness.push_update(Update::Status(Status::Hardware)).await;
508        assert_eq!(status_fut.await.unwrap(), Status::Hardware);
509
510        // Watches hang in absence of a new update.
511        assert!(proxy.watch_status().now_or_never().is_none());
512    }
513
514    #[fuchsia::test(allow_stalls = false)]
515    async fn test_watch_status_sent_to_all_clients() {
516        let mut harness = TestHarness::new();
517        let proxy = harness.new_proxy();
518        let proxy_2 = harness.new_proxy();
519
520        // The first watch completes immediately.
521        assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
522        assert_eq!(proxy_2.watch_status().await.unwrap(), Status::Ok);
523
524        // Subsequent watches complete only after an update is produced.
525        let status_fut = proxy.watch_status();
526        let status_fut_2 = proxy_2.watch_status();
527        harness.push_update(Update::Status(Status::Hardware)).await;
528        assert_eq!(status_fut.await.unwrap(), Status::Hardware);
529        assert_eq!(status_fut_2.await.unwrap(), Status::Hardware);
530
531        // A client that connects later gets the latest update.
532        let proxy_3 = harness.new_proxy();
533        assert_eq!(proxy_3.watch_status().await.unwrap(), Status::Hardware);
534    }
535
536    #[fuchsia::test(allow_stalls = false)]
537    async fn test_property_updates_sent_to_update_algorithm() {
538        let mut harness = TestHarness::new();
539        let proxy = harness.new_proxy();
540        let proxy_2 = harness.new_proxy();
541
542        proxy.update_device_properties(&Properties::default()).unwrap();
543        proxy_2.update_device_properties(&Properties::default()).unwrap();
544
545        // Allow tasks to service the request before checking the properties.
546        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
547
548        harness.assert_device_properties(&vec![Properties::default(), Properties::default()]).await;
549    }
550}