1use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11 self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::TryStreamExt;
15use futures::lock::Mutex;
16use log::warn;
17
18#[async_trait]
20pub trait UpdateAlgorithm {
21 async fn update_device_properties(&self, properties: Properties);
23
24 async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;
26
27 async fn next_possible_sample_time(&self) -> zx::BootInstant;
34}
35
/// Failure reasons that [`UpdateAlgorithm::sample`] can report.
///
/// Each variant converts to the identically named `ftexternal::Error` variant
/// through the `From<SampleError>` implementation in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SampleError {
    /// Converts to `ftexternal::Error::Unknown`.
    Unknown,
    /// Converts to `ftexternal::Error::Internal`.
    Internal,
    /// Converts to `ftexternal::Error::Resource`.
    Resource,
    /// Converts to `ftexternal::Error::Network`.
    Network,
    /// Converts to `ftexternal::Error::Hardware`.
    Hardware,
    /// Converts to `ftexternal::Error::Protocol`.
    Protocol,
    /// Converts to `ftexternal::Error::ProtocolUnrecoverable`.
    ProtocolUnrecoverable,
    /// Converts to `ftexternal::Error::RateLimited`.
    RateLimited,
}
62
63impl From<SampleError> for ftexternal::Error {
64 fn from(e: SampleError) -> Self {
65 match e {
66 SampleError::Unknown => ftexternal::Error::Unknown,
67 SampleError::Internal => ftexternal::Error::Internal,
68 SampleError::Resource => ftexternal::Error::Resource,
69 SampleError::Network => ftexternal::Error::Network,
70 SampleError::Hardware => ftexternal::Error::Hardware,
71 SampleError::Protocol => ftexternal::Error::Protocol,
72 SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
73 SampleError::RateLimited => ftexternal::Error::RateLimited,
74 }
75 }
76}
77
78pub struct PullSource<UA: UpdateAlgorithm> {
83 update_algorithm: UA,
85}
86
87impl<UA: UpdateAlgorithm> PullSource<UA> {
88 pub fn new(update_algorithm: UA) -> Result<Self, Error> {
91 Ok(Self { update_algorithm })
92 }
93
94 pub async fn handle_requests_for_stream(
96 &self,
97 mut request_stream: PullSourceRequestStream,
98 ) -> Result<(), Error> {
99 while let Some(request) = request_stream.try_next().await? {
100 match request {
101 PullSourceRequest::Sample { urgency, responder } => {
102 let sample = self.update_algorithm.sample(urgency).await;
103 responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
104 }
105 PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
106 responder.send(
107 self.update_algorithm.next_possible_sample_time().await.into_nanos(),
108 )?;
109 }
110 PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
111 self.update_algorithm.update_device_properties(properties).await;
112 }
113 PullSourceRequest::Shutdown { responder: _, .. } => {
114 warn!("receive shutdown request");
116 }
117 }
118 }
119 Ok(())
120 }
121}
122
123pub struct TestUpdateAlgorithm {
127 device_property_updates: Mutex<Vec<Properties>>,
129
130 samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
132}
133
134impl TestUpdateAlgorithm {
135 pub fn new() -> Self {
137 let device_property_updates = Mutex::new(Vec::new());
138 let samples = Mutex::new(Vec::new());
139 TestUpdateAlgorithm { device_property_updates, samples }
140 }
141}
142
143#[async_trait]
144impl UpdateAlgorithm for TestUpdateAlgorithm {
145 async fn update_device_properties(&self, properties: Properties) {
146 self.device_property_updates.lock().await.push(properties);
147 }
148
149 async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
150 let mut samples = self.samples.lock().await;
151 if samples.is_empty() {
152 warn!("No test samples found.");
153 return Err(SampleError::Internal);
154 }
155 let (expected_urgency, sample) = samples.remove(0);
156 if urgency == expected_urgency {
157 sample
158 } else {
159 warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
160 Err(SampleError::Internal)
161 }
162 }
163
164 async fn next_possible_sample_time(&self) -> zx::BootInstant {
165 zx::BootInstant::get()
166 }
167}
168
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// Owns a `PullSource` backed by a `TestUpdateAlgorithm` together with the
    /// task that serves it.
    struct TestHarness {
        /// The source under test, shared with the serving task.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,

        /// Task running `handle_requests_for_stream`; stored in the harness
        /// but never read.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Builds the harness, spawns the serving task, and returns the
        /// harness along with a client proxy connected to it.
        fn new() -> (Self, PullSourceProxy) {
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();

            let serving_source = Arc::clone(&test_source);
            let server = fasync::Task::spawn(async move {
                serving_source.handle_requests_for_stream(stream).await
            });

            (TestHarness { test_source, _server: server }, proxy)
        }

        /// Queues `sample` to be returned for a request made with `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let mut queue = self.test_source.update_algorithm.samples.lock().await;
            queue.push((urgency, sample));
        }

        /// Returns a copy of every device property update recorded so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let recorded = self.test_source.update_algorithm.device_property_updates.lock().await;
            recorded.clone()
        }
    }

    /// With nothing queued, the FIDL call succeeds but carries `Internal`.
    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();

        let response = client.sample(Urgency::Low).await;

        assert!(response.is_ok());
        assert_eq!(response.unwrap(), Err(ftexternal::Error::Internal),);
    }

    /// Requesting with a different urgency than the queued one yields
    /// `Internal`.
    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();

        let queued = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(queued)).await;

        let response = client.sample(Urgency::High).await;

        assert!(response.is_ok());
        assert_eq!(response.unwrap(), Err(ftexternal::Error::Internal),);
    }

    /// Queued samples come back in insertion order when urgencies match.
    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();

        let low_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        let high_sample = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(56)),
            utc: Some(78),
            standard_deviation: None,
            ..Default::default()
        };

        harness.add_sample(Urgency::Low, Ok(low_sample.clone())).await;
        harness.add_sample(Urgency::High, Ok(high_sample.clone())).await;

        assert_eq!(client.sample(Urgency::Low).await.unwrap().unwrap(), low_sample);
        assert_eq!(client.sample(Urgency::High).await.unwrap().unwrap(), high_sample);
    }

    /// Device property updates sent by a client reach the update algorithm.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();

        // The call returns its result without being awaited, so nothing here
        // waits for the server to handle it.
        client.update_device_properties(&Properties::default()).unwrap();

        // Presumably drives the executor until no task can make progress,
        // which gives the server task the chance to handle the request.
        // NOTE(review): confirm against the fuchsia_async TestExecutor docs.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert_eq!(harness.get_device_properties().await, vec![Properties::default()]);
    }
}