1#![deny(missing_docs)]
8
9pub mod fidl_next;
10
11use anyhow::{Context as _, Error, format_err};
12use fidl::endpoints::{
13 DiscoverableProtocolMarker, FromClient, MemberOpener, ProtocolMarker, ServiceMarker,
14 ServiceProxy,
15};
16use fidl_fuchsia_component::{RealmMarker, RealmProxy};
17use fidl_fuchsia_component_decl::ChildRef;
18use fidl_fuchsia_io as fio;
19use fuchsia_component_directory::{AsRefDirectory, open_directory_async};
20use fuchsia_fs::directory::{WatchEvent, Watcher};
21use futures::stream::FusedStream;
22use futures::{Stream, StreamExt};
23use pin_project::pin_project;
24use std::borrow::Borrow;
25use std::marker::PhantomData;
26use std::task::Poll;
27
28pub trait Connect: Sized + FromClient<Protocol: DiscoverableProtocolMarker> {
30 fn connect() -> Result<Self, Error> {
32 Self::connect_at(SVC_DIR)
33 }
34
35 fn connect_at(service_prefix: impl AsRef<str>) -> Result<Self, Error> {
37 let (client, server_end) = fidl::endpoints::create_endpoints::<Self::Protocol>();
38 let () = connect_channel_to_protocol_at::<Self::Protocol>(
39 server_end.into_channel(),
40 service_prefix.as_ref(),
41 )?;
42 Ok(Self::from_client(client))
43 }
44
45 fn connect_at_dir_svc(directory: &impl AsRefDirectory) -> Result<Self, Error> {
47 let protocol_path = format!("{}/{}", SVC_DIR, Self::Protocol::PROTOCOL_NAME);
48 Self::connect_at_dir_root_with_name(directory, &protocol_path)
49 }
50
51 fn connect_at_dir_root(directory: &impl AsRefDirectory) -> Result<Self, Error> {
53 Self::connect_at_dir_root_with_name(directory, Self::Protocol::PROTOCOL_NAME)
54 }
55
56 fn connect_at_dir_root_with_name(
58 directory: &impl AsRefDirectory,
59 filename: &str,
60 ) -> Result<Self, Error> {
61 let (client, server) = fidl::endpoints::create_endpoints::<Self::Protocol>();
62 directory.as_ref_directory().open(
63 filename,
64 fio::Flags::PROTOCOL_SERVICE,
65 server.into_channel().into(),
66 )?;
67 Ok(Self::from_client(client))
68 }
69}
70
71impl<T: FromClient<Protocol: DiscoverableProtocolMarker>> Connect for T {}
72
73pub mod connect {
79 use super::*;
80
81 pub fn connect_to_protocol<T: Connect>() -> Result<T, Error> {
83 T::connect()
84 }
85
86 pub fn connect_to_protocol_at<T: Connect>(service_prefix: impl AsRef<str>) -> Result<T, Error> {
88 T::connect_at(service_prefix)
89 }
90
91 pub fn connect_to_protocol_at_dir_svc<T: Connect>(
93 directory: &impl AsRefDirectory,
94 ) -> Result<T, Error> {
95 T::connect_at_dir_svc(directory)
96 }
97
98 pub fn connect_to_protocol_at_dir_root<T: Connect>(
100 directory: &impl AsRefDirectory,
101 ) -> Result<T, Error> {
102 T::connect_at_dir_root(directory)
103 }
104
105 pub fn connect_to_named_protocol_at_dir_root<T: Connect>(
107 directory: &impl AsRefDirectory,
108 filename: &str,
109 ) -> Result<T, Error> {
110 T::connect_at_dir_root_with_name(directory, filename)
111 }
112}
113
114pub const SVC_DIR: &'static str = "/svc";
116
117pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
119 svc_dir: D,
120 _svc_marker: PhantomData<P>,
121}
122
123impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
124 fn new(svc_dir: D) -> ProtocolConnector<D, P> {
126 ProtocolConnector { svc_dir, _svc_marker: PhantomData }
127 }
128
129 pub async fn exists(&self) -> Result<bool, Error> {
134 match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
135 Ok(v) => Ok(v),
136 Err(fuchsia_fs::directory::EnumerateError::Fidl(
139 _,
140 fidl::Error::ClientChannelClosed { status, .. },
141 )) if status == zx::Status::PEER_CLOSED => Ok(false),
142 Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
143 }
144 }
145
146 pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
151 #[cfg(fuchsia_api_level_at_least = "27")]
152 return self
153 .svc_dir
154 .borrow()
155 .open(
156 P::PROTOCOL_NAME,
157 fio::Flags::PROTOCOL_SERVICE,
158 &fio::Options::default(),
159 server_end.into(),
160 )
161 .context("error connecting to protocol");
162 #[cfg(not(fuchsia_api_level_at_least = "27"))]
163 return self
164 .svc_dir
165 .borrow()
166 .open3(
167 P::PROTOCOL_NAME,
168 fio::Flags::PROTOCOL_SERVICE,
169 &fio::Options::default(),
170 server_end.into(),
171 )
172 .context("error connecting to protocol");
173 }
174
175 pub fn connect(self) -> Result<P::Proxy, Error> {
180 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
181 let () = self
182 .connect_with(server_end.into_channel())
183 .context("error connecting with server channel")?;
184 Ok(proxy)
185 }
186}
187
188pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
190 fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
191 .context("error opening svc directory")
192}
193
194pub fn new_protocol_connector<P: DiscoverableProtocolMarker>()
197-> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
198 new_protocol_connector_at::<P>(SVC_DIR)
199}
200
201pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
206 service_directory_path: &str,
207) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
208 let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
209 .context("error opening service directory")?;
210
211 Ok(ProtocolConnector::new(dir))
212}
213
214pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
216 dir: &fio::DirectoryProxy,
217) -> ProtocolConnector<&fio::DirectoryProxy, P> {
218 ProtocolConnector::new(dir)
219}
220
221pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
223 server_end: zx::Channel,
224) -> Result<(), Error> {
225 connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
226}
227
228pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
230 server_end: zx::Channel,
231 service_directory_path: &str,
232) -> Result<(), Error> {
233 let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
234 connect_channel_to_protocol_at_path(server_end, &protocol_path)
235}
236
237pub fn connect_channel_to_protocol_at_path(
239 server_end: zx::Channel,
240 protocol_path: &str,
241) -> Result<(), Error> {
242 fdio::service_connect(&protocol_path, server_end)
243 .with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
244}
245
246pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
248 connect_to_protocol_at::<P>(SVC_DIR)
249}
250
251pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>()
257-> Result<P::SynchronousProxy, Error> {
258 connect_to_protocol_sync_at::<P>(SVC_DIR)
259}
260
261pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
263 service_prefix: impl AsRef<str>,
264) -> Result<P::Proxy, Error> {
265 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
266 let () =
267 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
268 Ok(proxy)
269}
270
271pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
277 service_prefix: impl AsRef<str>,
278) -> Result<P::SynchronousProxy, Error> {
279 let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
280 let () =
281 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
282 Ok(proxy)
283}
284
285pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
287 protocol_path: impl AsRef<str>,
288) -> Result<P::Proxy, Error> {
289 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
290 let () =
291 connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
292 Ok(proxy)
293}
294
295pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
297 directory: &impl AsRefDirectory,
298) -> Result<P::Proxy, Error> {
299 connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
300}
301
302pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
304 directory: &impl AsRefDirectory,
305 filename: &str,
306) -> Result<P::Proxy, Error> {
307 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
308 directory.as_ref_directory().open(
309 filename,
310 fio::Flags::PROTOCOL_SERVICE,
311 server_end.into_channel().into(),
312 )?;
313 Ok(proxy)
314}
315
316pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
318 directory: &impl AsRefDirectory,
319) -> Result<P::Proxy, Error> {
320 let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
321 connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
322}
323
324pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
327
328impl MemberOpener for ServiceInstanceDirectory {
329 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
330 let Self(directory, _) = self;
331 #[cfg(fuchsia_api_level_at_least = "27")]
332 return directory.open(
333 member,
334 fio::Flags::PROTOCOL_SERVICE,
335 &fio::Options::default(),
336 server_end,
337 );
338 #[cfg(not(fuchsia_api_level_at_least = "27"))]
339 return directory.open3(
340 member,
341 fio::Flags::PROTOCOL_SERVICE,
342 &fio::Options::default(),
343 server_end,
344 );
345 }
346 fn instance_name(&self) -> &str {
347 let Self(_, instance_name) = self;
348 return &instance_name;
349 }
350}
351
352struct ServiceInstance<S> {
355 name: String,
357 service: Service<S>,
358}
359
360impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
361 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
362 self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
363 }
364 fn instance_name(&self) -> &str {
365 return &self.name;
366 }
367}
368
369pub struct Service<S> {
371 dir: fio::DirectoryProxy,
372 _marker: S,
373}
374
375impl<S: Clone> Clone for Service<S> {
376 fn clone(&self) -> Self {
377 Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
378 }
379}
380
381impl<S> From<fio::DirectoryProxy> for Service<S>
384where
385 S: Default,
386{
387 fn from(dir: fio::DirectoryProxy) -> Self {
388 Self { dir, _marker: S::default() }
389 }
390}
391
392impl<S: ServiceMarker> Service<S> {
393 pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
396 Self { dir, _marker }
397 }
398
399 pub fn open(marker: S) -> Result<Self, Error> {
401 Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
402 }
403
404 pub fn open_at(service_name: impl AsRef<str>, marker: S) -> Result<Self, Error> {
406 Ok(Self::from_service_dir_proxy(open_service_at(service_name)?, marker))
407 }
408
409 pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
411 let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
412 Ok(Self::from_service_dir_proxy(dir, marker))
413 }
414
415 pub fn open_from_dir_prefix(
418 dir: impl AsRefDirectory,
419 prefix: impl AsRef<str>,
420 marker: S,
421 ) -> Result<Self, Error> {
422 let prefix = prefix.as_ref();
423 let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
424 let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
428 let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
429 Ok(Self::from_service_dir_proxy(dir, marker))
430 }
431
432 pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
436 let directory_proxy = fuchsia_fs::directory::open_directory_async(
437 &self.dir,
438 name.as_ref(),
439 fio::Flags::empty(),
440 )?;
441 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
442 directory_proxy,
443 name.as_ref().to_string(),
444 ))))
445 }
446
447 fn connect_to_instance_member_with_channel(
451 &self,
452 instance: impl AsRef<str>,
453 member: impl AsRef<str>,
454 server_end: zx::Channel,
455 ) -> Result<(), fidl::Error> {
456 let path = format!("{}/{}", instance.as_ref(), member.as_ref());
457 #[cfg(fuchsia_api_level_at_least = "27")]
458 return self.dir.open(
459 &path,
460 fio::Flags::PROTOCOL_SERVICE,
461 &fio::Options::default(),
462 server_end,
463 );
464 #[cfg(not(fuchsia_api_level_at_least = "27"))]
465 return self.dir.open3(
466 &path,
467 fio::Flags::PROTOCOL_SERVICE,
468 &fio::Options::default(),
469 server_end,
470 );
471 }
472
473 pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
476 let watcher = Watcher::new(&self.dir).await?;
477 let finished = false;
478 Ok(ServiceInstanceStream { service: self, watcher, finished })
479 }
480
481 pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
483 self.watch()
484 .await?
485 .next()
486 .await
487 .context("No instances found before service directory was removed")?
488 }
489
490 pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
492 let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
493 .await?
494 .into_iter()
495 .map(|dirent| {
496 S::Proxy::from_member_opener(Box::new(ServiceInstance {
497 service: self.clone(),
498 name: dirent.name,
499 }))
500 })
501 .collect();
502 Ok(instances)
503 }
504}
505
506#[pin_project]
512pub struct ServiceInstanceStream<S: Clone> {
513 service: Service<S>,
514 watcher: Watcher,
515 finished: bool,
516}
517
518impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
519 type Item = Result<S::Proxy, Error>;
520
521 fn poll_next(
522 self: std::pin::Pin<&mut Self>,
523 cx: &mut std::task::Context<'_>,
524 ) -> Poll<Option<Self::Item>> {
525 let this = self.project();
526 use Poll::*;
527 if *this.finished {
528 return Poll::Ready(None);
529 }
530 while let Ready(next) = this.watcher.poll_next_unpin(cx) {
533 match next {
534 Some(Ok(state)) => match state.event {
535 WatchEvent::DELETED => {
536 *this.finished = true;
537 return Ready(None);
538 }
539 WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
540 let filename = state.filename.to_str().unwrap();
541 if filename != "." {
542 let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
543 service: this.service.clone(),
544 name: filename.to_owned(),
545 }));
546 return Ready(Some(Ok(proxy)));
547 }
548 }
549 _ => {}
550 },
551 Some(Err(err)) => {
552 *this.finished = true;
553 return Ready(Some(Err(err.into())));
554 }
555 None => {
556 *this.finished = true;
557 return Ready(None);
558 }
559 }
560 }
561 Pending
562 }
563}
564
565impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
566 fn is_terminated(&self) -> bool {
567 self.finished
568 }
569}
570
571pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
578 let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
579 let directory_proxy =
580 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
581 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
582 directory_proxy,
583 instance.to_string(),
584 ))))
585}
586
587pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
593 directory: &fio::DirectoryProxy,
594 instance: &str,
595) -> Result<S::Proxy, Error> {
596 let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
597 let directory_proxy =
598 fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
599 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
600 directory_proxy,
601 instance.to_string(),
602 ))))
603}
604
605pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
607 directory: &impl AsRefDirectory,
608 instance: impl AsRef<str>,
609) -> Result<S::Proxy, Error> {
610 let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
611 let service_path = service_path.strip_prefix('/').unwrap();
615 let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
616 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
617 directory_proxy,
618 instance.as_ref().to_string(),
619 ))))
620}
621
622pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
624 let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
625 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
626 .context("namespace open failed")
627}
628
629pub fn open_service_at(service_name: impl AsRef<str>) -> Result<fio::DirectoryProxy, Error> {
631 let service_path = format!("{SVC_DIR}/{}", service_name.as_ref());
632 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
633 .context("namespace open failed")
634}
635
636pub async fn open_childs_exposed_directory(
639 child_name: impl Into<String>,
640 collection_name: Option<String>,
641) -> Result<fio::DirectoryProxy, Error> {
642 let realm_proxy = connect_to_protocol::<RealmMarker>()?;
643 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
644 let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
645 realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
646 let ChildRef { name, collection } = child_ref;
647 format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
648 })?;
649 Ok(directory_proxy)
650}
651
652pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
655 child_name: String,
656 collection_name: Option<String>,
657) -> Result<P::Proxy, Error> {
658 let child_exposed_directory =
659 open_childs_exposed_directory(child_name, collection_name).await?;
660 connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
661}
662
663pub fn realm() -> Result<RealmProxy, Error> {
665 connect_to_protocol::<RealmMarker>()
666}
667
668#[cfg(test)]
669mod tests {
670 use std::collections::HashSet;
671 use std::sync::Arc;
672
673 use super::*;
674 use fidl::endpoints::ServiceMarker as _;
675 use fidl_fuchsia_component_client_test::{
676 ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
677 };
678 use fuchsia_async::{self as fasync};
679 use futures::{TryStreamExt, future};
680 use vfs::directory::simple::Simple;
681 use vfs::file::vmo::read_only;
682 use vfs::pseudo_directory;
683
684 #[fasync::run_singlethreaded(test)]
685 async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
686 let req = new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
687 assert_matches::assert_matches!(
688 req.exists().await.context("error checking service"),
689 Ok(false)
690 );
691 let _: ProtocolAProxy = req.connect().context("error connecting to service")?;
692
693 let req = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
694 .context("error probing service at svc dir")?;
695 assert_matches::assert_matches!(
696 req.exists().await.context("error checking service at svc dir"),
697 Ok(false)
698 );
699 let _: ProtocolAProxy = req.connect().context("error connecting to service at svc dir")?;
700
701 Ok(())
702 }
703
704 #[fasync::run_singlethreaded(test)]
705 async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
706 let dir = pseudo_directory! {
707 ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
708 };
709 let dir_proxy = vfs::directory::serve_read_only(dir);
710 let req = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
711 assert_matches::assert_matches!(
712 req.exists().await.context("error probing invalid service"),
713 Ok(false)
714 );
715 let _: ProtocolAProxy = req.connect().context("error connecting to invalid service")?;
716
717 let req = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
718 assert_matches::assert_matches!(
719 req.exists().await.context("error probing service"),
720 Ok(true)
721 );
722 let _: ProtocolBProxy = req.connect().context("error connecting to service")?;
723
724 Ok(())
725 }
726
727 fn make_inner_service_instance_tree() -> Arc<Simple> {
728 pseudo_directory! {
729 ServiceMarker::SERVICE_NAME => pseudo_directory! {
730 "default" => read_only("read_only"),
731 "another_instance" => read_only("read_only"),
732 },
733 }
734 }
735
736 fn make_service_instance_tree() -> Arc<Simple> {
737 pseudo_directory! {
738 "svc" => make_inner_service_instance_tree(),
739 }
740 }
741
742 #[fasync::run_until_stalled(test)]
743 async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
744 let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
745 let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
746 let found_names: HashSet<_> = watcher
747 .watch()
748 .await?
749 .take(2)
750 .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
751 .try_collect()
752 .await?;
753
754 assert_eq!(
755 found_names,
756 HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
757 );
758
759 Ok(())
760 }
761
762 #[fasync::run_until_stalled(test)]
763 async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
764 let dir_proxy = vfs::directory::serve_read_only(make_inner_service_instance_tree());
765 let watcher = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
766 let found_names: HashSet<_> = watcher
767 .watch()
768 .await?
769 .take(2)
770 .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
771 .try_collect()
772 .await?;
773
774 assert_eq!(
775 found_names,
776 HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
777 );
778
779 Ok(())
780 }
781
782 #[fasync::run_until_stalled(test)]
783 async fn test_connect_to_all_services() -> Result<(), Error> {
784 let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
785 let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
786 let _: Vec<_> = watcher.watch().await?.take(2).try_collect().await?;
787
788 Ok(())
789 }
790
791 #[fasync::run_until_stalled(test)]
792 async fn test_connect_to_any() -> Result<(), Error> {
793 let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
794 let watcher = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
795 let found = watcher.watch_for_any().await?;
796 assert!(["default", "another_instance"].contains(&found.instance_name()));
797
798 Ok(())
799 }
800}