// settings/handler/setting_handler_factory_impl.rs
use crate::base::SettingType;
use crate::handler::base::{
Context, Environment, GenerateHandler, SettingHandlerFactory, SettingHandlerFactoryError,
};
use crate::handler::setting_handler::{Command, Payload, State};
use crate::message::base::{Audience, MessageEvent, MessengerType};
use crate::service;
use crate::service::message::{Delegate, Signature};
use crate::service_context::ServiceContext;
use async_trait::async_trait;
use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Factory that produces setting handlers from generator functions registered per
/// `SettingType`.
pub(crate) struct SettingHandlerFactoryImpl {
/// Environment whose `settings` set gates which setting types `generate` accepts; a clone
/// of it is passed to `Context::new` for every generated handler.
environment: Environment,
/// Generator functions keyed by the setting type they were registered for.
generators: HashMap<SettingType, GenerateHandler>,
/// Shared counter; `generate` passes its pre-increment value to `Context::new` and bumps
/// it by one each time a context is built.
context_id_counter: Rc<AtomicU64>,
}
#[async_trait(?Send)]
impl SettingHandlerFactory for SettingHandlerFactoryImpl {
    /// Generates the handler for `setting_type`, sends it a `State::Startup` command and waits
    /// for the reply before returning.
    ///
    /// On success the returned `Signature` is the one obtained from the receptor that was
    /// handed to the handler's `Context`.
    ///
    /// # Errors
    ///
    /// * `SettingNotFound` - `setting_type` is not in the environment's settings.
    /// * `HandlerMessengerError` - the messenger for the handler could not be created.
    /// * `GeneratorNotFound` - no generator is registered for `setting_type`.
    /// * `ControllerMessengerError` - the messenger used to send the startup command could not
    ///   be created.
    /// * `HandlerStartupError` - the generator failed, the startup reply carried an error, or
    ///   an unexpected (non-status) event was received instead of a result.
    ///
    /// # Panics
    ///
    /// Panics if the reply stream ends before any result or unexpected event is received.
    async fn generate(
        &mut self,
        setting_type: SettingType,
        delegate: Delegate,
        notifier_signature: Signature,
    ) -> Result<Signature, SettingHandlerFactoryError> {
        // Reject setting types the environment does not list.
        if !self.environment.settings.contains(&setting_type) {
            return Err(SettingHandlerFactoryError::SettingNotFound(setting_type));
        }

        // Messenger/receptor pair that is handed over to the handler through its context.
        let (handler_messenger, handler_receptor) =
            match delegate.create(MessengerType::Unbound).await {
                Ok(pair) => pair,
                Err(_) => return Err(SettingHandlerFactoryError::HandlerMessengerError),
            };
        let handler_signature = handler_receptor.get_signature();

        let generator = match self.generators.get(&setting_type) {
            Some(generator) => generator,
            None => return Err(SettingHandlerFactoryError::GeneratorNotFound(setting_type)),
        };

        // The counter is only advanced once a generator is known to exist.
        let context = Context::new(
            setting_type,
            handler_messenger,
            handler_receptor,
            notifier_signature,
            self.environment.clone(),
            self.context_id_counter.fetch_add(1, Ordering::Relaxed),
        );
        let generation_failure = generator(context).await.err();
        if let Some(e) = generation_failure {
            return Err(SettingHandlerFactoryError::HandlerStartupError(
                setting_type,
                format!("Failed to generate setting handler: {e:?}").into(),
            ));
        }

        // A second messenger is used solely to deliver the startup command; its own receptor
        // is discarded right away.
        let (controller_messenger, _) = match delegate.create(MessengerType::Unbound).await {
            Ok(pair) => pair,
            Err(_) => return Err(SettingHandlerFactoryError::ControllerMessengerError),
        };

        let startup_command = Payload::Command(Command::ChangeState(State::Startup));
        let mut replies = controller_messenger
            .message(startup_command.into(), Audience::Messenger(handler_signature));

        // Skip status events; the first other event decides the outcome.
        loop {
            match replies.next().await {
                Some(MessageEvent::Status(_)) => continue,
                Some(MessageEvent::Message(
                    service::Payload::Controller(Payload::Result(outcome)),
                    _,
                )) => {
                    return match outcome {
                        Ok(_) => Ok(handler_signature),
                        Err(e) => Err(SettingHandlerFactoryError::HandlerStartupError(
                            setting_type,
                            format!("Got bad startup response: {e:?}").into(),
                        )),
                    };
                }
                Some(e) => {
                    return Err(SettingHandlerFactoryError::HandlerStartupError(
                        setting_type,
                        format!("Unexpected startup response: {e:?}").into(),
                    ));
                }
                None => {
                    panic!("Did not get any responses from {setting_type:?} controller startup")
                }
            }
        }
    }
}
impl SettingHandlerFactoryImpl {
    /// Builds a factory that starts out with no generators registered.
    ///
    /// `settings` and `service_context` are used to construct the factory's `Environment`;
    /// `context_id_counter` is stored as given.
    pub(crate) fn new(
        settings: HashSet<SettingType>,
        service_context: Rc<ServiceContext>,
        context_id_counter: Rc<AtomicU64>,
    ) -> SettingHandlerFactoryImpl {
        let environment = Environment::new(settings, service_context);
        let generators = HashMap::new();
        Self { environment, generators, context_id_counter }
    }

    /// Associates `generate_function` with `setting_type`.
    ///
    /// Any generator previously registered for the same type is replaced and dropped.
    pub(crate) fn register(
        &mut self,
        setting_type: SettingType,
        generate_function: GenerateHandler,
    ) {
        // The displaced entry (if any) is intentionally discarded.
        drop(self.generators.insert(setting_type, generate_function));
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::handler::base::Request;
use crate::handler::setting_handler::controller::{Create, Handle};
use crate::handler::setting_handler::{
BoxedController, ClientImpl, ControllerError, ControllerStateResult, SettingHandlerResult,
};
use fuchsia_async as fasync;
use futures::future::FutureExt;
use futures::select;
/// Minimal controller that only keeps the client it was created with, so it can emit state
/// events from `change_state`.
struct TestController {
client: Rc<ClientImpl>,
}
#[async_trait(?Send)]
impl Create for TestController {
/// Always succeeds, wrapping the supplied client.
async fn create(client: Rc<ClientImpl>) -> Result<Self, ControllerError> {
Ok(Self { client })
}
}
#[async_trait(?Send)]
impl Handle for TestController {
/// Ignores every request by returning `None`.
async fn handle(&self, _request: Request) -> Option<SettingHandlerResult> {
None
}
/// Emits a state event for every state change (ignoring the emit result) and reports
/// success only for `State::Startup`; all other states return `None`.
async fn change_state(&mut self, state: State) -> Option<ControllerStateResult> {
let _ = self.client.emit_state_event(state);
match state {
State::Startup => Some(Ok(())),
_ => None,
}
}
}
/// Checks that `generate` completes only after an event has been observed on the event sink,
/// i.e. that handler startup is awaited before the signature is returned.
#[fasync::run_until_stalled(test)]
async fn ensure_startup_is_awaited() {
let delegate = service::MessageHub::create_hub();
let mut factory_impl = SettingHandlerFactoryImpl::new(
[SettingType::Unknown].into(),
Rc::new(ServiceContext::new(None, Some(delegate.clone()))),
Rc::new(AtomicU64::new(0)),
);
// Generator that builds a `ClientImpl` whose controller is a `TestController`.
let generate_handler: GenerateHandler = Box::new(move |context| {
Box::pin(ClientImpl::create(
context,
Box::new(move |proxy| {
Box::pin(async move {
Ok(Box::new(TestController::create(proxy).await.unwrap())
as BoxedController)
})
}),
))
});
factory_impl.register(SettingType::Unknown, generate_handler);
// Sink receptor on the same hub; presumably this is where the state event emitted by
// `TestController::change_state` arrives — TODO confirm against `create_sink`.
let (_, broker_receptor) =
delegate.create_sink().await.expect("could not create event sink receptor");
// Separate messenger whose signature is passed to `generate` as the notifier signature.
let (_, receptor) =
delegate.create(MessengerType::Unbound).await.expect("messenger should be created");
let mut generate_future = factory_impl
.generate(SettingType::Unknown, delegate, receptor.get_signature())
.into_stream()
.fuse();
let mut broker_receptor = broker_receptor.fuse();
// Each slot records the position (0 or 1) at which the corresponding event was seen.
let mut generate_done = None;
let mut startup_done = None;
let mut idx: u8 = 0;
// Drive both sources until each has produced one item (or both streams are exhausted).
while generate_done.is_none() || startup_done.is_none() {
select! {
maybe = broker_receptor.next() => {
let _ = maybe.expect("should have gotten a reply");
startup_done = Some(idx);
idx += 1;
}
result = generate_future.select_next_some() => {
let _ = result.expect("should have received a signature");
generate_done = Some(idx);
idx += 1;
}
complete => break,
}
}
// `generate` must have finished after the sink event was observed.
assert!(generate_done.unwrap() > startup_done.unwrap());
}
}