Skip to main content

pull_source/
lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The `pull_source` library defines an implementation of the `PullSource` API and traits to hook
6//! in an algorithm that produces time updates.
7
8use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11    self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::TryStreamExt;
15use futures::lock::Mutex;
16use log::warn;
17use std::sync::Arc;
18
/// An |UpdateAlgorithm| trait produces time samples on demand.
///
/// |PullSource| forwards each request it receives from a FIDL client to the
/// corresponding method of this trait.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Update the algorithm's knowledge of device properties.
    ///
    /// `properties` is the value carried by a client's `UpdateDeviceProperties` request.
    async fn update_device_properties(&self, properties: Properties);

    /// Produce a new time sample, taking into account `Urgency`.
    ///
    /// Returns the produced `TimeSample`, or a `SampleError` describing why no sample
    /// could be produced.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;

    /// Returns the reference time at which the next sample may be produced.
    ///
    /// The reference time is always given on the boot timeline, which means
    /// it could fall in a time when the device was suspended.  We may want
    /// to wake the device to sample time, but also may decide not to, depending
    /// on power policy.
    async fn next_possible_sample_time(&self) -> zx::BootInstant;
}
36
/// Reasons `sample()` may fail.
///
/// Each variant converts to the `ftexternal::Error` variant of the same name through the
/// `From<SampleError>` implementation in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SampleError {
    /// An error occurred that cannot be classified as one of the more specific
    /// error statuses.
    Unknown,
    /// An internal error occurred. This usually indicates a bug in the
    /// component implementation.
    Internal,
    /// A local resource error occurred such as IO, FIDL, or memory allocation
    /// failure.
    Resource,
    /// A network error occurred.
    Network,
    /// Some hardware that the time source depends on failed.
    Hardware,
    /// A retriable error specific to the implemented time protocol occurred,
    /// such as a malformed response from a remote server.
    Protocol,
    /// Sampling failed in a nonretriable way. Examples include failed
    /// authentication, or a missing configuration.
    ProtocolUnrecoverable,
    /// The request was made too soon and the client should wait before making
    /// another request.
    RateLimited,
}
63
64impl From<SampleError> for ftexternal::Error {
65    fn from(e: SampleError) -> Self {
66        match e {
67            SampleError::Unknown => ftexternal::Error::Unknown,
68            SampleError::Internal => ftexternal::Error::Internal,
69            SampleError::Resource => ftexternal::Error::Resource,
70            SampleError::Network => ftexternal::Error::Network,
71            SampleError::Hardware => ftexternal::Error::Hardware,
72            SampleError::Protocol => ftexternal::Error::Protocol,
73            SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
74            SampleError::RateLimited => ftexternal::Error::RateLimited,
75        }
76    }
77}
78
/// An implementation of |fuchsia.time.external.PullSource| that routes time updates from an
/// |UpdateAlgorithm| to clients of the fidl protocol and routes device property updates from fidl
/// clients to the |UpdateAlgorithm|.
///
/// This implementation is based on the assumption that there is only one client.
pub struct PullSource<UA: UpdateAlgorithm> {
    /// The algorithm used to obtain new time samples and to receive device property updates.
    update_algorithm: UA,
}
87
88impl<UA: UpdateAlgorithm> PullSource<UA> {
89    /// Create a new |PullSource| that polls |update_algorithm| for time updates and starts in the
90    /// |initial_status| status.
91    pub fn new(update_algorithm: UA) -> Result<Self, Error> {
92        Ok(Self { update_algorithm })
93    }
94
95    /// Handle a single client's requests received on the given |request_stream|.
96    pub async fn handle_requests_for_stream(
97        &self,
98        mut request_stream: PullSourceRequestStream,
99        inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
100    ) -> Result<(), Error> {
101        while let Some(request) = request_stream.try_next().await? {
102            match request {
103                PullSourceRequest::Sample { urgency, responder } => {
104                    let sample = self.update_algorithm.sample(urgency).await;
105                    responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
106                }
107                PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
108                    responder.send(
109                        self.update_algorithm.next_possible_sample_time().await.into_nanos(),
110                    )?;
111                }
112                PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
113                    self.update_algorithm.update_device_properties(properties).await;
114                }
115                PullSourceRequest::Shutdown { responder } => {
116                    let mut controller_lock = inspect_controller.lock().await;
117                    if let Some(controller) = controller_lock.take() {
118                        let options = inspect_runtime::EscrowOptions::default();
119                        match controller.escrow_frozen(options).await {
120                            Ok(token) => {
121                                if let Err(e) = responder.send(Ok(token)) {
122                                    warn!("Failed to send shutdown response: {:?}", e);
123                                }
124                            }
125                            Err(e) => {
126                                warn!("Failed to escrow frozen Inspect controller: {:?}", e);
127                                if let Err(e) = responder
128                                    .send(Err(ftexternal::EscrowError::EscrowInspectFailed))
129                                {
130                                    warn!("Failed to send shutdown error response: {:?}", e);
131                                }
132                            }
133                        }
134                    } else {
135                        warn!("Inspect controller already taken or None during shutdown");
136                        if let Err(e) =
137                            responder.send(Err(ftexternal::EscrowError::EscrowNoInspectController))
138                        {
139                            warn!("Failed to send shutdown error response: {:?}", e);
140                        }
141                    }
142                    return Ok(());
143                }
144            }
145        }
146        Ok(())
147    }
148}
149
/// An `UpdateAlgorithm` backed by a list of samples that a test sets up in advance.
/// This implementation allows other crates and integration tests to use an implementation of
/// `UpdateAlgorithm`.
pub struct TestUpdateAlgorithm {
    /// List of received device property updates, in the order they were received.
    device_property_updates: Mutex<Vec<Properties>>,

    /// Results to be returned by `sample()`, each paired with the urgency the request is
    /// expected to carry. `sample()` consumes entries from the front of the list.
    samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
}
160
161impl TestUpdateAlgorithm {
162    /// Create a new instance of `TestUpdateAlgorithm` with empty collection of samples to be used.
163    pub fn new() -> Self {
164        let device_property_updates = Mutex::new(Vec::new());
165        let samples = Mutex::new(Vec::new());
166        TestUpdateAlgorithm { device_property_updates, samples }
167    }
168}
169
170#[async_trait]
171impl UpdateAlgorithm for TestUpdateAlgorithm {
172    async fn update_device_properties(&self, properties: Properties) {
173        self.device_property_updates.lock().await.push(properties);
174    }
175
176    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
177        let mut samples = self.samples.lock().await;
178        if samples.is_empty() {
179            warn!("No test samples found.");
180            return Err(SampleError::Internal);
181        }
182        let (expected_urgency, sample) = samples.remove(0);
183        if urgency == expected_urgency {
184            sample
185        } else {
186            warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
187            Err(SampleError::Internal)
188        }
189    }
190
191    async fn next_possible_sample_time(&self) -> zx::BootInstant {
192        zx::BootInstant::get()
193    }
194}
195
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// Keeps the `PullSource` under test together with the task serving it.
    struct TestHarness {
        /// The `PullSource` under test.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,

        /// Task which handles requests from `PullSource` proxy client.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Builds a `PullSource` around an empty `TestUpdateAlgorithm`, spawns a task serving
        /// it, and returns the harness together with the client proxy.
        fn new() -> (Self, PullSourceProxy) {
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();

            let served_source = Arc::clone(&test_source);
            let server = fasync::Task::spawn(async move {
                // These tests do not exercise `Shutdown`, so no Inspect controller is supplied.
                let no_controller = Arc::new(Mutex::new(None));
                served_source.handle_requests_for_stream(stream, no_controller).await
            });

            (TestHarness { test_source, _server: server }, proxy)
        }

        /// Queues `sample` as the next result of `sample()`, expected to be requested with
        /// `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let algorithm = &self.test_source.update_algorithm;
            algorithm.samples.lock().await.push((urgency, sample));
        }

        /// Returns a copy of every device property update the algorithm has received so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let algorithm = &self.test_source.update_algorithm;
            algorithm.device_property_updates.lock().await.clone()
        }
    }

    /// Builds a `TimeSample` with the given reference time (in nanoseconds) and UTC value, and
    /// no standard deviation.
    fn time_sample(reference_nanos: i64, utc: i64) -> TimeSample {
        TimeSample {
            reference: Some(zx::BootInstant::from_nanos(reference_nanos)),
            utc: Some(utc),
            standard_deviation: None,
            ..Default::default()
        }
    }

    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();

        // No samples were set up, so the request must be answered with an error.
        let response = client.sample(Urgency::Low).await.unwrap();
        assert_eq!(response, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();
        harness.add_sample(Urgency::Low, Ok(time_sample(12, 34))).await;

        // The requested urgency does not match the one the sample was set up with, so the
        // request must be answered with an error.
        let response = client.sample(Urgency::High).await.unwrap();
        assert_eq!(response, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();
        harness.add_sample(Urgency::Low, Ok(time_sample(12, 34))).await;
        harness.add_sample(Urgency::High, Ok(time_sample(56, 78))).await;

        // Samples come back in the order they were queued.
        let first = client.sample(Urgency::Low).await.unwrap().unwrap();
        assert_eq!(first, time_sample(12, 34));

        let second = client.sample(Urgency::High).await.unwrap().unwrap();
        assert_eq!(second, time_sample(56, 78));
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();

        client.update_device_properties(&Properties::default()).unwrap();

        // Allow tasks to service the request before checking the properties.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        let received = harness.get_device_properties().await;
        assert_eq!(received, vec![Properties::default()]);
    }
}