1use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11 self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::lock::Mutex;
15use futures::TryStreamExt;
16use log::warn;
17
/// The pluggable strategy that a [`PullSource`] delegates every incoming
/// `PullSource` request to.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Receives the device `properties` carried by an `UpdateDeviceProperties`
    /// request. `PullSource` forwards them unchanged and sends no reply.
    async fn update_device_properties(&self, properties: Properties);

    /// Produces a time sample for a `Sample` request made with `urgency`.
    ///
    /// `PullSource` sends the returned value back to the client; on failure the
    /// [`SampleError`] is first converted into the matching `ftexternal::Error`.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;

    /// Returns the instant reported for a `NextPossibleSampleTime` request.
    ///
    /// `PullSource` replies with this instant expressed in nanoseconds.
    // NOTE(review): presumably the earliest boot-timeline instant at which a new
    // sample may be produced - confirm against the FIDL protocol documentation.
    async fn next_possible_sample_time(&self) -> zx::BootInstant;
}
35
/// Reasons an [`UpdateAlgorithm`] can fail to produce a sample.
///
/// Each variant is converted into the `ftexternal::Error` variant of the same
/// name before being sent to a client.
// NOTE(review): the precise meaning of each variant is presumably the one given
// for the same-named value in the fuchsia.time.external FIDL library - confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SampleError {
    /// Reported to clients as `ftexternal::Error::Unknown`.
    Unknown,
    /// Reported to clients as `ftexternal::Error::Internal`.
    Internal,
    /// Reported to clients as `ftexternal::Error::Resource`.
    Resource,
    /// Reported to clients as `ftexternal::Error::Network`.
    Network,
    /// Reported to clients as `ftexternal::Error::Hardware`.
    Hardware,
    /// Reported to clients as `ftexternal::Error::Protocol`.
    Protocol,
    /// Reported to clients as `ftexternal::Error::ProtocolUnrecoverable`.
    ProtocolUnrecoverable,
    /// Reported to clients as `ftexternal::Error::RateLimited`.
    RateLimited,
}
62
63impl From<SampleError> for ftexternal::Error {
64 fn from(e: SampleError) -> Self {
65 match e {
66 SampleError::Unknown => ftexternal::Error::Unknown,
67 SampleError::Internal => ftexternal::Error::Internal,
68 SampleError::Resource => ftexternal::Error::Resource,
69 SampleError::Network => ftexternal::Error::Network,
70 SampleError::Hardware => ftexternal::Error::Hardware,
71 SampleError::Protocol => ftexternal::Error::Protocol,
72 SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
73 SampleError::RateLimited => ftexternal::Error::RateLimited,
74 }
75 }
76}
77
/// Serves `PullSource` request streams by delegating each request to an
/// [`UpdateAlgorithm`].
pub struct PullSource<UA: UpdateAlgorithm> {
    /// The algorithm that every incoming request is forwarded to.
    update_algorithm: UA,
}
86
87impl<UA: UpdateAlgorithm> PullSource<UA> {
88 pub fn new(update_algorithm: UA) -> Result<Self, Error> {
91 Ok(Self { update_algorithm })
92 }
93
94 pub async fn handle_requests_for_stream(
96 &self,
97 mut request_stream: PullSourceRequestStream,
98 ) -> Result<(), Error> {
99 while let Some(request) = request_stream.try_next().await? {
100 match request {
101 PullSourceRequest::Sample { urgency, responder } => {
102 let sample = self.update_algorithm.sample(urgency).await;
103 responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
104 }
105 PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
106 responder.send(
107 self.update_algorithm.next_possible_sample_time().await.into_nanos(),
108 )?;
109 }
110 PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
111 self.update_algorithm.update_device_properties(properties).await;
112 }
113 }
114 }
115 Ok(())
116 }
117}
118
/// An [`UpdateAlgorithm`] fake that records the property updates it receives
/// and answers sample requests from a pre-loaded list.
pub struct TestUpdateAlgorithm {
    /// Every `Properties` value passed to `update_device_properties`, in call order.
    device_property_updates: Mutex<Vec<Properties>>,

    /// Canned responses, consumed from the front. Each entry pairs the urgency the
    /// next `sample` call is expected to use with the result to return for it.
    samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
}
129
130impl TestUpdateAlgorithm {
131 pub fn new() -> Self {
133 let device_property_updates = Mutex::new(Vec::new());
134 let samples = Mutex::new(Vec::new());
135 TestUpdateAlgorithm { device_property_updates, samples }
136 }
137}
138
139#[async_trait]
140impl UpdateAlgorithm for TestUpdateAlgorithm {
141 async fn update_device_properties(&self, properties: Properties) {
142 self.device_property_updates.lock().await.push(properties);
143 }
144
145 async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
146 let mut samples = self.samples.lock().await;
147 if samples.is_empty() {
148 warn!("No test samples found.");
149 return Err(SampleError::Internal);
150 }
151 let (expected_urgency, sample) = samples.remove(0);
152 if urgency == expected_urgency {
153 sample
154 } else {
155 warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
156 Err(SampleError::Internal)
157 }
158 }
159
160 async fn next_possible_sample_time(&self) -> zx::BootInstant {
161 zx::BootInstant::get()
162 }
163}
164
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// A `PullSource` backed by a `TestUpdateAlgorithm`, together with the task
    /// that serves it.
    struct TestHarness {
        /// The source under test; shared with the serving task.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,

        /// Held only so the serving task stays owned for the harness's lifetime.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Spawns a served `PullSource` and returns the harness plus a client proxy.
        fn new() -> (Self, PullSourceProxy) {
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let served_source = Arc::clone(&test_source);
            let _server = fasync::Task::spawn(async move {
                served_source.handle_requests_for_stream(stream).await
            });
            (TestHarness { test_source, _server }, proxy)
        }

        /// Queues `sample` as the response to the next request, which is expected
        /// to arrive with `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let mut queued = self.test_source.update_algorithm.samples.lock().await;
            queued.push((urgency, sample));
        }

        /// Returns a copy of every property update the algorithm has received so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let recorded = self.test_source.update_algorithm.device_property_updates.lock().await;
            recorded.clone()
        }
    }

    /// With nothing queued, a sample request is answered with an internal error.
    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();

        let result = client.sample(Urgency::Low).await;

        // The FIDL call itself must succeed; only the payload carries the error.
        assert!(result.is_ok());
        let expected = Err(ftexternal::Error::Internal);
        assert_eq!(result.unwrap(), expected);
    }

    /// A request whose urgency differs from the queued one is answered with an
    /// internal error.
    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();
        let queued_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(queued_sample)).await;

        let result = client.sample(Urgency::High).await;

        // The FIDL call itself must succeed; only the payload carries the error.
        assert!(result.is_ok());
        let expected = Err(ftexternal::Error::Internal);
        assert_eq!(result.unwrap(), expected);
    }

    /// Queued samples are handed out in the order they were added.
    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();
        let low_urgency_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        let high_urgency_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(56)),
            utc: Some(78),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(low_urgency_sample.clone())).await;
        harness.add_sample(Urgency::High, Ok(high_urgency_sample.clone())).await;

        let first = client.sample(Urgency::Low).await.unwrap().unwrap();
        assert_eq!(first, low_urgency_sample);

        let second = client.sample(Urgency::High).await.unwrap().unwrap();
        assert_eq!(second, high_urgency_sample);
    }

    /// Property updates sent by the client reach the update algorithm.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();
        let sent = Properties::default();

        client.update_device_properties(&sent).unwrap();

        // Run the executor until no task can make progress, giving the server
        // task the chance to process the update before it is inspected.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        let received = harness.get_device_properties().await;
        assert_eq!(received, vec![sent]);
    }
}