1#![deny(missing_docs)]
8
9pub mod fidl_next;
10
11use anyhow::{Context as _, Error, format_err};
12use fidl::endpoints::{
13 DiscoverableProtocolMarker, FromClient, MemberOpener, ProtocolMarker, ServiceMarker,
14 ServiceProxy,
15};
16use fidl_fuchsia_component::{RealmMarker, RealmProxy};
17use fidl_fuchsia_component_decl::ChildRef;
18use fidl_fuchsia_io as fio;
19use fuchsia_component_directory::{AsRefDirectory, open_directory_async};
20use fuchsia_fs::directory::{WatchEvent, Watcher};
21use futures::stream::FusedStream;
22use futures::{Stream, StreamExt};
23use pin_project::pin_project;
24use std::borrow::Borrow;
25use std::marker::PhantomData;
26use std::pin::pin;
27use std::task::Poll;
28
29pub trait Connect: Sized + FromClient<Protocol: DiscoverableProtocolMarker> {
31 fn connect() -> Result<Self, Error> {
33 Self::connect_at(SVC_DIR)
34 }
35
36 fn connect_at(service_prefix: impl AsRef<str>) -> Result<Self, Error> {
38 let (client, server_end) = fidl::endpoints::create_endpoints::<Self::Protocol>();
39 let () = connect_channel_to_protocol_at::<Self::Protocol>(
40 server_end.into_channel(),
41 service_prefix.as_ref(),
42 )?;
43 Ok(Self::from_client(client))
44 }
45
46 fn connect_at_dir_svc(directory: &impl AsRefDirectory) -> Result<Self, Error> {
48 let protocol_path = format!("{}/{}", SVC_DIR, Self::Protocol::PROTOCOL_NAME);
49 Self::connect_at_dir_root_with_name(directory, &protocol_path)
50 }
51
52 fn connect_at_dir_root(directory: &impl AsRefDirectory) -> Result<Self, Error> {
54 Self::connect_at_dir_root_with_name(directory, Self::Protocol::PROTOCOL_NAME)
55 }
56
57 fn connect_at_dir_root_with_name(
59 directory: &impl AsRefDirectory,
60 filename: &str,
61 ) -> Result<Self, Error> {
62 let (client, server) = fidl::endpoints::create_endpoints::<Self::Protocol>();
63 directory.as_ref_directory().open(
64 filename,
65 fio::Flags::PROTOCOL_SERVICE,
66 server.into_channel().into(),
67 )?;
68 Ok(Self::from_client(client))
69 }
70}
71
72impl<T: FromClient<Protocol: DiscoverableProtocolMarker>> Connect for T {}
73
74pub mod connect {
80 use super::*;
81
82 pub fn connect_to_protocol<T: Connect>() -> Result<T, Error> {
84 T::connect()
85 }
86
87 pub fn connect_to_protocol_at<T: Connect>(service_prefix: impl AsRef<str>) -> Result<T, Error> {
89 T::connect_at(service_prefix)
90 }
91
92 pub fn connect_to_protocol_at_dir_svc<T: Connect>(
94 directory: &impl AsRefDirectory,
95 ) -> Result<T, Error> {
96 T::connect_at_dir_svc(directory)
97 }
98
99 pub fn connect_to_protocol_at_dir_root<T: Connect>(
101 directory: &impl AsRefDirectory,
102 ) -> Result<T, Error> {
103 T::connect_at_dir_root(directory)
104 }
105
106 pub fn connect_to_named_protocol_at_dir_root<T: Connect>(
108 directory: &impl AsRefDirectory,
109 filename: &str,
110 ) -> Result<T, Error> {
111 T::connect_at_dir_root_with_name(directory, filename)
112 }
113}
114
/// Path prefix under which this library looks up protocols and services by default.
pub const SVC_DIR: &str = "/svc";
117
118pub struct ProtocolConnector<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> {
120 svc_dir: D,
121 _svc_marker: PhantomData<P>,
122}
123
124impl<D: Borrow<fio::DirectoryProxy>, P: DiscoverableProtocolMarker> ProtocolConnector<D, P> {
125 fn new(svc_dir: D) -> ProtocolConnector<D, P> {
127 ProtocolConnector { svc_dir, _svc_marker: PhantomData }
128 }
129
130 pub async fn exists(&self) -> Result<bool, Error> {
135 match fuchsia_fs::directory::dir_contains(self.svc_dir.borrow(), P::PROTOCOL_NAME).await {
136 Ok(v) => Ok(v),
137 Err(fuchsia_fs::directory::EnumerateError::Fidl(
140 _,
141 fidl::Error::ClientChannelClosed { status, .. },
142 )) if status == zx::Status::PEER_CLOSED => Ok(false),
143 Err(e) => Err(Error::new(e).context("error checking for service entry in directory")),
144 }
145 }
146
147 pub fn connect_with(self, server_end: zx::Channel) -> Result<(), Error> {
152 #[cfg(fuchsia_api_level_at_least = "27")]
153 return self
154 .svc_dir
155 .borrow()
156 .open(
157 P::PROTOCOL_NAME,
158 fio::Flags::PROTOCOL_SERVICE,
159 &fio::Options::default(),
160 server_end.into(),
161 )
162 .context("error connecting to protocol");
163 #[cfg(not(fuchsia_api_level_at_least = "27"))]
164 return self
165 .svc_dir
166 .borrow()
167 .open3(
168 P::PROTOCOL_NAME,
169 fio::Flags::PROTOCOL_SERVICE,
170 &fio::Options::default(),
171 server_end.into(),
172 )
173 .context("error connecting to protocol");
174 }
175
176 pub fn connect(self) -> Result<P::Proxy, Error> {
181 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
182 let () = self
183 .connect_with(server_end.into_channel())
184 .context("error connecting with server channel")?;
185 Ok(proxy)
186 }
187}
188
189pub fn clone_namespace_svc() -> Result<fio::DirectoryProxy, Error> {
191 fuchsia_fs::directory::open_in_namespace(SVC_DIR, fio::Flags::empty())
192 .context("error opening svc directory")
193}
194
195pub fn new_protocol_connector<P: DiscoverableProtocolMarker>()
198-> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
199 new_protocol_connector_at::<P>(SVC_DIR)
200}
201
202pub fn new_protocol_connector_at<P: DiscoverableProtocolMarker>(
207 service_directory_path: &str,
208) -> Result<ProtocolConnector<fio::DirectoryProxy, P>, Error> {
209 let dir = fuchsia_fs::directory::open_in_namespace(service_directory_path, fio::Flags::empty())
210 .context("error opening service directory")?;
211
212 Ok(ProtocolConnector::new(dir))
213}
214
215pub fn new_protocol_connector_in_dir<P: DiscoverableProtocolMarker>(
217 dir: &fio::DirectoryProxy,
218) -> ProtocolConnector<&fio::DirectoryProxy, P> {
219 ProtocolConnector::new(dir)
220}
221
222pub fn connect_channel_to_protocol<P: DiscoverableProtocolMarker>(
224 server_end: zx::Channel,
225) -> Result<(), Error> {
226 connect_channel_to_protocol_at::<P>(server_end, SVC_DIR)
227}
228
229pub fn connect_channel_to_protocol_at<P: DiscoverableProtocolMarker>(
231 server_end: zx::Channel,
232 service_directory_path: &str,
233) -> Result<(), Error> {
234 let protocol_path = format!("{}/{}", service_directory_path, P::PROTOCOL_NAME);
235 connect_channel_to_protocol_at_path(server_end, &protocol_path)
236}
237
238pub fn connect_channel_to_protocol_at_path(
240 server_end: zx::Channel,
241 protocol_path: &str,
242) -> Result<(), Error> {
243 fdio::service_connect(&protocol_path, server_end)
244 .with_context(|| format!("Error connecting to protocol path: {}", protocol_path))
245}
246
247pub fn connect_to_protocol<P: DiscoverableProtocolMarker>() -> Result<P::Proxy, Error> {
249 connect_to_protocol_at::<P>(SVC_DIR)
250}
251
252pub fn connect_to_protocol_sync<P: DiscoverableProtocolMarker>()
258-> Result<P::SynchronousProxy, Error> {
259 connect_to_protocol_sync_at::<P>(SVC_DIR)
260}
261
262pub fn connect_to_protocol_at<P: DiscoverableProtocolMarker>(
264 service_prefix: impl AsRef<str>,
265) -> Result<P::Proxy, Error> {
266 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
267 let () =
268 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
269 Ok(proxy)
270}
271
272pub fn connect_to_protocol_sync_at<P: DiscoverableProtocolMarker>(
278 service_prefix: impl AsRef<str>,
279) -> Result<P::SynchronousProxy, Error> {
280 let (proxy, server_end) = fidl::endpoints::create_sync_proxy::<P>();
281 let () =
282 connect_channel_to_protocol_at::<P>(server_end.into_channel(), service_prefix.as_ref())?;
283 Ok(proxy)
284}
285
286pub fn connect_to_protocol_at_path<P: ProtocolMarker>(
288 protocol_path: impl AsRef<str>,
289) -> Result<P::Proxy, Error> {
290 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
291 let () =
292 connect_channel_to_protocol_at_path(server_end.into_channel(), protocol_path.as_ref())?;
293 Ok(proxy)
294}
295
296pub fn connect_to_protocol_at_dir_root<P: DiscoverableProtocolMarker>(
298 directory: &impl AsRefDirectory,
299) -> Result<P::Proxy, Error> {
300 connect_to_named_protocol_at_dir_root::<P>(directory, P::PROTOCOL_NAME)
301}
302
303pub fn connect_to_named_protocol_at_dir_root<P: ProtocolMarker>(
305 directory: &impl AsRefDirectory,
306 filename: &str,
307) -> Result<P::Proxy, Error> {
308 let (proxy, server_end) = fidl::endpoints::create_proxy::<P>();
309 directory.as_ref_directory().open(
310 filename,
311 fio::Flags::PROTOCOL_SERVICE,
312 server_end.into_channel().into(),
313 )?;
314 Ok(proxy)
315}
316
317pub fn connect_to_protocol_at_dir_svc<P: DiscoverableProtocolMarker>(
319 directory: &impl AsRefDirectory,
320) -> Result<P::Proxy, Error> {
321 let protocol_path = format!("{}/{}", SVC_DIR, P::PROTOCOL_NAME);
322 connect_to_named_protocol_at_dir_root::<P>(directory, &protocol_path)
323}
324
325pub struct ServiceInstanceDirectory(pub fio::DirectoryProxy, pub String);
328
329impl MemberOpener for ServiceInstanceDirectory {
330 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
331 let Self(directory, _) = self;
332 #[cfg(fuchsia_api_level_at_least = "27")]
333 return directory.open(
334 member,
335 fio::Flags::PROTOCOL_SERVICE,
336 &fio::Options::default(),
337 server_end,
338 );
339 #[cfg(not(fuchsia_api_level_at_least = "27"))]
340 return directory.open3(
341 member,
342 fio::Flags::PROTOCOL_SERVICE,
343 &fio::Options::default(),
344 server_end,
345 );
346 }
347 fn instance_name(&self) -> &str {
348 let Self(_, instance_name) = self;
349 return &instance_name;
350 }
351}
352
353struct ServiceInstance<S> {
356 name: String,
358 service: Service<S>,
359}
360
361impl<S: ServiceMarker> MemberOpener for ServiceInstance<S> {
362 fn open_member(&self, member: &str, server_end: zx::Channel) -> Result<(), fidl::Error> {
363 self.service.connect_to_instance_member_with_channel(&self.name, member, server_end)
364 }
365 fn instance_name(&self) -> &str {
366 return &self.name;
367 }
368}
369
370pub struct Service<S> {
372 dir: fio::DirectoryProxy,
373 _marker: S,
374}
375
376impl<S: Clone> Clone for Service<S> {
377 fn clone(&self) -> Self {
378 Self { dir: Clone::clone(&self.dir), _marker: self._marker.clone() }
379 }
380}
381
382impl<S> From<fio::DirectoryProxy> for Service<S>
385where
386 S: Default,
387{
388 fn from(dir: fio::DirectoryProxy) -> Self {
389 Self { dir, _marker: S::default() }
390 }
391}
392
393impl<S: ServiceMarker> Service<S> {
394 pub fn from_service_dir_proxy(dir: fio::DirectoryProxy, _marker: S) -> Self {
397 Self { dir, _marker }
398 }
399
400 pub fn open(marker: S) -> Result<Self, Error> {
402 Ok(Self::from_service_dir_proxy(open_service::<S>()?, marker))
403 }
404
405 pub fn open_at(service_name: impl AsRef<str>, marker: S) -> Result<Self, Error> {
407 Ok(Self::from_service_dir_proxy(open_service_at(service_name)?, marker))
408 }
409
410 pub fn open_from_dir(svc_dir: impl AsRefDirectory, marker: S) -> Result<Self, Error> {
412 let dir = open_directory_async(&svc_dir, S::SERVICE_NAME, fio::Rights::empty())?;
413 Ok(Self::from_service_dir_proxy(dir, marker))
414 }
415
416 pub fn open_from_dir_prefix(
419 dir: impl AsRefDirectory,
420 prefix: impl AsRef<str>,
421 marker: S,
422 ) -> Result<Self, Error> {
423 let prefix = prefix.as_ref();
424 let service_path = format!("{prefix}/{}", S::SERVICE_NAME);
425 let service_path = service_path.strip_prefix('/').unwrap_or_else(|| service_path.as_ref());
429 let dir = open_directory_async(&dir, &service_path, fio::Rights::empty())?;
430 Ok(Self::from_service_dir_proxy(dir, marker))
431 }
432
433 pub fn connect_to_instance(&self, name: impl AsRef<str>) -> Result<S::Proxy, Error> {
437 let directory_proxy = fuchsia_fs::directory::open_directory_async(
438 &self.dir,
439 name.as_ref(),
440 fio::Flags::empty(),
441 )?;
442 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
443 directory_proxy,
444 name.as_ref().to_string(),
445 ))))
446 }
447
448 fn connect_to_instance_member_with_channel(
452 &self,
453 instance: impl AsRef<str>,
454 member: impl AsRef<str>,
455 server_end: zx::Channel,
456 ) -> Result<(), fidl::Error> {
457 let path = format!("{}/{}", instance.as_ref(), member.as_ref());
458 #[cfg(fuchsia_api_level_at_least = "27")]
459 return self.dir.open(
460 &path,
461 fio::Flags::PROTOCOL_SERVICE,
462 &fio::Options::default(),
463 server_end,
464 );
465 #[cfg(not(fuchsia_api_level_at_least = "27"))]
466 return self.dir.open3(
467 &path,
468 fio::Flags::PROTOCOL_SERVICE,
469 &fio::Options::default(),
470 server_end,
471 );
472 }
473
474 pub async fn watch(self) -> Result<ServiceInstanceStream<S>, Error> {
477 let watcher = Watcher::new(&self.dir).await?;
478 let finished = false;
479 Ok(ServiceInstanceStream { service: self, watcher, finished })
480 }
481
482 pub async fn watch_for_any(self) -> Result<S::Proxy, Error> {
484 self.watch()
485 .await?
486 .next()
487 .await
488 .context("No instances found before service directory was removed")?
489 }
490
491 pub async fn enumerate(self) -> Result<Vec<S::Proxy>, Error> {
493 let instances: Vec<S::Proxy> = fuchsia_fs::directory::readdir(&self.dir)
494 .await?
495 .into_iter()
496 .map(|dirent| {
497 S::Proxy::from_member_opener(Box::new(ServiceInstance {
498 service: self.clone(),
499 name: dirent.name,
500 }))
501 })
502 .collect();
503 Ok(instances)
504 }
505}
506
507#[pin_project]
513pub struct ServiceInstanceStream<S: Clone> {
514 service: Service<S>,
515 watcher: Watcher,
516 finished: bool,
517}
518
519impl<S: ServiceMarker> Stream for ServiceInstanceStream<S> {
520 type Item = Result<S::Proxy, Error>;
521
522 fn poll_next(
523 self: std::pin::Pin<&mut Self>,
524 cx: &mut std::task::Context<'_>,
525 ) -> Poll<Option<Self::Item>> {
526 let this = self.project();
527 use Poll::*;
528 if *this.finished {
529 return Poll::Ready(None);
530 }
531 while let Ready(next) = this.watcher.poll_next_unpin(cx) {
534 match next {
535 Some(Ok(state)) => match state.event {
536 WatchEvent::DELETED => {
537 *this.finished = true;
538 return Ready(None);
539 }
540 WatchEvent::ADD_FILE | WatchEvent::EXISTING => {
541 let filename = state.filename.to_str().unwrap();
542 if filename != "." {
543 let proxy = S::Proxy::from_member_opener(Box::new(ServiceInstance {
544 service: this.service.clone(),
545 name: filename.to_owned(),
546 }));
547 return Ready(Some(Ok(proxy)));
548 }
549 }
550 _ => {}
551 },
552 Some(Err(err)) => {
553 *this.finished = true;
554 return Ready(Some(Err(err.into())));
555 }
556 None => {
557 *this.finished = true;
558 return Ready(None);
559 }
560 }
561 }
562 Pending
563 }
564}
565
566impl<S: ServiceMarker> FusedStream for ServiceInstanceStream<S> {
567 fn is_terminated(&self) -> bool {
568 self.finished
569 }
570}
571
572pub fn connect_to_service_instance<S: ServiceMarker>(instance: &str) -> Result<S::Proxy, Error> {
579 let service_path = format!("{}/{}/{}", SVC_DIR, S::SERVICE_NAME, instance);
580 let directory_proxy =
581 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())?;
582 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
583 directory_proxy,
584 instance.to_string(),
585 ))))
586}
587
588pub fn connect_to_service_instance_at_dir<S: ServiceMarker>(
594 directory: &fio::DirectoryProxy,
595 instance: &str,
596) -> Result<S::Proxy, Error> {
597 let service_path = format!("{}/{}", S::SERVICE_NAME, instance);
598 let directory_proxy =
599 fuchsia_fs::directory::open_directory_async(directory, &service_path, fio::Flags::empty())?;
600 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
601 directory_proxy,
602 instance.to_string(),
603 ))))
604}
605
606pub fn connect_to_service_instance_at_dir_svc<S: ServiceMarker>(
608 directory: &impl AsRefDirectory,
609 instance: impl AsRef<str>,
610) -> Result<S::Proxy, Error> {
611 let service_path = format!("{SVC_DIR}/{}/{}", S::SERVICE_NAME, instance.as_ref());
612 let service_path = service_path.strip_prefix('/').unwrap();
616 let directory_proxy = open_directory_async(directory, service_path, fio::Rights::empty())?;
617 Ok(S::Proxy::from_member_opener(Box::new(ServiceInstanceDirectory(
618 directory_proxy,
619 instance.as_ref().to_string(),
620 ))))
621}
622
623pub fn open_service<S: ServiceMarker>() -> Result<fio::DirectoryProxy, Error> {
625 let service_path = format!("{}/{}", SVC_DIR, S::SERVICE_NAME);
626 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
627 .context("namespace open failed")
628}
629
630pub fn open_service_at(service_name: impl AsRef<str>) -> Result<fio::DirectoryProxy, Error> {
632 let service_path = format!("{SVC_DIR}/{}", service_name.as_ref());
633 fuchsia_fs::directory::open_in_namespace(&service_path, fio::Flags::empty())
634 .context("namespace open failed")
635}
636
637pub async fn open_childs_exposed_directory(
640 child_name: impl Into<String>,
641 collection_name: Option<String>,
642) -> Result<fio::DirectoryProxy, Error> {
643 let realm_proxy = connect_to_protocol::<RealmMarker>()?;
644 let (directory_proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
645 let child_ref = ChildRef { name: child_name.into(), collection: collection_name };
646 realm_proxy.open_exposed_dir(&child_ref, server_end).await?.map_err(|e| {
647 let ChildRef { name, collection } = child_ref;
648 format_err!("failed to bind to child {} in collection {:?}: {:?}", name, collection, e)
649 })?;
650 Ok(directory_proxy)
651}
652
653pub async fn connect_to_childs_protocol<P: DiscoverableProtocolMarker>(
656 child_name: String,
657 collection_name: Option<String>,
658) -> Result<P::Proxy, Error> {
659 let child_exposed_directory =
660 open_childs_exposed_directory(child_name, collection_name).await?;
661 connect_to_protocol_at_dir_root::<P>(&child_exposed_directory)
662}
663
664pub fn realm() -> Result<RealmProxy, Error> {
666 connect_to_protocol::<RealmMarker>()
667}
668
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use super::*;
    use fidl::endpoints::ServiceMarker as _;
    use fidl_fuchsia_component_client_test::{
        ProtocolAMarker, ProtocolAProxy, ProtocolBMarker, ProtocolBProxy, ServiceMarker,
    };
    use fuchsia_async::{self as fasync};
    use futures::{TryStreamExt, future};
    use vfs::directory::simple::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    /// Names of the two instances published by the fake service trees below.
    fn expected_instance_names() -> HashSet<String> {
        HashSet::from_iter(["default".to_owned(), "another_instance".to_owned()])
    }

    /// Watches `service` and collects the names of the first two instances it reports.
    async fn first_two_instance_names(
        service: Service<ServiceMarker>,
    ) -> Result<HashSet<String>, Error> {
        service
            .watch()
            .await?
            .take(2)
            .and_then(|proxy| future::ready(Ok(proxy.instance_name().to_owned())))
            .try_collect()
            .await
    }

    // A connector for a protocol that is not present reports `exists() == false`, yet `connect`
    // still succeeds.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_svc_does_not_exist() -> Result<(), Error> {
        let connector =
            new_protocol_connector::<ProtocolAMarker>().context("error probing service")?;
        let present = connector.exists().await.context("error checking service");
        assert_matches::assert_matches!(present, Ok(false));
        let _: ProtocolAProxy = connector.connect().context("error connecting to service")?;

        let connector = new_protocol_connector_at::<ProtocolAMarker>(SVC_DIR)
            .context("error probing service at svc dir")?;
        let present = connector.exists().await.context("error checking service at svc dir");
        assert_matches::assert_matches!(present, Ok(false));
        let _: ProtocolAProxy =
            connector.connect().context("error connecting to service at svc dir")?;

        Ok(())
    }

    // With an explicit directory, only the protocol that has an entry is reported as existing.
    #[fasync::run_singlethreaded(test)]
    async fn test_svc_connector_connect_with_dir() -> Result<(), Error> {
        let served = pseudo_directory! {
            ProtocolBMarker::PROTOCOL_NAME => read_only("read_only"),
        };
        let dir_proxy = vfs::directory::serve_read_only(served);

        let missing = new_protocol_connector_in_dir::<ProtocolAMarker>(&dir_proxy);
        let present = missing.exists().await.context("error probing invalid service");
        assert_matches::assert_matches!(present, Ok(false));
        let _: ProtocolAProxy = missing.connect().context("error connecting to invalid service")?;

        let available = new_protocol_connector_in_dir::<ProtocolBMarker>(&dir_proxy);
        let present = available.exists().await.context("error probing service");
        assert_matches::assert_matches!(present, Ok(true));
        let _: ProtocolBProxy = available.connect().context("error connecting to service")?;

        Ok(())
    }

    /// Directory with a single service entry that holds two instances.
    fn make_inner_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            ServiceMarker::SERVICE_NAME => pseudo_directory! {
                "default" => read_only("read_only"),
                "another_instance" => read_only("read_only"),
            },
        }
    }

    /// Same tree as above, nested under a top-level `svc` directory.
    fn make_service_instance_tree() -> Arc<Simple> {
        pseudo_directory! {
            "svc" => make_inner_service_instance_tree(),
        }
    }

    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_root() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found_names = first_two_instance_names(service).await?;

        assert_eq!(found_names, expected_instance_names());

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_service_instance_watcher_from_svc() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_inner_service_instance_tree());
        let service = Service::open_from_dir(&dir_proxy, ServiceMarker)?;
        let found_names = first_two_instance_names(service).await?;

        assert_eq!(found_names, expected_instance_names());

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_all_services() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let instances = service.watch().await?;
        let _: Vec<_> = instances.take(2).try_collect().await?;

        Ok(())
    }

    #[fasync::run_until_stalled(test)]
    async fn test_connect_to_any() -> Result<(), Error> {
        let dir_proxy = vfs::directory::serve_read_only(make_service_instance_tree());
        let service = Service::open_from_dir_prefix(&dir_proxy, SVC_DIR, ServiceMarker)?;
        let found = service.watch_for_any().await?;
        let found_name = found.instance_name();
        assert!(matches!(found_name, "default" | "another_instance"));

        Ok(())
    }
}