use anyhow::Error;
use async_trait::async_trait;
use fidl_fuchsia_time_external::{
self as ftexternal, Properties, PullSourceRequest, PullSourceRequestStream, TimeSample, Urgency,
};
use futures::lock::Mutex;
use futures::TryStreamExt;
use log::warn;
/// Backend that a `PullSource` delegates every `PullSourceRequest` to.
///
/// All methods take `&self`, so an implementation that keeps state must use
/// interior mutability.
#[async_trait]
pub trait UpdateAlgorithm {
/// Receives the `Properties` carried by an `UpdateDeviceProperties` request.
async fn update_device_properties(&self, properties: Properties);
/// Produces a time sample for a `Sample` request made with `urgency`, or the
/// `SampleError` to report back to the client.
async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError>;
/// Returns the instant whose nanosecond value is sent in reply to a
/// `NextPossibleSampleTime` request.
async fn next_possible_sample_time(&self) -> zx::BootInstant;
}
/// Reasons an `UpdateAlgorithm` can fail to produce a sample.
///
/// Each variant converts into the identically named `ftexternal::Error`
/// variant through the `From` implementation in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SampleError {
Unknown,
Internal,
Resource,
Network,
Hardware,
Protocol,
ProtocolUnrecoverable,
RateLimited,
}
impl From<SampleError> for ftexternal::Error {
fn from(e: SampleError) -> Self {
match e {
SampleError::Unknown => ftexternal::Error::Unknown,
SampleError::Internal => ftexternal::Error::Internal,
SampleError::Resource => ftexternal::Error::Resource,
SampleError::Network => ftexternal::Error::Network,
SampleError::Hardware => ftexternal::Error::Hardware,
SampleError::Protocol => ftexternal::Error::Protocol,
SampleError::ProtocolUnrecoverable => ftexternal::Error::ProtocolUnrecoverable,
SampleError::RateLimited => ftexternal::Error::RateLimited,
}
}
}
/// Serves `PullSourceRequestStream`s by forwarding each request to the
/// wrapped `UpdateAlgorithm`.
pub struct PullSource<UA: UpdateAlgorithm> {
/// The algorithm every incoming request is delegated to.
update_algorithm: UA,
}
impl<UA: UpdateAlgorithm> PullSource<UA> {
pub fn new(update_algorithm: UA) -> Result<Self, Error> {
Ok(Self { update_algorithm })
}
pub async fn handle_requests_for_stream(
&self,
mut request_stream: PullSourceRequestStream,
) -> Result<(), Error> {
while let Some(request) = request_stream.try_next().await? {
match request {
PullSourceRequest::Sample { urgency, responder } => {
let sample = self.update_algorithm.sample(urgency).await;
responder.send(sample.as_ref().map_err(|e| (*e).into()))?;
}
PullSourceRequest::NextPossibleSampleTime { responder, .. } => {
responder.send(
self.update_algorithm.next_possible_sample_time().await.into_nanos(),
)?;
}
PullSourceRequest::UpdateDeviceProperties { properties, .. } => {
self.update_algorithm.update_device_properties(properties).await;
}
}
}
Ok(())
}
}
/// An `UpdateAlgorithm` that records property updates and replays a scripted
/// list of samples.
pub struct TestUpdateAlgorithm {
/// Every `Properties` value passed to `update_device_properties`, in call order.
device_property_updates: Mutex<Vec<Properties>>,
/// Scripted `(expected urgency, result)` pairs; `sample` consumes them from
/// the front.
samples: Mutex<Vec<(Urgency, Result<TimeSample, SampleError>)>>,
}
impl TestUpdateAlgorithm {
    /// Creates an algorithm with no recorded property updates and no scripted
    /// samples.
    pub fn new() -> Self {
        TestUpdateAlgorithm {
            device_property_updates: Mutex::new(Vec::new()),
            samples: Mutex::new(Vec::new()),
        }
    }
}

/// `Default` is equivalent to [`TestUpdateAlgorithm::new`], so the type can be
/// used wherever a `Default` bound is required.
impl Default for TestUpdateAlgorithm {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl UpdateAlgorithm for TestUpdateAlgorithm {
    /// Appends `properties` to the list of recorded updates.
    async fn update_device_properties(&self, properties: Properties) {
        let mut recorded = self.device_property_updates.lock().await;
        recorded.push(properties);
    }

    /// Pops the oldest scripted sample and returns it if `urgency` matches the
    /// urgency it was scripted with.
    ///
    /// Returns `SampleError::Internal` when no scripted samples remain or when
    /// the urgency differs. In the mismatch case the scripted entry has
    /// already been removed and is not restored.
    async fn sample(&self, urgency: Urgency) -> Result<TimeSample, SampleError> {
        let mut pending = self.samples.lock().await;
        if pending.is_empty() {
            warn!("No test samples found.");
            return Err(SampleError::Internal);
        }

        let (expected_urgency, outcome) = pending.remove(0);
        if urgency != expected_urgency {
            warn!("Wrong urgency provided: expected {:?}, got {:?}.", expected_urgency, urgency);
            return Err(SampleError::Internal);
        }
        outcome
    }

    /// Returns the value of `zx::BootInstant::get()` at the time of the call.
    async fn next_possible_sample_time(&self) -> zx::BootInstant {
        zx::BootInstant::get()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_time_external::{PullSourceMarker, PullSourceProxy};
    use fuchsia_async as fasync;
    use std::sync::Arc;

    /// Owns a `PullSource` backed by a `TestUpdateAlgorithm` together with the
    /// task that serves it.
    struct TestHarness {
        /// The source under test, shared with the serving task.
        test_source: Arc<PullSource<TestUpdateAlgorithm>>,
        /// Held only so the serving task stays owned by the harness.
        _server: fasync::Task<Result<(), Error>>,
    }

    impl TestHarness {
        /// Spawns a server for a fresh `PullSource` and returns the harness
        /// along with a client proxy connected to it.
        fn new() -> (Self, PullSourceProxy) {
            let (proxy, stream) = create_proxy_and_stream::<PullSourceMarker>();
            let test_source = Arc::new(PullSource::new(TestUpdateAlgorithm::new()).unwrap());
            let serving_source = Arc::clone(&test_source);
            let server = fasync::Task::spawn(async move {
                serving_source.handle_requests_for_stream(stream).await
            });
            (TestHarness { test_source, _server: server }, proxy)
        }

        /// Queues `sample` to be returned for a request made with `urgency`.
        async fn add_sample(&mut self, urgency: Urgency, sample: Result<TimeSample, SampleError>) {
            let mut scripted = self.test_source.update_algorithm.samples.lock().await;
            scripted.push((urgency, sample));
        }

        /// Returns a copy of every property update recorded so far.
        async fn get_device_properties(&self) -> Vec<Properties> {
            let recorded = self.test_source.update_algorithm.device_property_updates.lock().await;
            recorded.clone()
        }
    }

    /// Builds a `TimeSample` with the given reference time (in nanoseconds) and
    /// UTC value, and no standard deviation.
    fn time_sample(reference_nanos: i64, utc: i64) -> TimeSample {
        TimeSample {
            reference: Some(zx::BootInstant::from_nanos(reference_nanos)),
            utc: Some(utc),
            standard_deviation: None,
            ..Default::default()
        }
    }

    #[fuchsia::test]
    async fn test_empty_harness() {
        let (_harness, client) = TestHarness::new();

        // With nothing scripted the algorithm reports an internal error.
        let response = client.sample(Urgency::Low).await;
        assert!(response.is_ok());
        assert_eq!(response.unwrap(), Err(ftexternal::Error::Internal),);
    }

    #[fuchsia::test]
    async fn test_harness_expects_sample_urgency() {
        let (mut harness, client) = TestHarness::new();
        harness.add_sample(Urgency::Low, Ok(time_sample(12, 34))).await;

        // Requesting with a different urgency than scripted is an error.
        let response = client.sample(Urgency::High).await;
        assert!(response.is_ok());
        assert_eq!(response.unwrap(), Err(ftexternal::Error::Internal),);
    }

    #[fuchsia::test]
    async fn test_multiple_samples() {
        let (mut harness, client) = TestHarness::new();
        harness.add_sample(Urgency::Low, Ok(time_sample(12, 34))).await;
        harness.add_sample(Urgency::High, Ok(time_sample(56, 78))).await;

        // Samples come back in the order they were queued.
        let first = client.sample(Urgency::Low).await.unwrap().unwrap();
        assert_eq!(first, time_sample(12, 34));
        let second = client.sample(Urgency::High).await.unwrap().unwrap();
        assert_eq!(second, time_sample(56, 78));
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn test_property_updates_sent_to_update_algorithm() {
        let (harness, client) = TestHarness::new();
        client.update_device_properties(&Properties::default()).unwrap();

        // The request has no reply to await, so run the executor until it
        // stalls before inspecting what the algorithm recorded.
        let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        let recorded = harness.get_device_properties().await;
        assert_eq!(recorded, vec![Properties::default()]);
    }
}