use {
crate::startup,
anyhow::{Context as _, Error},
fidl::endpoints::ProtocolMarker,
fidl_fuchsia_component as fcomponent, fidl_fuchsia_element as felement,
fidl_fuchsia_session::{
LaunchConfiguration, LaunchError, LauncherRequest, LauncherRequestStream, RestartError,
RestarterRequest, RestarterRequestStream,
},
fuchsia_component::server::{ServiceFs, ServiceObjLocal},
fuchsia_inspect_contrib::nodes::BoundedListNode,
fuchsia_zircon as zx,
futures::{lock::Mutex, StreamExt, TryFutureExt, TryStreamExt},
std::{future::Future, sync::Arc},
tracing::error,
};
/// Concurrency limit handed to `for_each_concurrent` when serving incoming connections.
const MAX_CONCURRENT_CONNECTIONS: usize = 10_000;
// NOTE(review): the two `DIANGNOSTICS_` constants below misspell "DIAGNOSTICS". They are
// referenced under this spelling elsewhere in the file, so any rename must update those uses too.
/// Name of the Inspect child node that holds the session start records.
const DIANGNOSTICS_SESSION_STARTED_AT_NAME: &str = "session_started_at";
/// Size argument passed to `BoundedListNode::new` for the session start records
/// (presumably the maximum number of retained entries -- confirm against `BoundedListNode`).
const DIANGNOSTICS_SESSION_STARTED_AT_SIZE: usize = 100;
/// Name of the integer property recorded in each session start entry.
const DIAGNOSTICS_TIME_PROPERTY_NAME: &str = "@time";
/// A new connection to one of the protocols that `SessionManager::serve` publishes under `svc`.
///
/// Each variant wraps the request stream of the corresponding protocol.
pub enum IncomingRequest {
/// Connection to `fuchsia.element.Manager`; forwarded to the session.
Manager(felement::ManagerRequestStream),
/// Connection to `fuchsia.element.GraphicalPresenter`; forwarded to the session.
GraphicalPresenter(felement::GraphicalPresenterRequestStream),
/// Connection to `fuchsia.session.Launcher`; handled by `SessionManager` itself.
Launcher(LauncherRequestStream),
/// Connection to `fuchsia.session.Restarter`; handled by `SessionManager` itself.
Restarter(RestarterRequestStream),
}
/// Inspect data published by the session manager.
struct Diagnostics {
/// List node that receives one entry each time `record_session_start` is called.
session_started_at: BoundedListNode,
}
impl Diagnostics {
    /// Adds an entry to `session_started_at` that stores the current monotonic time, in
    /// nanoseconds, under the `DIAGNOSTICS_TIME_PROPERTY_NAME` property.
    pub fn record_session_start(&mut self) {
        // The entry is created before the clock is read, so the timestamp is taken last.
        let entry = self.session_started_at.create_entry();
        let now_nanos = zx::Time::get_monotonic().into_nanos();
        entry.record_int(DIAGNOSTICS_TIME_PROPERTY_NAME, now_nanos);
    }
}
/// Mutable state shared by every clone of `SessionManager`.
struct SessionManagerState {
/// URL of the most recently launched session, or `None` if no launch has succeeded yet.
session_url: Option<String>,
/// Channel returned by `startup::launch_session` for the most recently launched session.
/// Protocol connections forwarded to the session are opened relative to this channel.
session_exposed_dir_channel: Option<zx::Channel>,
/// Realm proxy handed to `startup::launch_session` when launching a session.
realm: fcomponent::RealmProxy,
/// Inspect bookkeeping, updated after every successful session launch.
diagnostics: Diagnostics,
}
/// Launches sessions and serves the session manager's protocols.
///
/// Cloning yields another handle to the same underlying state, since the state lives behind an
/// `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct SessionManager {
/// Shared, lock-protected state.
state: Arc<Mutex<SessionManagerState>>,
}
impl SessionManager {
pub fn new(realm: fcomponent::RealmProxy, inspector: &fuchsia_inspect::Inspector) -> Self {
let session_started_at = BoundedListNode::new(
inspector.root().create_child(DIANGNOSTICS_SESSION_STARTED_AT_NAME),
DIANGNOSTICS_SESSION_STARTED_AT_SIZE,
);
let diagnostics = Diagnostics { session_started_at };
let state = SessionManagerState {
session_url: None,
session_exposed_dir_channel: None,
realm,
diagnostics,
};
SessionManager { state: Arc::new(Mutex::new(state)) }
}
/// Launches the session at `session_url` and records it as the current session.
///
/// The state lock is held for the whole launch. On success the exposed-directory channel and
/// the URL are stored and a session start is recorded in diagnostics.
///
/// # Errors
/// Returns the error from `startup::launch_session`; the stored state is left untouched.
pub async fn launch_startup_session(&mut self, session_url: String) -> Result<(), Error> {
    let mut state = self.state.lock().await;
    let exposed_dir = startup::launch_session(&session_url, &state.realm).await?;
    state.session_exposed_dir_channel = Some(exposed_dir);
    state.session_url = Some(session_url);
    state.diagnostics.record_session_start();
    Ok(())
}
pub async fn serve(
&mut self,
fs: &mut ServiceFs<ServiceObjLocal<'_, IncomingRequest>>,
) -> Result<(), Error> {
fs.dir("svc")
.add_fidl_service(IncomingRequest::Manager)
.add_fidl_service(IncomingRequest::GraphicalPresenter)
.add_fidl_service(IncomingRequest::Launcher)
.add_fidl_service(IncomingRequest::Restarter);
fs.take_and_serve_directory_handle()?;
fs.for_each_concurrent(MAX_CONCURRENT_CONNECTIONS, |request| {
let mut session_manager = self.clone();
async move {
session_manager
.handle_incoming_request(request)
.unwrap_or_else(|err| error!(?err))
.await
}
})
.await;
Ok(())
}
/// Connects to the session's implementation of `RS::Protocol` and runs `handler` with the
/// incoming `request_stream` and the resulting proxy.
///
/// The state lock is held only while the connection is opened through the session's exposed
/// directory channel; it is released before `handler` runs.
///
/// # Errors
/// Fails if the proxy cannot be created, if no session has been started, if the connection
/// request fails, or if `handler` returns an error.
async fn forward_request_to_session<RS, F, Fut>(
    &mut self,
    request_stream: RS,
    handler: F,
) -> Result<(), Error>
where
    RS: fidl::endpoints::RequestStream,
    RS::Protocol: ProtocolMarker,
    F: Fn(RS, <RS::Protocol as ProtocolMarker>::Proxy) -> Fut,
    Fut: Future<Output = Result<(), Error>>,
{
    let name = <RS::Protocol as ProtocolMarker>::DEBUG_NAME;
    let (session_proxy, server_end) = fidl::endpoints::create_proxy::<RS::Protocol>()
        .with_context(|| format!("Failed to create_proxy for {}", name))?;

    {
        // Scope the guard so the lock is released before the handler is awaited.
        let guard = self.state.lock().await;
        let exposed_dir = guard.session_exposed_dir_channel.as_ref().with_context(|| {
            format!("Failed to connect to {} because no session was started", name)
        })?;
        let connected = fdio::service_connect_at(exposed_dir, name, server_end.into_channel());
        connected.with_context(|| format!("Failed to connect to {}", name))?;
    }

    let served = handler(request_stream, session_proxy).await;
    served.with_context(|| format!("{} request stream got an error", name))
}
/// Dispatches one incoming connection to the code that serves its protocol.
///
/// `Manager` and `GraphicalPresenter` connections are forwarded to the session; `Launcher`
/// and `Restarter` connections are served by this `SessionManager`.
///
/// # Errors
/// Returns whatever error the selected handler produced, with added context for the
/// `Launcher` and `Restarter` cases.
async fn handle_incoming_request(&mut self, request: IncomingRequest) -> Result<(), Error> {
    match request {
        IncomingRequest::Manager(stream) => {
            self.forward_request_to_session(
                stream,
                SessionManager::handle_manager_request_stream,
            )
            .await
        }
        IncomingRequest::GraphicalPresenter(stream) => {
            self.forward_request_to_session(
                stream,
                SessionManager::handle_graphical_presenter_request_stream,
            )
            .await
        }
        IncomingRequest::Launcher(stream) => self
            .handle_launcher_request_stream(stream)
            .await
            .context("Session Launcher request stream got an error."),
        IncomingRequest::Restarter(stream) => self
            .handle_restarter_request_stream(stream)
            .await
            .context("Session Restarter request stream got an error."),
    }
}
/// Relays every `ProposeElement` request from `request_stream` to `manager_proxy` and sends
/// the proxy's answer back to the original caller.
///
/// Returns `Ok(())` once the request stream ends.
///
/// # Errors
/// Fails if reading from the stream, calling the proxy, or sending the response fails.
pub async fn handle_manager_request_stream(
    mut request_stream: felement::ManagerRequestStream,
    manager_proxy: felement::ManagerProxy,
) -> Result<(), Error> {
    loop {
        let next_request =
            request_stream.try_next().await.context("Error handling Manager request stream")?;
        match next_request {
            Some(felement::ManagerRequest::ProposeElement { spec, controller, responder }) => {
                let proposal_result = manager_proxy.propose_element(spec, controller).await?;
                responder.send(proposal_result)?;
            }
            None => return Ok(()),
        }
    }
}
/// Relays every `PresentView` request from `request_stream` to `graphical_presenter_proxy`
/// and sends the proxy's answer back to the original caller.
///
/// Returns `Ok(())` once the request stream ends.
///
/// # Errors
/// Fails if reading from the stream, calling the proxy, or sending the response fails.
pub async fn handle_graphical_presenter_request_stream(
    mut request_stream: felement::GraphicalPresenterRequestStream,
    graphical_presenter_proxy: felement::GraphicalPresenterProxy,
) -> Result<(), Error> {
    loop {
        let next_request = request_stream
            .try_next()
            .await
            .context("Error handling Graphical Presenter request stream")?;
        match next_request {
            Some(felement::GraphicalPresenterRequest::PresentView {
                view_spec,
                annotation_controller,
                view_controller_request,
                responder,
            }) => {
                let present_result = graphical_presenter_proxy
                    .present_view(view_spec, annotation_controller, view_controller_request)
                    .await?;
                responder.send(present_result)?;
            }
            None => return Ok(()),
        }
    }
}
/// Serves a `Launcher` connection: each `Launch` request is run through
/// `handle_launch_request` and its outcome is sent back to the caller.
///
/// A failure to send the response is deliberately ignored. Returns `Ok(())` once the request
/// stream ends.
///
/// # Errors
/// Fails if reading the next request from the stream fails.
pub async fn handle_launcher_request_stream(
    &mut self,
    mut request_stream: LauncherRequestStream,
) -> Result<(), Error> {
    loop {
        let next_request =
            request_stream.try_next().await.context("Error handling Launcher request stream")?;
        match next_request {
            Some(LauncherRequest::Launch { configuration, responder }) => {
                let launch_result = self.handle_launch_request(configuration).await;
                let _ = responder.send(launch_result);
            }
            None => return Ok(()),
        }
    }
}
/// Serves a `Restarter` connection: each `Restart` request is run through
/// `handle_restart_request` and its outcome is sent back to the caller.
///
/// A failure to send the response is deliberately ignored. Returns `Ok(())` once the request
/// stream ends.
///
/// # Errors
/// Fails if reading the next request from the stream fails.
pub async fn handle_restarter_request_stream(
    &mut self,
    mut request_stream: RestarterRequestStream,
) -> Result<(), Error> {
    loop {
        let next_request =
            request_stream.try_next().await.context("Error handling Restarter request stream")?;
        match next_request {
            Some(RestarterRequest::Restart { responder }) => {
                let restart_result = self.handle_restart_request().await;
                let _ = responder.send(restart_result);
            }
            None => return Ok(()),
        }
    }
}
/// Launches the session named by `configuration.session_url`.
///
/// The state lock is held for the whole launch. On success the URL and exposed-directory
/// channel are stored and a session start is recorded in diagnostics; on failure the stored
/// state is left as it was.
///
/// # Errors
/// - `LaunchError::NotFound` if the configuration has no URL, or the component could not be
///   resolved while being created.
/// - `LaunchError::DestroyComponentFailed` if `startup::launch_session` reports `NotDestroyed`.
/// - `LaunchError::CreateComponentFailed` for every other startup failure.
async fn handle_launch_request(
    &mut self,
    configuration: LaunchConfiguration,
) -> Result<(), LaunchError> {
    let session_url = match configuration.session_url {
        Some(url) => url,
        None => return Err(LaunchError::NotFound),
    };

    let mut state = self.state.lock().await;
    let launch_result = startup::launch_session(&session_url, &state.realm).await;
    match launch_result {
        Ok(exposed_dir) => {
            state.session_url = Some(session_url);
            state.session_exposed_dir_channel = Some(exposed_dir);
            state.diagnostics.record_session_start();
            Ok(())
        }
        Err(startup_error) => Err(match startup_error {
            startup::StartupError::NotDestroyed { .. } => LaunchError::DestroyComponentFailed,
            startup::StartupError::NotCreated { err, .. } => match err {
                fcomponent::Error::InstanceCannotResolve => LaunchError::NotFound,
                _ => LaunchError::CreateComponentFailed,
            },
            startup::StartupError::ExposedDirNotOpened { .. } => {
                LaunchError::CreateComponentFailed
            }
            startup::StartupError::NotLaunched { .. } => LaunchError::CreateComponentFailed,
        }),
    }
}
/// Launches the stored session URL again.
///
/// The state lock is held for the whole launch. On success the new exposed-directory channel
/// replaces the stored one and a session start is recorded in diagnostics.
///
/// # Errors
/// - `RestartError::NotRunning` if no session URL has been stored.
/// - `RestartError::NotFound` if the component could not be resolved while being created.
/// - `RestartError::DestroyComponentFailed` if `startup::launch_session` reports `NotDestroyed`.
/// - `RestartError::CreateComponentFailed` for every other startup failure.
async fn handle_restart_request(&mut self) -> Result<(), RestartError> {
    let mut state = self.state.lock().await;

    let launch_result = match state.session_url.as_ref() {
        Some(session_url) => startup::launch_session(session_url, &state.realm).await,
        None => return Err(RestartError::NotRunning),
    };

    match launch_result {
        Ok(exposed_dir) => {
            state.session_exposed_dir_channel = Some(exposed_dir);
            state.diagnostics.record_session_start();
            Ok(())
        }
        Err(startup_error) => Err(match startup_error {
            startup::StartupError::NotDestroyed { .. } => RestartError::DestroyComponentFailed,
            startup::StartupError::NotCreated { err, .. } => match err {
                fcomponent::Error::InstanceCannotResolve => RestartError::NotFound,
                _ => RestartError::CreateComponentFailed,
            },
            startup::StartupError::ExposedDirNotOpened { .. } => {
                RestartError::CreateComponentFailed
            }
            startup::StartupError::NotLaunched { .. } => RestartError::CreateComponentFailed,
        }),
    }
}
}
#[cfg(test)]
mod tests {
use {
super::SessionManager,
fidl::endpoints::{create_proxy_and_stream, spawn_stream_handler},
fidl_fuchsia_component as fcomponent, fidl_fuchsia_element as felement,
fidl_fuchsia_session::{
LaunchConfiguration, LauncherMarker, LauncherProxy, RestartError, RestarterMarker,
RestarterProxy,
},
fuchsia_inspect::{self, assert_data_tree, testing::AnyProperty},
futures::prelude::*,
session_testing::spawn_noop_directory_server,
};
/// Spawns detached tasks that serve the `Launcher` and `Restarter` protocols from clones of
/// `session_manager`, and returns proxies connected to them.
///
/// Each task panics if its request stream handler returns an error.
fn serve_session_manager_services(
    session_manager: SessionManager,
) -> (LauncherProxy, RestarterProxy) {
    let (launcher_proxy, launcher_stream) =
        create_proxy_and_stream::<LauncherMarker>().unwrap();
    let mut launcher_server = session_manager.clone();
    let launcher_task = fuchsia_async::Task::spawn(async move {
        launcher_server
            .handle_launcher_request_stream(launcher_stream)
            .await
            .expect("Session launcher request stream got an error.");
    });
    launcher_task.detach();

    let (restarter_proxy, restarter_stream) =
        create_proxy_and_stream::<RestarterMarker>().unwrap();
    let mut restarter_server = session_manager.clone();
    let restarter_task = fuchsia_async::Task::spawn(async move {
        restarter_server
            .handle_restarter_request_stream(restarter_stream)
            .await
            .expect("Session restarter request stream got an error.");
    });
    restarter_task.detach();

    (launcher_proxy, restarter_proxy)
}
/// Verifies that a successful `Launch` call records exactly one entry under
/// `session_started_at` in Inspect.
#[fuchsia::test]
async fn test_launch() {
    let session_url = "session";

    // Fake Realm: answers DestroyChild, CreateChild and OpenExposedDir with success and panics
    // on any other request. CreateChild also checks that the requested URL is `session_url`.
    let realm = spawn_stream_handler(move |realm_request| async move {
        match realm_request {
            fcomponent::RealmRequest::DestroyChild { child: _, responder } => {
                let _ = responder.send(Ok(()));
            }
            fcomponent::RealmRequest::CreateChild {
                collection: _,
                decl,
                args: _,
                responder,
            } => {
                assert_eq!(decl.url.unwrap(), session_url);
                let _ = responder.send(Ok(()));
            }
            fcomponent::RealmRequest::OpenExposedDir { child: _, exposed_dir, responder } => {
                spawn_noop_directory_server(exposed_dir);
                let _ = responder.send(Ok(()));
            }
            _ => panic!("Realm handler received an unexpected request"),
        };
    })
    .unwrap();

    let inspector = fuchsia_inspect::Inspector::default();
    let session_manager = SessionManager::new(realm, &inspector);
    let (launcher, _restarter) = serve_session_manager_services(session_manager);

    // `launch` yields a nested result: the outer layer is the FIDL transport outcome and the
    // inner layer is the Launcher's reply. Checking only the outer layer would let a
    // `LaunchError` reply pass unnoticed, so unwrap the transport result first.
    assert!(launcher
        .launch(&LaunchConfiguration {
            session_url: Some(session_url.to_string()),
            ..Default::default()
        })
        .await
        .expect("could not call Launch")
        .is_ok());

    assert_data_tree!(inspector, root: {
        session_started_at: {
            "0": {
                "@time": AnyProperty
            }
        }
    });
}
/// Verifies that a launch followed by a restart records two entries under
/// `session_started_at` in Inspect.
#[fuchsia::test]
async fn test_restart() {
    let session_url = "session";

    // Fake Realm: answers DestroyChild, CreateChild and OpenExposedDir with success and panics
    // on any other request.
    let realm = spawn_stream_handler(move |request| async move {
        match request {
            fcomponent::RealmRequest::DestroyChild { responder, .. } => {
                let _ = responder.send(Ok(()));
            }
            fcomponent::RealmRequest::CreateChild { decl, responder, .. } => {
                assert_eq!(decl.url.unwrap(), session_url);
                let _ = responder.send(Ok(()));
            }
            fcomponent::RealmRequest::OpenExposedDir { exposed_dir, responder, .. } => {
                spawn_noop_directory_server(exposed_dir);
                let _ = responder.send(Ok(()));
            }
            _ => panic!("Realm handler received an unexpected request"),
        };
    })
    .unwrap();

    let inspector = fuchsia_inspect::Inspector::default();
    let (launcher, restarter) =
        serve_session_manager_services(SessionManager::new(realm, &inspector));

    let launch_config = LaunchConfiguration {
        session_url: Some(session_url.to_string()),
        ..Default::default()
    };
    let launch_reply = launcher.launch(&launch_config).await.expect("could not call Launch");
    assert!(launch_reply.is_ok());

    let restart_reply = restarter.restart().await.expect("could not call Restart");
    assert!(restart_reply.is_ok());

    assert_data_tree!(inspector, root: {
        session_started_at: {
            "0": {
                "@time": AnyProperty
            },
            "1": {
                "@time": AnyProperty
            }
        }
    });
}
/// Verifies that `Restart` replies with `RestartError::NotRunning` when no session has been
/// launched, and that no session start is recorded in Inspect.
#[fuchsia::test]
async fn test_restart_error_not_running() {
    // The Realm must never be contacted in this scenario, so any request is a test failure.
    let realm = spawn_stream_handler(move |_realm_request| async move {
        panic!("Realm should not receive any requests as there is no session to launch")
    })
    .unwrap();

    let inspector = fuchsia_inspect::Inspector::default();
    let (_launcher, restarter) =
        serve_session_manager_services(SessionManager::new(realm, &inspector));

    let restart_reply = restarter.restart().await.expect("could not call Restart");
    assert_eq!(Err(RestartError::NotRunning), restart_reply);

    assert_data_tree!(inspector, root: {
        session_started_at: {}
    });
}
/// Verifies that `handle_manager_request_stream` relays a `ProposeElement` request to the
/// downstream Manager exactly once and hands the downstream reply back to the caller.
#[fuchsia::test]
async fn handle_element_manager_request_stream_propagates_request_to_downstream_service() {
    let (local_proxy, local_request_stream) =
        create_proxy_and_stream::<felement::ManagerMarker>()
            .expect("Failed to create local Manager proxy and stream");
    let (downstream_proxy, mut downstream_request_stream) =
        create_proxy_and_stream::<felement::ManagerMarker>()
            .expect("Failed to create downstream Manager proxy and stream");

    let element_url = "element_url";
    let mut proposed_count = 0;

    // Server under test: relays from the local stream to the downstream proxy.
    let local_server_fut =
        SessionManager::handle_manager_request_stream(local_request_stream, downstream_proxy);

    // Fake downstream Manager: counts proposals, checks the URL and replies with success.
    let downstream_server_fut = async {
        loop {
            match downstream_request_stream.try_next().await.unwrap() {
                Some(felement::ManagerRequest::ProposeElement { spec, responder, .. }) => {
                    proposed_count += 1;
                    assert_eq!(Some(element_url.to_string()), spec.component_url);
                    let _ = responder.send(Ok(()));
                }
                None => break,
            }
        }
    };

    // Client: proposes one element, then drops the proxy so the request streams can end.
    let propose_and_drop_fut = async {
        let spec = felement::Spec {
            component_url: Some(element_url.to_string()),
            ..Default::default()
        };
        local_proxy
            .propose_element(spec, None)
            .await
            .expect("Failed to call ProposeElement")
            .expect("Failed to propose element");
        drop(local_proxy);
    };

    let _ = future::join3(propose_and_drop_fut, local_server_fut, downstream_server_fut).await;

    assert_eq!(proposed_count, 1);
}
}