use crate::base::{SettingInfo, SettingType};
use crate::handler::base::{Error, Payload, Request};
use crate::job::data::{self, Data, Key};
use crate::job::work::{Error as WorkError, Load, Sequential};
use crate::job::{Job, Signature};
use crate::message::base::Audience;
use crate::message::receptor::Receptor;
use crate::service::{message, Address};
use crate::trace;
use async_trait::async_trait;
use fuchsia_trace as ftrace;
use futures::channel::oneshot;
use futures::FutureExt;
use std::collections::HashMap;
use std::marker::PhantomData;
const LAST_VALUE_KEY: &str = "LAST_VALUE";
pub(crate) struct ChangeFunction {
#[allow(clippy::type_complexity)]
function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
id: u64,
}
impl ChangeFunction {
#[allow(clippy::type_complexity)]
pub fn new(
id: u64,
function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
) -> ChangeFunction {
ChangeFunction { function, id }
}
}
pub trait Responder<R: From<SettingInfo>, E: From<Error>> {
fn respond(self, response: Result<R, E>);
}
pub struct Work<R: From<SettingInfo>, E: From<Error>, T: Responder<R, E>> {
setting_type: SettingType,
signature: Signature,
responder: T,
cancelation_rx: oneshot::Receiver<()>,
change_function: Option<ChangeFunction>,
_response_type: PhantomData<R>,
_error_type: PhantomData<E>,
}
impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
Work<R, E, T>
{
fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
where
T: 'static,
{
Self {
setting_type,
signature: Signature::new::<T>(),
responder,
cancelation_rx,
change_function: None,
_response_type: PhantomData,
_error_type: PhantomData,
}
}
pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
where
T: 'static,
{
let (cancelation_tx, cancelation_rx) = oneshot::channel();
let work = Self::new(setting_type, responder, cancelation_rx);
Job::from((work, cancelation_tx))
}
pub(crate) fn new_job_with_change_function(
setting_type: SettingType,
responder: T,
change_function: ChangeFunction,
) -> Job
where
T: 'static,
{
let (cancelation_tx, cancelation_rx) = oneshot::channel();
let work =
Self::with_change_function(setting_type, responder, cancelation_rx, change_function);
Job::from((work, cancelation_tx))
}
pub(crate) fn with_change_function(
setting_type: SettingType,
responder: T,
cancelation_rx: oneshot::Receiver<()>,
change_function: ChangeFunction,
) -> Self {
Self {
setting_type,
signature: Signature::with::<T>(change_function.id),
responder,
cancelation_rx,
change_function: Some(change_function),
_response_type: PhantomData,
_error_type: PhantomData,
}
}
async fn get_next(
&mut self,
receptor: &mut Receptor,
) -> Result<Result<Payload, anyhow::Error>, WorkError> {
let receptor = receptor.next_of::<Payload>().fuse();
let mut cancelation_rx = &mut self.cancelation_rx;
futures::pin_mut!(receptor);
futures::select! {
result = receptor => Ok(result.map(|(payload, _)| payload)),
_ = cancelation_rx => Err(WorkError::Canceled),
}
}
fn process_response(
&self,
response: Result<Payload, anyhow::Error>,
store: &mut HashMap<Key, Data>,
) -> Option<Result<SettingInfo, Error>> {
match response {
Ok(Payload::Response(Ok(Some(setting_info)))) => {
let key = Key::Identifier(LAST_VALUE_KEY);
let return_val = match (store.get(&key), self.change_function.as_ref()) {
(Some(Data::SettingInfo(info)), Some(change_function))
if !(change_function.function)(info, &setting_info) =>
{
None
}
(Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
_ => Some(Ok(setting_info)),
};
if let Some(Ok(ref info)) = return_val {
let _ = store.insert(key, Data::SettingInfo(info.clone()));
}
return_val
}
Ok(Payload::Response(Err(error))) => Some(Err(error)),
Err(error) => {
tracing::warn!(
"An error occurred while watching {:?}:{:?}",
self.setting_type,
error
);
Some(Err(match error.root_cause().downcast_ref::<Error>() {
Some(error) => error.clone(),
_ => crate::handler::base::Error::CommunicationError,
}))
}
_ => {
panic!("invalid variant {response:?}");
}
}
}
}
#[async_trait(?Send)]
impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
Sequential for Work<R, E, T>
{
async fn execute(
mut self: Box<Self>,
messenger: message::Messenger,
store_handle: data::StoreHandle,
id: ftrace::Id,
) -> Result<(), WorkError> {
trace!(id, c"Sequential Work execute");
let mut store = store_handle.lock().await;
let mut listen_receptor = messenger.message(
Payload::Request(Request::Listen).into(),
Audience::Address(Address::Handler(self.setting_type)),
);
let mut get_receptor = messenger.message(
Payload::Request(Request::Get).into(),
Audience::Address(Address::Handler(self.setting_type)),
);
trace!(id, c"Get first response");
let next_payload = self.get_next(&mut get_receptor).await?;
if let Some(response) = self.process_response(next_payload, &mut store) {
self.responder.respond(response.map(R::from).map_err(|err| {
tracing::error!("First watch response has an error: {:?}", err);
E::from(err)
}));
return Ok(());
}
loop {
trace!(id, c"Get looped response");
let next_payload = self.get_next(&mut listen_receptor).await?;
if let Some(response) = self.process_response(next_payload, &mut store) {
self.responder.respond(response.map(R::from).map_err(|err| {
tracing::error!("Updated watch response has an error: {:?}", err);
E::from(err)
}));
return Ok(());
}
}
}
}
impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
{
fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
let signature = work.signature;
Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::base::UnknownInfo;
use crate::message::base::MessengerType;
use crate::service::MessageHub;
use assert_matches::assert_matches;
use fuchsia_async as fasync;
use futures::channel::oneshot::Sender;
use futures::lock::Mutex;
use std::rc::Rc;
struct TestResponder {
sender: Sender<Result<SettingInfo, Error>>,
}
impl TestResponder {
pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
Self { sender }
}
}
impl Responder<SettingInfo, Error> for TestResponder {
fn respond(self, response: Result<SettingInfo, Error>) {
self.sender.send(response).expect("send should succeed");
}
}
#[fuchsia::test(allow_stalls = false)]
async fn test_watch_basic_functionality() {
let store_handle = Rc::new(Mutex::new(HashMap::new()));
let get_info = SettingInfo::Unknown(UnknownInfo(true));
let listen_info = SettingInfo::Unknown(UnknownInfo(false));
verify_watch(
store_handle.clone(),
listen_info.clone(),
get_info.clone(),
get_info.clone(),
None,
)
.await;
verify_watch(
store_handle.clone(),
listen_info.clone(),
get_info.clone(),
listen_info.clone(),
None,
)
.await;
}
async fn verify_watch(
store_handle: data::StoreHandle,
listen_info: SettingInfo,
get_info: SettingInfo,
expected_info: SettingInfo,
change_function: Option<ChangeFunction>,
) {
let message_hub_delegate = MessageHub::create_hub();
let mut handler_receiver = message_hub_delegate
.create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
.await
.expect("handler messenger should be created")
.1;
let (response_tx, response_rx) =
futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
let (_cancelation_tx, cancelation_rx) = oneshot::channel();
let work = match change_function {
None => Box::new(Work::new(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
)),
Some(change_function) => Box::new(Work::with_change_function(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
change_function,
)),
};
let work_messenger = message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0;
let work_messenger_signature = work_messenger.get_signature();
fasync::Task::local(async move {
let _ = work.execute(work_messenger, store_handle, 0.into()).await;
})
.detach();
let (listen_request, listen_client) = handler_receiver
.next_of::<Payload>()
.await
.expect("should successfully receive a listen request");
assert_matches!(listen_request, Payload::Request(Request::Listen));
assert!(listen_client.get_author() == work_messenger_signature);
let (get_request, get_client) = handler_receiver
.next_of::<Payload>()
.await
.expect("should successfully receive a get request");
assert_matches!(get_request, Payload::Request(Request::Get));
assert!(get_client.get_author() == work_messenger_signature);
let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into());
let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into());
assert_matches!(response_rx.await.expect("should receive successful response"),
Ok(x) if x == expected_info);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_custom_change_function() {
let store_handle = Rc::new(Mutex::new(HashMap::new()));
let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
let _ = store_handle
.lock()
.await
.insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
verify_watch(
store_handle,
unchanged_info.clone(),
unchanged_info.clone(),
unchanged_info,
Some(ChangeFunction::new(
0,
Box::new(move |_old: &SettingInfo, _new: &SettingInfo| true),
)),
)
.await;
}
#[fuchsia::test(allow_stalls = false)]
async fn test_error_propagation() {
let message_hub_delegate = MessageHub::create_hub();
let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
let work = Box::new(Work::new(
SettingType::Unknown,
TestResponder::new(response_tx),
cancelation_rx,
));
let work_messenger = message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0;
fasync::Task::local(async move {
let _ =
work.execute(work_messenger, Rc::new(Mutex::new(HashMap::new())), 0.into()).await;
})
.detach();
assert_matches!(response_rx.await.expect("should receive successful response"),
Err(x) if x == crate::handler::base::Error::CommunicationError);
}
}