pull_source/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The `pull_source` library defines an implementation of the `PullSource` API and traits to hook
6//! in an algorithm that produces time updates.
7
8use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11    self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::lock::Mutex;
15use futures::TryStreamExt;
16use log::warn;
17
18/// An |UpdateAlgorithm| trait produces time samples on demand.
19#[async_trait]
20pub trait UpdateAlgorithm {
21    /// Update the algorithm's knowledge of device properties.
22    async fn update_device_properties(&self, properties: Properties);
23
24    /// Produce a new time sample, taking into account `Urgency`.
25    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;
26
27    /// Returns the reference time at which the next sample may be produced.
28    ///
29    /// A reference timeline is always given on the boot timeline, which means
30    /// it could fall in a time when the device was suspended.  We may want
31    /// to wake the device to sample time, but also may decide not to, depending
32    /// on power policy.
33    async fn next_possible_sample_time(&self) -> zx::BootInstant;
34}
35
36/// Reasons `sample()` may fail.
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
38pub enum SampleError {
39    /// An error occurred that cannot be classified as one of the more specific
40    /// error statuses.
41    Unknown,
42    /// An internal error occurred. This usually indicates a bug in the
43    /// component implementation.
44    Internal,
45    /// A local resource error occurred such as IO, FIDL, or memory allocation
46    /// failure.
47    Resource,
48    /// A network error occurred.
49    Network,
50    /// Some hardware that the time source depends on failed.
51    Hardware,
52    /// A retriable error specific to the implemented time protocol occurred,
53    /// such as a malformed response from a remote server.
54    Protocol,
55    /// Sampling failed in a nonretriable way. Examples include failed
56    /// authentication, or a missing configuration.
57    ProtocolUnrecoverable,
58    /// The request was made too soon and the client should wait before making
59    /// another request.
60    RateLimited,
61}
62
63impl From<SampleError> for ftexternal::Error {
64    fn from(e: SampleError) -> Self {
65        match e {
66            SampleError::Unknown => ftexternal::Error::Unknown,
67            SampleError::Internal => ftexternal::Error::Internal,
68            SampleError::Resource => ftexternal::Error::Resource,
69            SampleError::Network => ftexternal::Error::Network,
70            SampleError::Hardware => ftexternal::Error::Hardware,
71            SampleError::Protocol => ftexternal::Error::Protocol,
72            SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
73            SampleError::RateLimited => ftexternal::Error::RateLimited,
74        }
75    }
76}
77
78/// An implementation of |fuchsia.time.external.PullSource| that routes time updates from an
79/// |UpdateAlgorithm| to clients of the fidl protocol and routes device property updates from fidl
80/// clients to the |UpdateAlgorithm|.
81/// This implementation is based on assumption that there's only one client.
82pub struct PullSource<UA: UpdateAlgorithm> {
83    /// The algorithm used to obtain new updates.
84    update_algorithm: UA,
85}
86
87impl<UA: UpdateAlgorithm> PullSource<UA> {
88    /// Create a new |PullSource| that polls |update_algorithm| for time updates and starts in the
89    /// |initial_status| status.
90    pub fn new(update_algorithm: UA) -> Result<Self, Error> {
91        Ok(Self { update_algorithm })
92    }
93
94    /// Handle a single client's requests received on the given |request_stream|.
95    pub async fn handle_requests_for_stream(
96        &self,
97        mut request_stream: PullSourceRequestStream,
98    ) -> Result<(), Error> {
99        while let Some(request) = request_stream.try_next().await? {
100            match request {
101                PullSourceRequest::Sample { urgency, responder } => {
102                    let sample = self.update_algorithm.sample(urgency).await;
103                    responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
104                }
105                PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
106                    responder.send(
107                        self.update_algorithm.next_possible_sample_time().await.into_nanos(),
108                    )?;
109                }
110                PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
111                    self.update_algorithm.update_device_properties(properties).await;
112                }
113            }
114        }
115        Ok(())
116    }
117}
118
119/// An UpdateAlgorithm that is backed up by the samples, set up by a test.
120/// This implementation allows other crates and integration tests to use an implementation of
121/// `UpdateAlgorithm`.
122pub struct TestUpdateAlgorithm {
123    /// List of received device property updates
124    device_property_updates: Mutex<Vec<Properties>>,
125
126    /// Time Samples to be generated by `sample()`.
127    samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
128}
129
130impl TestUpdateAlgorithm {
131    /// Create a new instance of `TestUpdateAlgorithm` with empty collection of samples to be used.
132    pub fn new() -> Self {
133        let device_property_updates = Mutex::new(Vec::new());
134        let samples = Mutex::new(Vec::new());
135        TestUpdateAlgorithm { device_property_updates, samples }
136    }
137}
138
139#[async_trait]
140impl UpdateAlgorithm for TestUpdateAlgorithm {
141    async fn update_device_properties(&self, properties: Properties) {
142        self.device_property_updates.lock().await.push(properties);
143    }
144
145    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
146        let mut samples = self.samples.lock().await;
147        if samples.is_empty() {
148            warn!("No test samples found.");
149            return Err(SampleError::Internal);
150        }
151        let (expected_urgency, sample) = samples.remove(0);
152        if urgency == expected_urgency {
153            sample
154        } else {
155            warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
156            Err(SampleError::Internal)
157        }
158    }
159
160    async fn next_possible_sample_time(&self) -> zx::BootInstant {
161        zx::BootInstant::get()
162    }
163}
164
165#[cfg(test)]
166mod test {
167    use super::*;
168    use fidl::endpoints::create_proxy_and_stream;
169    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
170    use fuchsia_async as fasync;
171    use std::sync::Arc;
172
173    struct TestHarness {
174        /// The `PullSource` under test.
175        test_source: Arc<PullSource<TestUpdateAlgorithm>>,
176
177        /// Task which handles requests from `PullSource` proxy client.
178        _server: fasync::Task<Result<(), Error>>,
179    }
180
181    impl TestHarness {
182        fn new() -> (Self, PullSourceProxy) {
183            let update_algorithm = TestUpdateAlgorithm::new();
184            let test_source = Arc::new(PullSource::new(update_algorithm).unwrap());
185            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();
186            let server = fasync::Task::spawn({
187                let test_source = Arc::clone(&test_source);
188                async move { test_source.handle_requests_for_stream(stream).await }
189            });
190            (TestHarness { test_source, _server: server }, proxy)
191        }
192
193        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
194            self.test_source.update_algorithm.samples.lock().await.push((urgency, sample));
195        }
196
197        async fn get_device_properties(&self) -> Vec<Properties> {
198            self.test_source.update_algorithm.device_property_updates.lock().await.clone()
199        }
200    }
201
202    #[fuchsia::test]
203    async fn test_empty_harness() {
204        let (_harness, client) = TestHarness::new();
205        // Should generate an error here since there are no events set up.
206        let result = client.sample(Urgency::Low).await;
207        assert!(result.is_ok());
208        assert_eq!(result.unwrap(), Err(ftexternal::Error::Internal),);
209    }
210
211    #[fuchsia::test]
212    async fn test_harness_expects_sample_urgency() {
213        let (mut harness, client) = TestHarness::new();
214
215        harness
216            .add_sample(
217                Urgency::Low,
218                Ok(TimeSample {
219                    reference: Some(zx::BootInstant::from_nanos(12)),
220                    utc: Some(34),
221                    standard_deviation: None,
222                    ..Default::default()
223                }),
224            )
225            .await;
226        // Should generate an error here since there requested urgency doesn't match provided.
227        let result = client.sample(Urgency::High).await;
228        assert!(result.is_ok());
229        assert_eq!(result.unwrap(), Err(ftexternal::Error::Internal),);
230    }
231
232    #[fuchsia::test]
233    async fn test_multiple_samples() {
234        let (mut harness, client) = TestHarness::new();
235
236        harness
237            .add_sample(
238                Urgency::Low,
239                Ok(TimeSample {
240                    reference: Some(zx::BootInstant::from_nanos(12)),
241                    utc: Some(34),
242                    standard_deviation: None,
243                    ..Default::default()
244                }),
245            )
246            .await;
247        harness
248            .add_sample(
249                Urgency::High,
250                Ok(TimeSample {
251                    reference: Some(zx::BootInstant::from_nanos(56)),
252                    utc: Some(78),
253                    standard_deviation: None,
254                    ..Default::default()
255                }),
256            )
257            .await;
258
259        assert_eq!(
260            client.sample(Urgency::Low).await.unwrap().unwrap(),
261            TimeSample {
262                reference: Some(zx::BootInstant::from_nanos(12)),
263                utc: Some(34),
264                standard_deviation: None,
265                ..Default::default()
266            }
267        );
268        assert_eq!(
269            client.sample(Urgency::High).await.unwrap().unwrap(),
270            TimeSample {
271                reference: Some(zx::BootInstant::from_nanos(56)),
272                utc: Some(78),
273                standard_deviation: None,
274                ..Default::default()
275            }
276        );
277    }
278
279    #[fuchsia::test(allow_stalls = false)]
280    async fn test_property_updates_sent_to_update_algorithm() {
281        let (harness, client) = TestHarness::new();
282
283        client.update_device_properties(&Properties::default()).unwrap();
284
285        // Allow tasks to service the request before checking the properties.
286        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
287
288        assert_eq!(harness.get_device_properties().await, vec![Properties::default()]);
289    }
290}