Skip to main content

pull_source/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The `pull_source` library defines an implementation of the `PullSource` API and traits to hook
6//! in an algorithm that produces time updates.
7
8use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11    self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::TryStreamExt;
15use futures::lock::Mutex;
16use log::warn;
17
/// An `UpdateAlgorithm` produces time samples on demand.
///
/// [`PullSource`] forwards each request it receives from a client to the corresponding method
/// of this trait.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Update the algorithm's knowledge of device properties.
    async fn update_device_properties(&self, properties: Properties);

    /// Produce a new time sample, taking into account `urgency`.
    ///
    /// # Errors
    ///
    /// Returns a [`SampleError`] describing why no sample could be produced.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;

    /// Returns the reference time at which the next sample may be produced.
    ///
    /// A reference timeline is always given on the boot timeline, which means
    /// it could fall in a time when the device was suspended.  We may want
    /// to wake the device to sample time, but also may decide not to, depending
    /// on power policy.
    async fn next_possible_sample_time(&self) -> zx::BootInstant;
}
35
/// Reasons `sample()` may fail.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so it can be propagated
/// with `?` into generic error containers or boxed as `dyn Error`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SampleError {
    /// An error occurred that cannot be classified as one of the more specific
    /// error statuses.
    Unknown,
    /// An internal error occurred. This usually indicates a bug in the
    /// component implementation.
    Internal,
    /// A local resource error occurred such as IO, FIDL, or memory allocation
    /// failure.
    Resource,
    /// A network error occurred.
    Network,
    /// Some hardware that the time source depends on failed.
    Hardware,
    /// A retriable error specific to the implemented time protocol occurred,
    /// such as a malformed response from a remote server.
    Protocol,
    /// Sampling failed in a nonretriable way. Examples include failed
    /// authentication, or a missing configuration.
    ProtocolUnrecoverable,
    /// The request was made too soon and the client should wait before making
    /// another request.
    RateLimited,
}

impl std::fmt::Display for SampleError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a new description.
        let description = match self {
            SampleError::Unknown => "unknown error",
            SampleError::Internal => "internal error",
            SampleError::Resource => "local resource error",
            SampleError::Network => "network error",
            SampleError::Hardware => "hardware error",
            SampleError::Protocol => "retriable protocol error",
            SampleError::ProtocolUnrecoverable => "unrecoverable protocol error",
            SampleError::RateLimited => "rate limited",
        };
        f.write_str(description)
    }
}

impl std::error::Error for SampleError {}
62
63impl From<SampleError> for ftexternal::Error {
64    fn from(e: SampleError) -> Self {
65        match e {
66            SampleError::Unknown => ftexternal::Error::Unknown,
67            SampleError::Internal => ftexternal::Error::Internal,
68            SampleError::Resource => ftexternal::Error::Resource,
69            SampleError::Network => ftexternal::Error::Network,
70            SampleError::Hardware => ftexternal::Error::Hardware,
71            SampleError::Protocol => ftexternal::Error::Protocol,
72            SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
73            SampleError::RateLimited => ftexternal::Error::RateLimited,
74        }
75    }
76}
77
/// An implementation of `fuchsia.time.external.PullSource` that routes time updates from an
/// [`UpdateAlgorithm`] to clients of the FIDL protocol and routes device property updates from
/// FIDL clients to the [`UpdateAlgorithm`].
///
/// This implementation is based on the assumption that there is only one client.
pub struct PullSource<UA: UpdateAlgorithm> {
    /// The algorithm used to obtain new updates.
    update_algorithm: UA,
}
86
87impl<UA: UpdateAlgorithm> PullSource<UA> {
88    /// Create a new |PullSource| that polls |update_algorithm| for time updates and starts in the
89    /// |initial_status| status.
90    pub fn new(update_algorithm: UA) -> Result<Self, Error> {
91        Ok(Self { update_algorithm })
92    }
93
94    /// Handle a single client's requests received on the given |request_stream|.
95    pub async fn handle_requests_for_stream(
96        &self,
97        mut request_stream: PullSourceRequestStream,
98    ) -> Result<(), Error> {
99        while let Some(request) = request_stream.try_next().await? {
100            match request {
101                PullSourceRequest::Sample { urgency, responder } => {
102                    let sample = self.update_algorithm.sample(urgency).await;
103                    responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
104                }
105                PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
106                    responder.send(
107                        self.update_algorithm.next_possible_sample_time().await.into_nanos(),
108                    )?;
109                }
110                PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
111                    self.update_algorithm.update_device_properties(properties).await;
112                }
113                PullSourceRequest::Shutdown { responder: _, .. } => {
114                    // TODO: b/367743262 - handle shutdown request
115                    warn!("receive shutdown request");
116                }
117            }
118        }
119        Ok(())
120    }
121}
122
/// An `UpdateAlgorithm` backed by a list of samples that is set up by a test.
///
/// This implementation allows other crates and integration tests to use an implementation of
/// `UpdateAlgorithm`.
pub struct TestUpdateAlgorithm {
    /// List of received device property updates, in the order they arrived.
    device_property_updates: Mutex<Vec<Properties>>,

    /// Time samples to be returned by `sample()`, consumed from the front. Each entry is paired
    /// with the `Urgency` that the matching `sample()` call is expected to carry.
    samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
}
133
134impl TestUpdateAlgorithm {
135    /// Create a new instance of `TestUpdateAlgorithm` with empty collection of samples to be used.
136    pub fn new() -> Self {
137        let device_property_updates = Mutex::new(Vec::new());
138        let samples = Mutex::new(Vec::new());
139        TestUpdateAlgorithm { device_property_updates, samples }
140    }
141}
142
143#[async_trait]
144impl UpdateAlgorithm for TestUpdateAlgorithm {
145    async fn update_device_properties(&self, properties: Properties) {
146        self.device_property_updates.lock().await.push(properties);
147    }
148
149    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
150        let mut samples = self.samples.lock().await;
151        if samples.is_empty() {
152            warn!("No test samples found.");
153            return Err(SampleError::Internal);
154        }
155        let (expected_urgency, sample) = samples.remove(0);
156        if urgency == expected_urgency {
157            sample
158        } else {
159            warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
160            Err(SampleError::Internal)
161        }
162    }
163
164    async fn next_possible_sample_time(&self) -> zx::BootInstant {
165        zx::BootInstant::get()
166    }
167}
168
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// Bundles the `PullSource` under test with the task that serves its requests.
    struct TestHarness {
        /// The `PullSource` under test.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,

        /// Task which handles requests from `PullSource` proxy client.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Builds a harness around an empty `TestUpdateAlgorithm` and returns it together with
        /// a client proxy connected to the spawned server task.
        fn new() -> (Self, PullSourceProxy) {
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();
            let served_source = Arc::clone(&test_source);
            let _server = fasync::Task::spawn(async move {
                served_source.handle_requests_for_stream(stream).await
            });
            (TestHarness { test_source, _server }, proxy)
        }

        /// Queues `sample` to be returned for a request that carries `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let mut queued = self.test_source.update_algorithm.samples.lock().await;
            queued.push((urgency, sample));
        }

        /// Returns a copy of every device property update received so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let received = self.test_source.update_algorithm.device_property_updates.lock().await;
            received.clone()
        }
    }

    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();
        // Should generate an error here since there are no events set up.
        let response = client.sample(Urgency::Low).await.unwrap();
        assert_eq!(response, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();

        let low_urgency_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(low_urgency_sample)).await;

        // Should generate an error here since the requested urgency doesn't match the provided
        // one.
        let response = client.sample(Urgency::High).await.unwrap();
        assert_eq!(response, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();

        let first = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        let second = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(56)),
            utc: Some(78),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(first.clone())).await;
        harness.add_sample(Urgency::High, Ok(second.clone())).await;

        // Samples come back in the order they were queued.
        assert_eq!(client.sample(Urgency::Low).await.unwrap().unwrap(), first);
        assert_eq!(client.sample(Urgency::High).await.unwrap().unwrap(), second);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();

        client.update_device_properties(&Properties::default()).unwrap();

        // Allow tasks to service the request before checking the properties.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        let received = harness.get_device_properties().await;
        assert_eq!(received, vec![Properties::default()]);
    }
}
294}