1use anyhow::Error;
9use async_trait::async_trait;
10use fidl_fuchsia_time_external::{
11 self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
12};
13
14use futures::TryStreamExt;
15use futures::lock::Mutex;
16use log::warn;
17use std::sync::Arc;
18
/// The strategy a `PullSource` delegates to for every request it receives.
///
/// All methods take `&self`, so implementations that keep state need interior mutability.
#[async_trait]
pub trait UpdateAlgorithm {
    /// Receives the device `Properties` carried by an `UpdateDeviceProperties` request.
    async fn update_device_properties(&self, properties: Properties);

    /// Produces the reply to a `Sample` request made with the given `urgency`: either a
    /// `TimeSample`, or the `SampleError` that is converted and sent back to the client.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;

    /// Returns the boot-timeline instant that is sent, as nanoseconds, in reply to a
    /// `NextPossibleSampleTime` request.
    async fn next_possible_sample_time(&self) -> zx::BootInstant;
}
36
/// Reasons an `UpdateAlgorithm` may fail to produce a sample.
///
/// Each variant converts into the `ftexternal::Error` variant of the same name before it is
/// sent to a client.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SampleError {
    /// Reported to clients as `ftexternal::Error::Unknown`.
    Unknown,
    /// Reported to clients as `ftexternal::Error::Internal`.
    Internal,
    /// Reported to clients as `ftexternal::Error::Resource`.
    Resource,
    /// Reported to clients as `ftexternal::Error::Network`.
    Network,
    /// Reported to clients as `ftexternal::Error::Hardware`.
    Hardware,
    /// Reported to clients as `ftexternal::Error::Protocol`.
    Protocol,
    /// Reported to clients as `ftexternal::Error::ProtocolUnrecoverable`.
    ProtocolUnrecoverable,
    /// Reported to clients as `ftexternal::Error::RateLimited`.
    RateLimited,
}
63
64impl From<SampleError> for ftexternal::Error {
65 fn from(e: SampleError) -> Self {
66 match e {
67 SampleError::Unknown => ftexternal::Error::Unknown,
68 SampleError::Internal => ftexternal::Error::Internal,
69 SampleError::Resource => ftexternal::Error::Resource,
70 SampleError::Network => ftexternal::Error::Network,
71 SampleError::Hardware => ftexternal::Error::Hardware,
72 SampleError::Protocol => ftexternal::Error::Protocol,
73 SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
74 SampleError::RateLimited => ftexternal::Error::RateLimited,
75 }
76 }
77}
78
/// Serves `PullSourceRequest`s by forwarding each one to an `UpdateAlgorithm`.
pub struct PullSource<UA: UpdateAlgorithm> {
    /// The algorithm every request is delegated to.
    update_algorithm: UA,
}
87
88impl<UA: UpdateAlgorithm> PullSource<UA> {
89 pub fn new(update_algorithm: UA) -> Result<Self, Error> {
92 Ok(Self { update_algorithm })
93 }
94
95 pub async fn handle_requests_for_stream(
97 &self,
98 mut request_stream: PullSourceRequestStream,
99 inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
100 ) -> Result<(), Error> {
101 while let Some(request) = request_stream.try_next().await? {
102 match request {
103 PullSourceRequest::Sample { urgency, responder } => {
104 let sample = self.update_algorithm.sample(urgency).await;
105 responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
106 }
107 PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
108 responder.send(
109 self.update_algorithm.next_possible_sample_time().await.into_nanos(),
110 )?;
111 }
112 PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
113 self.update_algorithm.update_device_properties(properties).await;
114 }
115 PullSourceRequest::Shutdown { responder } => {
116 let mut controller_lock = inspect_controller.lock().await;
117 if let Some(controller) = controller_lock.take() {
118 let options = inspect_runtime::EscrowOptions::default();
119 match controller.escrow_frozen(options).await {
120 Ok(token) => {
121 if let Err(e) = responder.send(Ok(token)) {
122 warn!("Failed to send shutdown response: {:?}", e);
123 }
124 }
125 Err(e) => {
126 warn!("Failed to escrow frozen Inspect controller: {:?}", e);
127 if let Err(e) = responder
128 .send(Err(ftexternal::EscrowError::EscrowInspectFailed))
129 {
130 warn!("Failed to send shutdown error response: {:?}", e);
131 }
132 }
133 }
134 } else {
135 warn!("Inspect controller already taken or None during shutdown");
136 if let Err(e) =
137 responder.send(Err(ftexternal::EscrowError::EscrowNoInspectController))
138 {
139 warn!("Failed to send shutdown error response: {:?}", e);
140 }
141 }
142 return Ok(());
143 }
144 }
145 }
146 Ok(())
147 }
148}
149
/// An `UpdateAlgorithm` for tests: it records the device property updates it receives and
/// answers sample requests from a pre-loaded list.
pub struct TestUpdateAlgorithm {
    /// Every `Properties` passed to `update_device_properties`, in the order received.
    device_property_updates: Mutex<Vec<Properties>>,

    /// Pending replies for `sample`, consumed from the front. Each entry pairs the urgency the
    /// request is expected to carry with the result to return for it.
    samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
}
160
161impl TestUpdateAlgorithm {
162 pub fn new() -> Self {
164 let device_property_updates = Mutex::new(Vec::new());
165 let samples = Mutex::new(Vec::new());
166 TestUpdateAlgorithm { device_property_updates, samples }
167 }
168}
169
170#[async_trait]
171impl UpdateAlgorithm for TestUpdateAlgorithm {
172 async fn update_device_properties(&self, properties: Properties) {
173 self.device_property_updates.lock().await.push(properties);
174 }
175
176 async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
177 let mut samples = self.samples.lock().await;
178 if samples.is_empty() {
179 warn!("No test samples found.");
180 return Err(SampleError::Internal);
181 }
182 let (expected_urgency, sample) = samples.remove(0);
183 if urgency == expected_urgency {
184 sample
185 } else {
186 warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
187 Err(SampleError::Internal)
188 }
189 }
190
191 async fn next_possible_sample_time(&self) -> zx::BootInstant {
192 zx::BootInstant::get()
193 }
194}
195
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// Owns a `PullSource` backed by a `TestUpdateAlgorithm` together with the task serving it.
    struct TestHarness {
        /// The source under test; also gives the tests access to the algorithm's state.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,

        /// Task running the request loop. Held only so that it lives as long as the harness.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Builds a harness and returns it with a proxy connected to its request loop.
        ///
        /// The loop is served without an Inspect controller (the shared slot holds `None`).
        fn new() -> (Self, PullSourceProxy) {
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();

            let served_source = Arc::clone(&test_source);
            let server = fasync::Task::spawn(async move {
                served_source.handle_requests_for_stream(stream, Arc::new(Mutex::new(None))).await
            });

            let harness = TestHarness { test_source, _server: server };
            (harness, proxy)
        }

        /// Queues `sample` as the next reply, expected to be requested with `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let mut queue = self.test_source.update_algorithm.samples.lock().await;
            queue.push((urgency, sample));
        }

        /// Returns a copy of every property update the algorithm has recorded so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let recorded = self.test_source.update_algorithm.device_property_updates.lock().await;
            recorded.clone()
        }
    }

    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();

        // Nothing was queued: the FIDL call itself succeeds but carries an error reply.
        let result = client.sample(Urgency::Low).await;
        assert!(result.is_ok());
        let reply = result.unwrap();
        assert_eq!(reply, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();

        let queued = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        harness.add_sample(Urgency::Low, Ok(queued)).await;

        // The sample was queued for `Low`; asking with `High` must be rejected.
        let result = client.sample(Urgency::High).await;
        assert!(result.is_ok());
        let reply = result.unwrap();
        assert_eq!(reply, Err(ftexternal::Error::Internal));
    }

    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();

        let first = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(12)),
            utc: Some(34),
            standard_deviation: None,
            ..Default::default()
        };
        let second = TimeSample {
            reference: Some(zx::BootInstant::from_nanos(56)),
            utc: Some(78),
            standard_deviation: None,
            ..Default::default()
        };

        harness.add_sample(Urgency::Low, Ok(first.clone())).await;
        harness.add_sample(Urgency::High, Ok(second.clone())).await;

        // Samples come back in the order they were queued.
        assert_eq!(client.sample(Urgency::Low).await.unwrap().unwrap(), first);
        assert_eq!(client.sample(Urgency::High).await.unwrap().unwrap(), second);
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();

        // This call returns without awaiting any reply from the server.
        client.update_device_properties(&Properties::default()).unwrap();

        // Run the executor until nothing can make progress, which gives the server task the
        // chance to process the request before the recorded state is inspected.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        let recorded = harness.get_device_properties().await;
        assert_eq!(recorded, vec![Properties::default()]);
    }
}