use anyhow::Result;
use async_trait::async_trait;
use fidl::prelude::*;
use fidl_fuchsia_time_external::{
Properties, PushSourceRequest, PushSourceRequestStream, PushSourceWatchSampleResponder,
PushSourceWatchStatusResponder, Status, TimeSample,
};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::lock::Mutex;
use futures::{StreamExt, TryStreamExt};
use log::warn;
use std::sync::{Arc, Weak};
use watch_handler::{Sender as WatchSender, WatchHandler};
/// A single update produced by an [`UpdateAlgorithm`] for distribution to clients.
#[derive(Clone, PartialEq, Debug)]
pub enum Update {
/// A new time sample. Held in an `Arc` so one sample can be handed to several clients
/// without copying the sample itself.
Sample(Arc<TimeSample>),
/// A new status for the time source.
Status(Status),
}
impl From<Status> for Update {
fn from(status: Status) -> Self {
Update::Status(status)
}
}
impl From<TimeSample> for Update {
fn from(sample: TimeSample) -> Self {
Update::Sample(Arc::new(sample))
}
}
impl Update {
pub fn is_status(&self) -> bool {
match self {
Update::Sample(_) => false,
Update::Status(_) => true,
}
}
}
/// The source of updates served by a [`PushSource`], and the receiver of device property
/// changes sent by its clients.
#[async_trait]
pub trait UpdateAlgorithm {
/// Informs the algorithm of device properties received from a client.
async fn update_device_properties(&self, properties: Properties);
/// Produces updates, sending each one into `sink`.
///
/// `PushSource::poll_updates` forwards every update sent to `sink` to its clients and
/// returns the `Result` this method resolves to.
async fn generate_updates(&self, sink: Sender<Update>) -> Result<()>;
}
/// Serves `PushSource` request streams, handing clients the samples and statuses
/// generated by an [`UpdateAlgorithm`].
pub struct PushSource<UA: UpdateAlgorithm> {
/// Registered clients together with the most recent sample and status.
internal: Mutex<PushSourceInternal>,
/// Algorithm that generates updates and is told about device property changes.
update_algorithm: UA,
}
impl<UA: UpdateAlgorithm> PushSource<UA> {
    /// Creates a `PushSource` that serves updates from `update_algorithm` and reports
    /// `initial_status` until the algorithm pushes a different status.
    ///
    /// The current implementation always returns `Ok`.
    pub fn new(update_algorithm: UA, initial_status: Status) -> Result<Self> {
        let internal = Mutex::new(PushSourceInternal::new(initial_status));
        Ok(Self { internal, update_algorithm })
    }

    /// Runs the update algorithm and distributes every update it produces to the
    /// registered clients.
    ///
    /// Completes once both the algorithm's `generate_updates` future has finished and the
    /// channel it was given has been drained; returns the algorithm's result.
    pub async fn poll_updates(&self) -> Result<()> {
        let (update_sink, mut update_stream) = channel(0);
        let generate = self.update_algorithm.generate_updates(update_sink);
        let distribute = async move {
            while let Some(update) = update_stream.next().await {
                // The shared state stays locked for the whole fan-out of this update.
                let mut shared_state = self.internal.lock().await;
                shared_state.push_update(update).await;
            }
        };
        // Only the generator yields a meaningful result; the distributor returns `()`.
        futures::future::join(generate, distribute).await.0
    }

    /// Serves a single client connection until `request_stream` ends.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the stream fails or if handling a request fails.
    pub async fn handle_requests_for_stream(
        &self,
        mut request_stream: PushSourceRequestStream,
    ) -> Result<()> {
        log::debug!("handle_requests_for_stream: ");
        let handler = self.internal.lock().await.register_client();
        while let Some(request) = request_stream.try_next().await? {
            let mut locked_handler = handler.lock().await;
            locked_handler.handle_request(request, &self.update_algorithm).await?;
        }
        Ok(())
    }
}
/// State shared between all clients of a `PushSource`.
struct PushSourceInternal {
/// Weak references to the per-client handlers. Entries whose handler has been dropped
/// are removed when an update is pushed.
clients: Vec<Weak<Mutex<PushSourceClientHandler>>>,
/// Most recently pushed sample, or `None` if no sample has been pushed yet.
latest_sample: Option<Arc<TimeSample>>,
/// Most recently pushed status; starts as the status passed to `new`.
latest_status: Status,
}
impl PushSourceInternal {
    /// Creates the shared state with no clients, no sample, and `initial_status` as the
    /// current status.
    pub fn new(initial_status: Status) -> Self {
        PushSourceInternal { clients: vec![], latest_sample: None, latest_status: initial_status }
    }

    /// Registers a new client and returns its handler.
    ///
    /// The handler's watchers are seeded with the latest sample (if any) and the latest
    /// status. Only a weak reference is retained here, so the caller owns the handler.
    pub fn register_client(&mut self) -> Arc<Mutex<PushSourceClientHandler>> {
        log::debug!("PushSourceInternal: register_client");
        // Forget clients whose handler has already been dropped. Without this, dead
        // entries would only be removed by `push_update`, so the list would grow without
        // bound if clients connect and disconnect while no updates are produced.
        self.clients.retain(|client_weak| client_weak.strong_count() > 0);
        let client = Arc::new(Mutex::new(PushSourceClientHandler {
            sample_watcher: WatchHandler::create(self.latest_sample.clone()),
            status_watcher: WatchHandler::create(Some(self.latest_status)),
        }));
        self.clients.push(Arc::downgrade(&client));
        client
    }

    /// Records `update` as the latest sample or status and forwards it to every client
    /// whose handler is still alive, dropping the entries of clients that are gone.
    pub async fn push_update(&mut self, update: Update) {
        log::debug!("push_update: received update: {:?}", &update);
        match &update {
            Update::Sample(sample) => self.latest_sample = Some(Arc::clone(sample)),
            Update::Status(status) => self.latest_status = *status,
        }
        // Upgrade each weak reference once: live clients are collected for notification
        // and kept in the list, dead ones are removed from it.
        let mut client_arcs = vec![];
        self.clients.retain(|client_weak| match client_weak.upgrade() {
            Some(client_arc) => {
                client_arcs.push(client_arc);
                true
            }
            None => false,
        });
        log::debug!("push_update: clients to update: {}", client_arcs.len());
        for client in client_arcs {
            client.lock().await.handle_update(update.clone());
        }
    }
}
/// Per-client state: one watcher for samples and one for statuses.
struct PushSourceClientHandler {
/// Tracks the sample to report to this client and any outstanding `WatchSample` responder.
sample_watcher: WatchHandler<Arc<TimeSample>, WatchSampleResponder>,
/// Tracks the status to report to this client and any outstanding `WatchStatus` responder.
status_watcher: WatchHandler<Status, WatchStatusResponder>,
}
impl PushSourceClientHandler {
async fn handle_request(
&mut self,
request: PushSourceRequest,
update_algorithm: &impl UpdateAlgorithm,
) -> Result<()> {
match request {
PushSourceRequest::WatchSample { responder } => {
self.sample_watcher.watch(WatchSampleResponder(responder)).map_err(|e| {
e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
e
})?;
}
PushSourceRequest::WatchStatus { responder } => {
self.status_watcher.watch(WatchStatusResponder(responder)).map_err(|e| {
e.responder.0.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
e
})?;
}
PushSourceRequest::UpdateDeviceProperties { properties, .. } => {
update_algorithm.update_device_properties(properties).await;
}
}
Ok(())
}
fn handle_update(&mut self, update: Update) {
match update {
Update::Sample(sample) => self.sample_watcher.set_value(sample),
Update::Status(status) => self.status_watcher.set_value(status),
}
}
}
/// Newtype around the FIDL `WatchSample` responder so that `WatchSender` can be implemented for it.
struct WatchSampleResponder(PushSourceWatchSampleResponder);
/// Newtype around the FIDL `WatchStatus` responder so that `WatchSender` can be implemented for it.
struct WatchStatusResponder(PushSourceWatchStatusResponder);
impl WatchSender<Arc<TimeSample>> for WatchSampleResponder {
fn send_response(self, data: Arc<TimeSample>) {
let time_sample = TimeSample {
utc: data.utc.clone(),
reference: data.reference.clone(),
standard_deviation: data.standard_deviation.clone(),
..Default::default()
};
self.0.send(&time_sample).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
}
}
impl WatchSender<Status> for WatchStatusResponder {
fn send_response(self, data: Status) {
self.0.send(data).unwrap_or_else(|e| warn!("Error sending response: {:?}", e));
}
}
/// An [`UpdateAlgorithm`] for tests: it relays updates supplied through a channel and
/// records every device property update it is given.
pub struct TestUpdateAlgorithm {
/// Receiving end of the update channel; taken (left as `None`) by `generate_updates`.
receiver: Mutex<Option<Receiver<Update>>>,
/// Device properties received so far, in the order they arrived.
device_property_updates: Mutex<Vec<Properties>>,
}
impl TestUpdateAlgorithm {
    /// Creates a test algorithm together with the sender used to feed it updates.
    pub fn new() -> (Self, Sender<Update>) {
        let (sender, receiver) = channel(0);
        let algorithm = Self {
            receiver: Mutex::new(Some(receiver)),
            device_property_updates: Mutex::new(Vec::new()),
        };
        (algorithm, sender)
    }
}
#[async_trait]
impl UpdateAlgorithm for TestUpdateAlgorithm {
    /// Appends `properties` to the list of recorded device property updates.
    async fn update_device_properties(&self, properties: Properties) {
        let mut recorded = self.device_property_updates.lock().await;
        recorded.push(properties);
    }

    /// Forwards every update received on the test channel into `sink`.
    ///
    /// # Panics
    ///
    /// Panics if called more than once, because the receiver is taken by the first call.
    async fn generate_updates(&self, sink: Sender<Update>) -> Result<()> {
        // Release the lock before forwarding so it is not held while updates flow.
        let updates = {
            let mut slot = self.receiver.lock().await;
            slot.take().unwrap()
        };
        updates.map(Ok).forward(sink).await?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
use super::*;
use assert_matches::assert_matches;
use fidl::endpoints::create_proxy_and_stream;
use fidl::Error as FidlError;
use fidl_fuchsia_time_external::{PushSourceMarker, PushSourceProxy};
use fuchsia_async as fasync;
use futures::{FutureExt, SinkExt};
/// Test fixture: a `PushSource` driven by a `TestUpdateAlgorithm`, plus the tasks serving it.
struct TestHarness {
/// The source under test.
test_source: Arc<PushSource<TestUpdateAlgorithm>>,
/// The update-polling task and one server task per proxy; stored so the tasks stay owned
/// by the harness.
tasks: Vec<fasync::Task<Result<()>>>,
/// Sender used to inject updates into the `TestUpdateAlgorithm`.
update_sender: Sender<Update>,
}
impl TestHarness {
    /// Builds a source with an initial status of `Status::Ok` and spawns the task that
    /// polls it for updates.
    fn new() -> Self {
        let (update_algorithm, update_sender) = TestUpdateAlgorithm::new();
        let test_source = Arc::new(PushSource::new(update_algorithm, Status::Ok).unwrap());
        let poller = Arc::clone(&test_source);
        let tasks = vec![fasync::Task::spawn(async move { poller.poll_updates().await })];
        Self { test_source, tasks, update_sender }
    }

    /// Connects a new client: spawns a task serving its request stream and returns the proxy.
    fn new_proxy(&mut self) -> PushSourceProxy {
        let (proxy, stream) = create_proxy_and_stream::<PushSourceMarker>();
        let server = Arc::clone(&self.test_source);
        self.tasks.push(fasync::Task::spawn(async move {
            server.handle_requests_for_stream(stream).await
        }));
        proxy
    }

    /// Injects `update` through the test algorithm's channel; panics if the send fails.
    async fn push_update(&mut self, update: Update) {
        self.update_sender.send(update).await.unwrap();
    }

    /// Asserts that the device properties recorded by the test algorithm equal `properties`.
    async fn assert_device_properties(&self, properties: &[Properties]) {
        let recorded = self.test_source.update_algorithm.device_property_updates.lock().await;
        assert_eq!(recorded.as_slice(), properties);
    }
}
// Two WatchSample calls outstanding at once on one connection must close the channel
// with a BAD_STATE epitaph, failing both calls.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample_closes_on_multiple_watches() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// No sample has been pushed, so this first watch stays outstanding.
let first_watch_fut = proxy.watch_sample();
// The second watch arrives while the first is still outstanding.
assert_matches!(
proxy.watch_sample().await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
// The first watch observes the same channel closure.
assert_matches!(
first_watch_fut.await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
}
// Two WatchStatus calls outstanding at once on one connection must close the channel
// with a BAD_STATE epitaph, failing both calls.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status_closes_on_multiple_watches() {
let mut harness = TestHarness::new();
let proxy = harness.new_proxy();
// Consume the initial status first so that the following watches have nothing to return.
assert_eq!(proxy.watch_status().await.unwrap(), Status::Ok);
let second_watch_fut = proxy.watch_status();
// This watch arrives while the previous one is still outstanding.
assert_matches!(
proxy.watch_status().await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
// The earlier outstanding watch observes the same channel closure.
assert_matches!(
second_watch_fut.await.unwrap_err(),
FidlError::ClientChannelClosed { status: zx::Status::BAD_STATE, .. }
);
}
// A single client receives each pushed sample exactly once; a further watch stays
// pending until another sample is pushed.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample() {
    // Builds a sample with the given reference time (in nanoseconds) and UTC value.
    let sample = |reference_nanos, utc| TimeSample {
        reference: Some(zx::BootInstant::from_nanos(reference_nanos)),
        utc: Some(utc),
        standard_deviation: None,
        ..Default::default()
    };

    let mut harness = TestHarness::new();
    let proxy = harness.new_proxy();

    let pending_watch = proxy.watch_sample();
    harness.push_update(Update::Sample(Arc::new(sample(23, 24)))).await;
    assert_eq!(pending_watch.await.unwrap(), sample(23, 24));

    let pending_watch = proxy.watch_sample();
    harness.push_update(Update::Sample(Arc::new(sample(25, 26)))).await;
    assert_eq!(pending_watch.await.unwrap(), sample(25, 26));

    // Nothing new has been pushed, so another watch must not resolve immediately.
    assert!(proxy.watch_sample().now_or_never().is_none());
}
// Every connected client receives each pushed sample, and a client that connects later
// immediately receives the most recent one.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_sample_sent_to_all_clients() {
    // Builds a sample with the given reference time (in nanoseconds) and UTC value.
    let sample = |reference_nanos, utc| TimeSample {
        reference: Some(zx::BootInstant::from_nanos(reference_nanos)),
        utc: Some(utc),
        standard_deviation: None,
        ..Default::default()
    };

    let mut harness = TestHarness::new();
    let first_client = harness.new_proxy();
    let second_client = harness.new_proxy();

    let first_pending = first_client.watch_sample();
    let second_pending = second_client.watch_sample();
    harness.push_update(Update::Sample(Arc::new(sample(23, 24)))).await;
    assert_eq!(first_pending.await.unwrap(), sample(23, 24));
    assert_eq!(second_pending.await.unwrap(), sample(23, 24));

    let first_pending = first_client.watch_sample();
    let second_pending = second_client.watch_sample();
    harness.push_update(Update::Sample(Arc::new(sample(25, 26)))).await;
    assert_eq!(first_pending.await.unwrap(), sample(25, 26));
    assert_eq!(second_pending.await.unwrap(), sample(25, 26));

    // A client connecting after the updates sees the latest sample right away.
    let late_client = harness.new_proxy();
    assert_eq!(late_client.watch_sample().await.unwrap(), sample(25, 26));
}
// A client first receives the initial status, then each pushed status once; a further
// watch stays pending until the status is pushed again.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status() {
    let mut harness = TestHarness::new();
    let client = harness.new_proxy();

    // The initial status is available without any update being pushed.
    assert_eq!(client.watch_status().await.unwrap(), Status::Ok);

    let pending_watch = client.watch_status();
    harness.push_update(Update::from(Status::Hardware)).await;
    assert_eq!(pending_watch.await.unwrap(), Status::Hardware);

    // Nothing new has been pushed, so another watch must not resolve immediately.
    assert!(client.watch_status().now_or_never().is_none());
}
// Every connected client receives a pushed status, and a client that connects later
// immediately receives the most recent one.
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_status_sent_to_all_clients() {
    let mut harness = TestHarness::new();
    let first_client = harness.new_proxy();
    let second_client = harness.new_proxy();

    // Each client gets the initial status on its first watch.
    assert_eq!(first_client.watch_status().await.unwrap(), Status::Ok);
    assert_eq!(second_client.watch_status().await.unwrap(), Status::Ok);

    let first_pending = first_client.watch_status();
    let second_pending = second_client.watch_status();
    harness.push_update(Update::from(Status::Hardware)).await;
    assert_eq!(first_pending.await.unwrap(), Status::Hardware);
    assert_eq!(second_pending.await.unwrap(), Status::Hardware);

    // A client connecting after the update sees the latest status right away.
    let late_client = harness.new_proxy();
    assert_eq!(late_client.watch_status().await.unwrap(), Status::Hardware);
}
// Device property updates sent by any client are passed on to the update algorithm.
#[fuchsia::test(allow_stalls = false)]
async fn test_property_updates_sent_to_update_algorithm() {
    let mut harness = TestHarness::new();
    let proxy = harness.new_proxy();
    let proxy_2 = harness.new_proxy();
    proxy.update_device_properties(&Properties::default()).unwrap();
    proxy_2.update_device_properties(&Properties::default()).unwrap();
    // The calls above are not awaited, so run the executor until it stalls to let the
    // server tasks process both requests before checking what was recorded.
    let _ = fasync::TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
    // An array literal coerces to the expected slice without allocating a `Vec`.
    harness.assert_device_properties(&[Properties::default(), Properties::default()]).await;
}
}