use anyhow::{format_err, Context as _, Error};
use fidl::endpoints::{
DiscoverableProtocolMarker, MemberOpener, ProtocolMarker, ServiceMarker, ServiceProxy,
};
use fidl_fuchsia_component::{RealmMarker, RealmProxy};
use fidl_fuchsia_component_decl::ChildRef;
use fidl_fuchsia_io as fio;
use fuchsia_fs::directory::{WatchEvent, Watcher};
use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::pin::pin;
use std::task::Poll;
use crate::directory::{open_directory_async, AsRefDirectory};
use crate::SVC_DIR;
pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
svc_dir: D,
_svc_marker: PhantomData<P>,
}
impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
fn new(svc_dir: D) -> ProtocolConnector<D, P> {
ProtocolConnector { svc_dir, _svc_marker: PhantomData }
}
pub async fn exists(&self) -> Result<bool, Error> {
match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
Ok(v) => Ok(v),
Err(fuchsia_fs::directory::EnumerateError::Fidl(
_,
fidl::Error::ClientChannelClosed { status, protocol_name: _ },
)) if status == zx::Status::PEER_CLOSED => Ok(false),
Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
}
}
pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
self.svc_dir
.borrow()
.open3(
P::PROTOCOL_NAME,
fio::Flags::PROTOCOL_SERVICE,
&fio::Options::default(),
server_end.into(),
)
.context("error connecting to protocol")
}
pub fn connect(self) -> Result<P::Proxy, Error> {
let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
let () = self
.connect_with(server_end.into_channel())
.context("error connecting with server channel")?;
Ok(proxy)
}
}
pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
.context("error opening svc directory")
}
pub fn new_protocol_connector<P: DiscoverableProtocolMarker>(
) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
new_protocol_connector_at::<P>(SVC_DIR)
}
pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
service_directory_path: &str,
) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
.context("error opening service directory")?;
Ok(ProtocolConnector::new(dir))
}
pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
dir: &fio::DirectoryProxy,
) -> ProtocolConnector<&fio::DirectoryProxy, P> {
ProtocolConnector::new(dir)
}
pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
server_end: zx::Channel,
) -> Result<(), Error> {
connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
}
pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
server_end: zx::Channel,
service_directory_path: &str,
) -> Result<(), Error> {
let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
connect_channel_to_protocol_at_path(server_end, &protocol_path)
}
pub fn connect_channel_to_protocol_at_path(
server_end: zx::Channel,
protocol_path: &str,
) -> Result<(), Error> {
fdio::service_connect(&protocol_path, server_end)
.with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
}
pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
connect_to_protocol_at::<P>(SVC_DIR)
}
pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>(
) -> Result<P::SynchronousProxy, Error> {
connect_to_protocol_sync_at::<P>(SVC_DIR)
}
pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
service_prefix: impl AsRef<str>,
) -> Result<P::Proxy, Error> {
let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
let () =
connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
Ok(proxy)
}
pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
service_prefix: impl AsRef<str>,
) -> Result<P::SynchronousProxy, Error> {
let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
let () =
connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
Ok(proxy)
}
pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
protocol_path: impl AsRef<str>,
) -> Result<P::Proxy, Error> {
let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
let () =
connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
Ok(proxy)
}
pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
directory: &impl AsRefDirectory,
) -> Result<P::Proxy, Error> {
connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
}
pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
directory: &impl AsRefDirectory,
filename: &str,
) -> Result<P::Proxy, Error> {
let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
directory.as_ref_directory().open(
filename,
fio::Flags::PROTOCOL_SERVICE,
server_end.into_channel().into(),
)?;
Ok(proxy)
}
pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
directory: &impl AsRefDirectory,
) -> Result<P::Proxy, Error> {
let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
}
pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
impl MemberOpener for ServiceInstanceDirectory {
fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
let Self(directory, _) = self;
directory.open3(member, fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end)
}
fn instance_name(&self) -> &str {
let Self(_, instance_name) = self;
return &instance_name;
}
}
struct ServiceInstance<S> {
name: String,
service: Service<S>,
}
impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
}
fn instance_name(&self) -> &str {
return &self.name;
}
}
pub struct Service<S> {
dir: fio::DirectoryProxy,
_marker: S,
}
impl<S: Clone> Clone for Service<S> {
fn clone(&self) -> Self {
Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
}
}
impl<S> From<fio::DirectoryProxy> for Service<S>
where
S: Default,
{
fn from(dir: fio::DirectoryProxy) -> Self {
Self { dir, _marker: S::default() }
}
}
impl<S: ServiceMarker> Service<S> {
pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
Self { dir, _marker }
}
pub fn open(marker: S) -> Result<Self, Error> {
Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
}
pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
Ok(Self::from_service_dir_proxy(dir, marker))
}
pub fn open_from_dir_prefix(
dir: impl AsRefDirectory,
prefix: impl AsRef<str>,
marker: S,
) -> Result<Self, Error> {
let prefix = prefix.as_ref();
let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
Ok(Self::from_service_dir_proxy(dir, marker))
}
pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
let directory_proxy = fuchsia_fs::directory::open_directory_async(
&self.dir,
name.as_ref(),
fio::Flags::empty(),
)?;
Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
directory_proxy,
name.as_ref().to_string(),
))))
}
fn connect_to_instance_member_with_channel(
&self,
instance: impl AsRef<str>,
member: impl AsRef<str>,
server_end: zx::Channel,
) -> Result<(), fidl::Error> {
let path = format!("{}/{}", instance.as_ref(), member.as_ref());
self.dir.open3(&path, fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end)
}
pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
let watcher = Watcher::new(&self.dir).await?;
let finished = false;
Ok(ServiceInstanceStream { service: self, watcher, finished })
}
pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
self.watch()
.await?
.next()
.await
.context("No instances found before service directory was removed")?
}
pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
.await?
.into_iter()
.map(|dirent| {
S::Proxy::from_member_opener(Box::new(ServiceInstance {
service: self.clone(),
name: dirent.name,
}))
})
.collect();
Ok(instances)
}
}
#[pin_project]
pub struct ServiceInstanceStream<S: Clone> {
service: Service<S>,
watcher: Watcher,
finished: bool,
}
impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
type Item = Result<S::Proxy, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
use Poll::*;
if *this.finished {
return Poll::Ready(None);
}
while let Ready(next) = this.watcher.poll_next_unpin(cx) {
match next {
Some(Ok(state)) => match state.event {
WatchEvent::DELETED => {
*this.finished = true;
return Ready(None);
}
WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
let filename = state.filename.to_str().unwrap();
if filename != "." {
let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
service: this.service.clone(),
name: filename.to_owned(),
}));
return Ready(Some(Ok(proxy)));
}
}
_ => {}
},
Some(Err(err)) => {
*this.finished = true;
return Ready(Some(Err(err.into())));
}
None => {
*this.finished = true;
return Ready(None);
}
}
}
Pending
}
}
impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
fn is_terminated(&self) -> bool {
self.finished
}
}
pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
let directory_proxy =
fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
directory_proxy,
instance.to_string(),
))))
}
pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
directory: &fio::DirectoryProxy,
instance: &str,
) -> Result<S::Proxy, Error> {
let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
let directory_proxy =
fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
directory_proxy,
instance.to_string(),
))))
}
pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
directory: &impl AsRefDirectory,
instance: impl AsRef<str>,
) -> Result<S::Proxy, Error> {
let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
let service_path = service_path.strip_prefix('/').unwrap();
let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
directory_proxy,
instance.as_ref().to_string(),
))))
}
pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
.context("namespace open failed")
}
pub async fn open_childs_exposed_directory(
child_name: impl Into<String>,
collection_name: Option<String>,
) -> Result<fio::DirectoryProxy, Error> {
let realm_proxy = connect_to_protocol::<RealmMarker>()?;
let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
let ChildRef { name, collection } = child_ref;
format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
})?;
Ok(directory_proxy)
}
pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
child_name: String,
collection_name: Option<String>,
) -> Result<P::Proxy, Error> {
let child_exposed_directory =
open_childs_exposed_directory(child_name, collection_name).await?;
connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
}
pub fn realm() -> Result<RealmProxy, Error> {
connect_to_protocol::<RealmMarker>()
}
#[allow(missing_docs)]
#[cfg(test)]
pub mod test_util {
use super::*;
use std::sync::Arc;
use vfs::directory::entry_container::Directory;
use vfs::execution_scope::ExecutionScope;
use vfs::ObjectRequest;
#[cfg(test)]
pub fn run_directory_server(dir: Arc<dyn Directory>) -> fio::DirectoryProxy {
let (dir_proxy, dir_server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
let scope = ExecutionScope::new();
let flags =
fio::Flags::PROTOCOL_DIRECTORY | fio::Flags::from_bits(fio::R_STAR_DIR.bits()).unwrap();
ObjectRequest::new(flags, &fio::Options::default(), dir_server.into_channel())
.handle(|request| dir.open3(scope, vfs::path::Path::dot(), flags, request));
dir_proxy
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use super::*;
use fidl::endpoints::ServiceMarker as _;
use fidl_fuchsia_component_client_test::{
ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
};
use fuchsia_async::{self as fasync};
use futures::{future, TryStreamExt};
use vfs::directory::simple::Simple;
use vfs::file::vmo::read_only;
use vfs::pseudo_directory;
#[fasync::run_singlethreaded(test)]
async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
let req = new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
assert_matches::assert_matches!(
req.exists().await.context("error checking service"),
Ok(false)
);
let _: ProtocolAProxy = req.connect().context("error connecting to service")?;
let req = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
.context("error probing service at svc dir")?;
assert_matches::assert_matches!(
req.exists().await.context("error checking service at svc dir"),
Ok(false)
);
let _: ProtocolAProxy = req.connect().context("error connecting to service at svc dir")?;
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
let dir = pseudo_directory! {
ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
};
let dir_proxy = test_util::run_directory_server(dir);
let req = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
assert_matches::assert_matches!(
req.exists().await.context("error probing invalid service"),
Ok(false)
);
let _: ProtocolAProxy = req.connect().context("error connecting to invalid service")?;
let req = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
assert_matches::assert_matches!(
req.exists().await.context("error probing service"),
Ok(true)
);
let _: ProtocolBProxy = req.connect().context("error connecting to service")?;
Ok(())
}
fn make_inner_service_instance_tree() -> Arc<Simple> {
pseudo_directory! {
ServiceMarker::SERVICE_NAME => pseudo_directory! {
"default" => read_only("read_only"),
"another_instance" => read_only("read_only"),
},
}
}
fn make_service_instance_tree() -> Arc<Simple> {
pseudo_directory! {
"svc" => make_inner_service_instance_tree(),
}
}
#[fasync::run_until_stalled(test)]
async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
let dir_proxy = test_util::run_directory_server(make_service_instance_tree());
let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
let found_names: HashSet<_> = watcher
.watch()
.await?
.take(2)
.and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
.try_collect()
.await?;
assert_eq!(
found_names,
HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
);
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
let dir_proxy = test_util::run_directory_server(make_inner_service_instance_tree());
let watcher = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
let found_names: HashSet<_> = watcher
.watch()
.await?
.take(2)
.and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
.try_collect()
.await?;
assert_eq!(
found_names,
HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
);
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn test_connect_to_all_services() -> Result<(), Error> {
let dir_proxy = test_util::run_directory_server(make_service_instance_tree());
let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
let _: Vec<_> = watcher.watch().await?.take(2).try_collect().await?;
Ok(())
}
#[fasync::run_until_stalled(test)]
async fn test_connect_to_any() -> Result<(), Error> {
let dir_proxy = test_util::run_directory_server(make_service_instance_tree());
let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
let found = watcher.watch_for_any().await?;
assert!(["default", "another_instance"].contains(&found.instance_name()));
Ok(())
}
}