use crate::base::{HasSettingType, SettingInfo, SettingType};
use crate::handler::base::{Context, ControllerGenerateResult, Request};
use crate::message::base::Audience;
use crate::service::message::{MessageClient, Messenger, Signature};
use crate::service_context::ServiceContext;
use crate::storage::StorageInfo;
use crate::{payload_convert, trace, trace_guard};
use async_trait::async_trait;
use core::convert::TryFrom;
use fuchsia_async as fasync;
use futures::future::LocalBoxFuture;
use futures::lock::Mutex;
use settings_storage::storage_factory::StorageFactory as StorageFactoryTrait;
use std::borrow::Cow;
use std::marker::PhantomData;
use std::rc::Rc;
use thiserror::Error;
pub type ExitResult = Result<(), ControllerError>;
pub type SettingHandlerResult = Result<Option<SettingInfo>, ControllerError>;
pub type ControllerStateResult = Result<(), ControllerError>;
#[derive(Clone, Debug, PartialEq)]
pub enum Payload {
Command(Command),
Event(Event),
Result(SettingHandlerResult),
}
payload_convert!(Controller, Payload);
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
HandleRequest(Request),
ChangeState(State),
}
impl TryFrom<crate::handler::setting_handler::Payload> for Command {
type Error = &'static str;
fn try_from(value: crate::handler::setting_handler::Payload) -> Result<Self, Self::Error> {
match value {
crate::handler::setting_handler::Payload::Command(command) => Ok(command),
_ => Err("wrong payload type"),
}
}
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum State {
Startup,
Listen,
EndListen,
Teardown,
}
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
Changed(SettingInfo),
Exited(ExitResult),
StateChanged(State),
}
#[allow(dead_code)]
pub(crate) trait StorageFactory: StorageFactoryTrait {}
impl<T: StorageFactoryTrait> StorageFactory for T {}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum ControllerError {
#[error("Unimplemented Request:{1:?} for setting type: {0:?}")]
UnimplementedRequest(SettingType, Request),
#[error("Write failed. setting type: {0:?}")]
WriteFailure(SettingType),
#[error("Initialization failure: cause {0:?}")]
InitFailure(Cow<'static, str>),
#[error("Restoration of setting on controller startup failed: cause {0:?}")]
RestoreFailure(Cow<'static, str>),
#[error(
"Call to an external dependency {1:?} for setting type {0:?} failed. \
Request:{2:?}: Error:{3}"
)]
ExternalFailure(SettingType, Cow<'static, str>, Cow<'static, str>, Cow<'static, str>),
#[error("Invalid input argument for setting type: {0:?} argument:{1:?} value:{2:?}")]
InvalidArgument(SettingType, Cow<'static, str>, Cow<'static, str>),
#[error(
"Incompatible argument values passed: {setting_type:?} argument:{main_arg:?} cannot be \
combined with arguments:[{other_args:?}] with respective values:[{values:?}]. {reason:?}"
)]
IncompatibleArguments {
setting_type: SettingType,
main_arg: Cow<'static, str>,
other_args: Cow<'static, str>,
values: Cow<'static, str>,
reason: Cow<'static, str>,
},
#[error("Unhandled type: {0:?}")]
UnhandledType(SettingType),
#[error("Unexpected error: {0:?}")]
UnexpectedError(Cow<'static, str>),
#[error("Undeliverable Request:{1:?} for setting type: {0:?}")]
UndeliverableError(SettingType, Request),
#[error("Unsupported request for setting type: {0:?}")]
UnsupportedError(SettingType),
#[error("Delivery error for type: {0:?} received by: {1:?}")]
DeliveryError(SettingType, SettingType),
#[error("Irrecoverable error")]
IrrecoverableError,
#[error("Timeout occurred")]
TimeoutError,
#[error("Exit occurred")]
ExitError,
}
pub(crate) type BoxedController = Box<dyn controller::Handle>;
pub(crate) type BoxedControllerResult = Result<BoxedController, ControllerError>;
pub(crate) type GenerateController =
Box<dyn Fn(Rc<ClientImpl>) -> LocalBoxFuture<'static, BoxedControllerResult>>;
pub(crate) mod controller {
use super::*;
#[async_trait(?Send)]
#[cfg(test)]
pub(crate) trait Create: Sized {
async fn create(client: Rc<ClientImpl>) -> Result<Self, ControllerError>;
}
#[async_trait(?Send)]
pub(crate) trait Handle {
async fn handle(&self, request: Request) -> Option<SettingHandlerResult>;
async fn change_state(&mut self, _state: State) -> Option<ControllerStateResult> {
None
}
}
}
pub struct ClientImpl {
notify: Mutex<bool>,
messenger: Messenger,
notifier_signature: Signature,
service_context: Rc<ServiceContext>,
setting_type: SettingType,
}
impl ClientImpl {
fn new(context: &Context) -> Self {
Self {
messenger: context.messenger.clone(),
setting_type: context.setting_type,
notifier_signature: context.notifier_signature,
notify: Mutex::new(false),
service_context: Rc::clone(&context.environment.service_context),
}
}
#[cfg(test)]
pub fn for_test(
notify: Mutex<bool>,
messenger: Messenger,
notifier_signature: Signature,
service_context: Rc<ServiceContext>,
setting_type: SettingType,
) -> Self {
Self { notify, messenger, notifier_signature, service_context, setting_type }
}
async fn process_request(
setting_type: SettingType,
controller: &BoxedController,
request: Request,
) -> SettingHandlerResult {
let result = controller.handle(request.clone()).await;
match result {
Some(response_result) => response_result,
None => Err(ControllerError::UnimplementedRequest(setting_type, request)),
}
}
pub(crate) async fn create(
mut context: Context,
generate_controller: GenerateController,
) -> ControllerGenerateResult {
let client = Rc::new(Self::new(&context));
let mut controller = generate_controller(Rc::clone(&client)).await?;
fasync::Task::local(async move {
let _ = &context;
let id = fuchsia_trace::Id::new();
trace!(
id,
c"setting handler",
"setting_type" => format!("{:?}", client.setting_type).as_str()
);
while let Ok((payload, message_client)) = context.receptor.next_of::<Payload>().await {
let setting_type = client.setting_type;
match Command::try_from(payload).expect("should only receive commands") {
Command::HandleRequest(Request::Rebroadcast) => {
trace!(id, c"handle rebroadcast");
let controller_reply =
Self::process_request(setting_type, &controller, Request::Get).await;
if let Ok(Some(info)) = &controller_reply {
client.notify(Event::Changed(info.clone())).await;
}
reply(message_client, controller_reply);
}
Command::HandleRequest(request) => {
trace!(id, c"handle request");
reply(
message_client,
Self::process_request(setting_type, &controller, request.clone()).await,
);
}
Command::ChangeState(state) => {
trace!(
id,
c"change state",
"state" => format!("{state:?}").as_str()
);
match state {
State::Startup => {
if let Some(Err(e)) = controller.change_state(state).await {
log::error!(
"Failed startup phase for SettingType {:?} {}",
setting_type,
e
);
}
reply(message_client, Ok(None));
continue;
}
State::Listen => {
*client.notify.lock().await = true;
}
State::EndListen => {
*client.notify.lock().await = false;
}
State::Teardown => {
if let Some(Err(e)) = controller.change_state(state).await {
log::error!(
"Failed teardown phase for SettingType {:?} {}",
setting_type,
e
);
}
reply(message_client, Ok(None));
continue;
}
}
let _ = controller.change_state(state).await;
}
}
}
})
.detach();
Ok(())
}
pub(crate) fn get_service_context(&self) -> Rc<ServiceContext> {
Rc::clone(&self.service_context)
}
pub(crate) async fn notify(&self, event: Event) {
let notify = self.notify.lock().await;
if *notify {
let _ = self.messenger.message(
Payload::Event(event).into(),
Audience::Messenger(self.notifier_signature),
);
}
}
#[cfg(test)]
pub(crate) fn emit_state_event(&self, state: State) {
let event = Payload::Event(Event::StateChanged(state));
let _ = self.messenger.message(event.into(), Audience::EventSink);
}
}
pub(crate) trait IntoHandlerResult {
#[allow(clippy::result_large_err)] fn into_handler_result(self) -> SettingHandlerResult;
}
impl IntoHandlerResult for SettingInfo {
fn into_handler_result(self) -> SettingHandlerResult {
Ok(Some(self))
}
}
pub mod persist {
use super::{ClientImpl as BaseProxy, *};
use crate::message::base::MessageEvent;
use crate::{service, storage, trace};
use fuchsia_trace as ftrace;
use futures::StreamExt;
use settings_storage::device_storage::DeviceStorageConvertible;
use settings_storage::UpdateState;
pub trait Storage: DeviceStorageConvertible + Into<SettingInfo> {}
impl<T: DeviceStorageConvertible + Into<SettingInfo>> Storage for T {}
pub(crate) mod controller {
use super::*;
#[async_trait(?Send)]
pub(crate) trait Create: Sized {
async fn create(handler: ClientProxy) -> Result<Self, ControllerError>;
}
pub(crate) trait CreateWith: Sized {
type Data;
fn create_with(handler: ClientProxy, data: Self::Data)
-> Result<Self, ControllerError>;
}
#[async_trait(?Send)]
pub(crate) trait CreateWithAsync: Sized {
type Data;
async fn create_with(
handler: ClientProxy,
data: Self::Data,
) -> Result<Self, ControllerError>;
}
}
pub struct ClientProxy {
base: Rc<BaseProxy>,
setting_type: SettingType,
}
impl Clone for ClientProxy {
fn clone(&self) -> Self {
Self { base: Rc::clone(&self.base), setting_type: self.setting_type }
}
}
impl ClientProxy {
pub(crate) async fn new(base_proxy: Rc<BaseProxy>, setting_type: SettingType) -> Self {
Self { base: base_proxy, setting_type }
}
pub(crate) fn get_service_context(&self) -> Rc<ServiceContext> {
self.base.get_service_context()
}
pub(crate) async fn notify(&self, event: Event) {
self.base.notify(event).await;
}
pub(crate) async fn read_setting_info<T: HasSettingType>(
&self,
id: ftrace::Id,
) -> SettingInfo {
let guard = trace_guard!(
id,
c"read_setting_info send",
"setting_type" => format!("{:?}", T::SETTING_TYPE).as_str()
);
let mut receptor = self.base.messenger.message(
storage::Payload::Request(storage::StorageRequest::Read(
T::SETTING_TYPE.into(),
id,
))
.into(),
Audience::Address(service::Address::Storage),
);
drop(guard);
trace!(
id,
c"read_setting_info receive",
"setting_type" => format!("{:?}", T::SETTING_TYPE).as_str()
);
if let Ok((payload, _)) = receptor.next_of::<storage::Payload>().await {
if let storage::Payload::Response(storage::StorageResponse::Read(
StorageInfo::SettingInfo(setting_info),
)) = payload
{
return setting_info;
} else {
panic!("Incorrect response received from storage: {payload:?}");
}
}
panic!("Did not get a read response");
}
pub(crate) async fn read_setting<T: HasSettingType + TryFrom<SettingInfo>>(
&self,
id: ftrace::Id,
) -> T {
let setting_info = self.read_setting_info::<T>(id).await;
if let Ok(info) = setting_info.clone().try_into() {
info
} else {
panic!(
"Mismatching type during read. Expected {:?}, but got {:?}",
T::SETTING_TYPE,
setting_info
);
}
}
pub(crate) async fn write_setting(
&self,
setting_info: SettingInfo,
id: ftrace::Id,
) -> Result<UpdateState, ControllerError> {
let setting_type = (&setting_info).into();
let fst = format!("{setting_type:?}");
let guard = trace_guard!(
id,
c"write_setting send",
"setting_type" => fst.as_str()
);
let mut receptor = self.base.messenger.message(
storage::Payload::Request(storage::StorageRequest::Write(
setting_info.clone().into(),
id,
))
.into(),
Audience::Address(service::Address::Storage),
);
drop(guard);
trace!(
id,
c"write_setting receive",
"setting_type" => fst.as_str()
);
while let Some(response) = receptor.next().await {
if let MessageEvent::Message(
service::Payload::Storage(storage::Payload::Response(
storage::StorageResponse::Write(result),
)),
_,
) = response
{
if let Ok(UpdateState::Updated) = result {
trace!(
id,
c"write_setting notify",
"setting_type" => fst.as_str()
);
self.notify(Event::Changed(setting_info)).await;
}
return result.map_err(|e| {
log::error!("Failed to write setting: {:?}", e);
ControllerError::WriteFailure(setting_type)
});
}
}
panic!("Did not get a write response");
}
}
pub(crate) trait WriteResult: IntoHandlerResult {
fn notified(&self) -> bool;
}
impl WriteResult for Result<UpdateState, ControllerError> {
fn notified(&self) -> bool {
self.as_ref().map_or(false, |update_state| UpdateState::Updated == *update_state)
}
}
impl IntoHandlerResult for Result<UpdateState, ControllerError> {
fn into_handler_result(self) -> SettingHandlerResult {
self.map(|_| None)
}
}
pub(crate) struct Handler<C> {
_data: PhantomData<C>,
}
impl<C: controller::Create + super::controller::Handle + 'static> Handler<C> {
pub(crate) fn spawn(context: Context) -> LocalBoxFuture<'static, ControllerGenerateResult> {
Box::pin(async move {
let setting_type = context.setting_type;
ClientImpl::create(
context,
Box::new(move |proxy| {
Box::pin(async move {
let proxy = ClientProxy::new(proxy, setting_type).await;
let controller_result = C::create(proxy).await;
match controller_result {
Err(err) => Err(err),
Ok(controller) => Ok(Box::new(controller) as BoxedController),
}
})
}),
)
.await
})
}
}
impl<'a, C, O> Handler<C>
where
C: controller::CreateWith<Data = O> + super::controller::Handle + 'static,
O: Clone + 'static,
{
pub(crate) fn spawn_with(
context: Context,
data: O,
) -> LocalBoxFuture<'static, ControllerGenerateResult> {
Box::pin(async move {
let setting_type = context.setting_type;
ClientImpl::create(
context,
Box::new({
let data = data.clone();
move |proxy| {
let data = data.clone();
Box::pin(async move {
let proxy = ClientProxy::new(proxy, setting_type).await;
let controller_result = C::create_with(proxy, data);
match controller_result {
Err(err) => Err(err),
Ok(controller) => Ok(Box::new(controller) as BoxedController),
}
})
}
}),
)
.await
})
}
}
impl<'a, C, O> Handler<C>
where
C: controller::CreateWithAsync<Data = O> + super::controller::Handle + 'static,
O: Clone + 'static,
{
pub(crate) fn spawn_with_async(
context: Context,
data: O,
) -> LocalBoxFuture<'static, ControllerGenerateResult> {
Box::pin(async move {
let setting_type = context.setting_type;
ClientImpl::create(
context,
Box::new({
let data = data.clone();
move |proxy| {
let data = data.clone();
Box::pin(async move {
let proxy = ClientProxy::new(proxy, setting_type).await;
let controller_result = C::create_with(proxy, data).await;
match controller_result {
Err(err) => Err(err),
Ok(controller) => Ok(Box::new(controller) as BoxedController),
}
})
}
}),
)
.await
})
}
}
}
pub(crate) fn reply(client: MessageClient, result: SettingHandlerResult) {
let _ = client.reply(Payload::Result(result).into());
}