// settings/agent/camera_watcher.rs
use crate::agent::{
AgentError, Context as AgentContext, Invocation, InvocationResult, Lifespan, Payload,
};
use crate::base::SettingType;
use crate::event::{camera_watcher, Event, Publisher};
use crate::handler::base::{Payload as HandlerPayload, Request};
use crate::input::common::connect_to_camera;
use crate::message::base::Audience;
use crate::service_context::ServiceContext;
use crate::{service, trace, trace_guard};
use fuchsia_async as fasync;
use std::collections::HashSet;
use std::rc::Rc;
/// Returns the setting types that are interested in camera watcher events.
///
/// Currently this is only [`SettingType::Input`].
fn get_event_setting_types() -> HashSet<SettingType> {
    HashSet::from([SettingType::Input])
}
/// Agent that watches the camera device's mute state and relays software-mute
/// changes to the rest of the service.
///
/// Its fields are cloned into an `EventHandler` when the agent handles a
/// `Lifespan::Service` invocation.
pub(crate) struct CameraWatcherAgent {
/// Publisher used to send camera update events.
publisher: Publisher,
/// Messenger used to send requests to setting handlers.
messenger: service::message::Messenger,
/// Setting types whose handlers are notified of software-mute changes; computed in
/// `create` as the intersection of the available components and
/// `get_event_setting_types()`.
recipient_settings: HashSet<SettingType>,
}
impl CameraWatcherAgent {
pub(crate) async fn create(context: AgentContext) {
let mut agent = CameraWatcherAgent {
publisher: context.get_publisher(),
messenger: context
.create_messenger()
.await
.expect("messenger should be created for CameraWatchAgent"),
recipient_settings: context
.available_components
.intersection(&get_event_setting_types())
.cloned()
.collect::<HashSet<SettingType>>(),
};
let mut receptor = context.receptor;
fasync::Task::local(async move {
let id = fuchsia_trace::Id::new();
let guard = trace_guard!(id, c"camera watcher agent");
while let Ok((payload, client)) = receptor.next_of::<Payload>().await {
trace!(id, c"payload");
if let Payload::Invocation(invocation) = payload {
let _ = client.reply(Payload::Complete(agent.handle(invocation).await).into());
}
}
drop(guard);
tracing::info!("Camera watcher agent done processing requests");
})
.detach()
}
async fn handle(&mut self, invocation: Invocation) -> InvocationResult {
match invocation.lifespan {
Lifespan::Initialization => Err(AgentError::UnhandledLifespan),
Lifespan::Service => self.handle_service_lifespan(invocation.service_context).await,
}
}
async fn handle_service_lifespan(
&mut self,
service_context: Rc<ServiceContext>,
) -> InvocationResult {
match connect_to_camera(service_context).await {
Ok(camera_device_client) => {
let mut event_handler = EventHandler {
publisher: self.publisher.clone(),
messenger: self.messenger.clone(),
recipient_settings: self.recipient_settings.clone(),
sw_muted: false,
};
fasync::Task::local(async move {
let id = fuchsia_trace::Id::new();
trace!(id, c"camera_watcher_agent_handler");
while let Ok((sw_muted, _hw_muted)) =
camera_device_client.watch_mute_state().await
{
trace!(id, c"event");
event_handler.handle_event(sw_muted);
}
})
.detach();
Ok(())
}
Err(e) => {
tracing::error!("Unable to watch camera device: {:?}", e);
Err(AgentError::UnexpectedError)
}
}
}
}
/// Tracks the last observed camera software mute state and notifies listeners
/// when it changes.
struct EventHandler {
/// Publisher used to send `Event::CameraUpdate` events.
publisher: Publisher,
/// Messenger used to send `Request::OnCameraSWState` requests to setting handlers.
messenger: service::message::Messenger,
/// Setting types whose handlers receive the software mute request.
recipient_settings: HashSet<SettingType>,
/// Last software mute state passed to `handle_event`; used to suppress duplicate
/// notifications.
sw_muted: bool,
}
impl EventHandler {
    /// Records `sw_muted` and broadcasts it, but only when it differs from the
    /// previously recorded state.
    fn handle_event(&mut self, sw_muted: bool) {
        if self.sw_muted == sw_muted {
            // Nothing changed, so there is nothing to report.
            return;
        }
        self.sw_muted = sw_muted;
        self.send_event(sw_muted);
    }

    /// Publishes a camera update event carrying `muted` and sends a matching
    /// `Request::OnCameraSWState` to the handler of every recipient setting.
    fn send_event(&self, muted: bool) {
        let camera_event = camera_watcher::Event::OnSWMuteState(muted);
        self.publisher.send_event(Event::CameraUpdate(camera_event));

        let setting_request: Request = Request::OnCameraSWState(muted);
        for &setting_type in &self.recipient_settings {
            let payload = HandlerPayload::Request(setting_request.clone());
            let audience = Audience::Address(service::Address::Handler(setting_type));
            // The result of sending is deliberately discarded.
            let _ = self.messenger.message(payload.into(), audience);
        }
    }
}
// Unit tests for `CameraWatcherAgent` and `EventHandler`.
#[cfg(test)]
mod tests {
use super::*;
use crate::event;
use crate::message::base::{MessageEvent, MessengerType};
use crate::message::receptor::Receptor;
use crate::tests::fakes::service_registry::ServiceRegistry;
use crate::tests::helpers::{
create_messenger_and_publisher, create_messenger_and_publisher_from_hub,
create_receptor_for_setting_type,
};
use assert_matches::assert_matches;
use futures::StreamExt;
// Verifies that an invocation with `Lifespan::Initialization` is rejected with
// `AgentError::UnhandledLifespan`.
#[fuchsia::test(allow_stalls = false)]
async fn initialization_lifespan_is_unhandled() {
let (messenger, publisher) = create_messenger_and_publisher().await;
let mut agent =
CameraWatcherAgent { publisher, messenger, recipient_settings: HashSet::new() };
let result = agent
.handle(Invocation {
lifespan: Lifespan::Initialization,
service_context: Rc::new(ServiceContext::new(None, None)),
})
.await;
assert!(matches!(result, Err(AgentError::UnhandledLifespan)));
}
// Verifies that a `Lifespan::Service` invocation returns
// `AgentError::UnexpectedError` when the camera connection fails.
// NOTE(review): the freshly created `ServiceRegistry` presumably has no camera
// service registered, which is what makes the connection fail -- confirm.
#[fuchsia::test(allow_stalls = false)]
async fn when_camera3_inaccessible_returns_err() {
let (messenger, publisher) = create_messenger_and_publisher().await;
let mut agent =
CameraWatcherAgent { publisher, messenger, recipient_settings: HashSet::new() };
let service_context = Rc::new(ServiceContext::new(
Some(ServiceRegistry::serve(ServiceRegistry::create())),
None,
));
let result =
agent.handle(Invocation { lifespan: Lifespan::Service, service_context }).await;
assert!(matches!(result, Err(AgentError::UnexpectedError)));
}
// Verifies that a software mute change is both published as a camera update
// event and sent as an `OnCameraSWState` request to the recipient setting handler.
#[fuchsia::test(allow_stalls = false)]
async fn event_handler_proxies_event() {
let service_message_hub = service::MessageHub::create_hub();
let (messenger, publisher) =
create_messenger_and_publisher_from_hub(&service_message_hub).await;
let event_receptor = service::build_event_listener(&service_message_hub).await;
let handler_receptor: Receptor =
create_receptor_for_setting_type(&service_message_hub, SettingType::Unknown).await;
let mut event_handler = EventHandler {
publisher,
messenger,
recipient_settings: vec![SettingType::Unknown].into_iter().collect(),
sw_muted: false,
};
// The state changes from the initial `false` to `true`, so notifications are sent.
event_handler.handle_event(true);
// NOTE(review): deleting the receptors from the hub presumably lets their
// streams finish once the already-sent messages are drained, so the select
// loop below can reach `complete` -- confirm against the message hub.
service_message_hub.delete(handler_receptor.get_signature());
service_message_hub.delete(event_receptor.get_signature());
let mut agent_received_sw_mute = false;
let mut handler_received_event = false;
let fused_event = event_receptor.fuse();
let fused_setting_handler = handler_receptor.fuse();
futures::pin_mut!(fused_event, fused_setting_handler);
// Drain both receptors until both streams are exhausted, recording which
// expected messages were seen.
loop {
futures::select! {
message = fused_event.select_next_some() => {
if let MessageEvent::Message(service::Payload::Event(event::Payload::Event(
event::Event::CameraUpdate(event)
)), _) = message
{
match event {
event::camera_watcher::Event::OnSWMuteState(muted) => {
assert!(muted);
agent_received_sw_mute = true;
}
}
}
},
message = fused_setting_handler.select_next_some() => {
// Only the arrival of the request is checked here, not its mute value.
if let MessageEvent::Message(
service::Payload::Setting(HandlerPayload::Request(
Request::OnCameraSWState(_muted))),
_,
) = message
{
handler_received_event = true;
}
}
complete => break,
}
}
assert!(agent_received_sw_mute);
assert!(handler_received_event);
}
// Verifies that no handler request is sent when `recipient_settings` is empty:
// the first `HandlerPayload` seen by the handler receptor must be the
// verification request sent after `handle_event`, not an `OnCameraSWState` request.
#[fuchsia::test(allow_stalls = false)]
async fn event_handler_sends_no_events_if_no_settings_available() {
let service_message_hub = service::MessageHub::create_hub();
let (messenger, publisher) =
create_messenger_and_publisher_from_hub(&service_message_hub).await;
let handler_address = service::Address::Handler(SettingType::Unknown);
let verification_request = Request::Get;
let mut handler_receptor: Receptor = service_message_hub
.create(MessengerType::Addressable(handler_address))
.await
.expect("Unable to create handler receptor")
.1;
let mut event_handler = EventHandler {
publisher,
messenger,
recipient_settings: HashSet::new(),
sw_muted: false,
};
event_handler.handle_event(true);
// Send a known request afterwards; it serves as a marker that should be the
// first request the handler observes.
let _ = service_message_hub
.create(MessengerType::Unbound)
.await
.expect("Unable to create messenger")
.0
.message(
HandlerPayload::Request(verification_request.clone()).into(),
Audience::Address(handler_address),
);
service_message_hub.delete(handler_receptor.get_signature());
assert_matches!(
handler_receptor.next_of::<HandlerPayload>().await,
Ok((HandlerPayload::Request(request), _))
if request == verification_request
)
}
}