1use anyhow::{Context as _, Error, format_err};
6use fidl::endpoints::{
7 ClientEnd, ControlHandle, DiscoverableProtocolMarker, RequestStream, ServerEnd, ServiceMarker,
8 ServiceProxy, create_request_stream,
9};
10use fidl_fuchsia_component as fcomponent;
11use fidl_fuchsia_component_runner as fcrunner;
12use fidl_fuchsia_component_test as ftest;
13use fidl_fuchsia_data as fdata;
14use fidl_fuchsia_io as fio;
15use fidl_fuchsia_process as fprocess;
16use fuchsia_async as fasync;
17use fuchsia_component::DEFAULT_SERVICE_INSTANCE;
18use fuchsia_component::client::Connect;
19use futures::channel::oneshot;
20use futures::future::BoxFuture;
21use futures::lock::Mutex;
22use futures::{FutureExt, TryStreamExt, select};
23use log::*;
24use runner::get_value as get_dictionary_value;
25use std::collections::HashMap;
26use std::sync::Arc;
27use vfs::execution_scope::ExecutionScope;
28
29pub struct LocalComponentHandles {
32 namespace: HashMap<String, fio::DirectoryProxy>,
33 numbered_handles: HashMap<u32, zx::NullableHandle>,
34
35 stop_notifier: Arc<Mutex<Option<oneshot::Sender<()>>>>,
36
37 pub outgoing_dir: ServerEnd<fio::DirectoryMarker>,
40}
41
42impl LocalComponentHandles {
43 fn new(
44 fidl_namespace: Vec<fcrunner::ComponentNamespaceEntry>,
45 fidl_numbered_handles: Vec<fprocess::HandleInfo>,
46 outgoing_dir: ServerEnd<fio::DirectoryMarker>,
47 ) -> Result<(Self, Arc<Mutex<Option<oneshot::Sender<()>>>>), Error> {
48 let stop_notifier = Arc::new(Mutex::new(None));
49 let mut namespace = HashMap::new();
50 for namespace_entry in fidl_namespace {
51 namespace.insert(
52 namespace_entry.path.ok_or_else(|| format_err!("namespace entry missing path"))?,
53 namespace_entry
54 .directory
55 .ok_or_else(|| format_err!("namespace entry missing directory handle"))?
56 .into_proxy(),
57 );
58 }
59 let numbered_handles =
60 fidl_numbered_handles.into_iter().map(|h| (h.id, h.handle)).collect::<HashMap<_, _>>();
61 Ok((
62 Self {
63 namespace,
64 numbered_handles,
65 outgoing_dir,
66 stop_notifier: stop_notifier.clone(),
67 },
68 stop_notifier,
69 ))
70 }
71
72 pub fn take_numbered_handle(&mut self, id: u32) -> Option<zx::NullableHandle> {
73 self.numbered_handles.remove(&id)
74 }
75
76 pub fn numbered_handles(&self) -> &HashMap<u32, zx::NullableHandle> {
77 &self.numbered_handles
78 }
79
80 pub async fn register_stop_notifier(&self) -> oneshot::Receiver<()> {
91 let mut stop_notifier_guard = self.stop_notifier.lock().await;
92 if stop_notifier_guard.is_some() {
93 panic!("cannot register multiple stop handlers for a single local component");
94 }
95 let (sender, receiver) = oneshot::channel();
96 *stop_notifier_guard = Some(sender);
97 receiver
98 }
99
100 pub fn connect_to_protocol<T: Connect>(&self) -> Result<T, Error> {
102 self.connect_to_named_protocol(T::Protocol::PROTOCOL_NAME)
103 }
104
105 pub fn connect_to_named_protocol<T: Connect>(&self, name: &str) -> Result<T, Error> {
107 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
108 format_err!("the component's namespace doesn't have a /svc directory")
109 })?;
110 T::connect_at_dir_root_with_name(svc_dir_proxy, name)
111 }
112
113 pub fn open_service<S: ServiceMarker>(&self) -> Result<fio::DirectoryProxy, Error> {
115 self.open_named_service(S::SERVICE_NAME)
116 }
117
118 pub fn open_named_service(&self, name: &str) -> Result<fio::DirectoryProxy, Error> {
121 let svc_dir_proxy = self.namespace.get("/svc").ok_or_else(|| {
122 format_err!("the component's namespace doesn't have a /svc directory")
123 })?;
124 fuchsia_fs::directory::open_directory_async(&svc_dir_proxy, name, fio::PERM_READABLE)
125 .map_err(Into::into)
126 }
127
128 pub fn connect_to_service<S: ServiceMarker>(&self) -> Result<S::Proxy, Error> {
131 self.connect_to_service_instance::<S>(DEFAULT_SERVICE_INSTANCE)
132 }
133
134 pub fn connect_to_service_instance<S: ServiceMarker>(
138 &self,
139 instance_name: &str,
140 ) -> Result<S::Proxy, Error> {
141 self.connect_to_named_service_instance::<S>(S::SERVICE_NAME, instance_name)
142 }
143
144 pub fn connect_to_named_service_instance<S: ServiceMarker>(
148 &self,
149 service_name: &str,
150 instance_name: &str,
151 ) -> Result<S::Proxy, Error> {
152 let service_dir = self.open_named_service(service_name)?;
153 let directory_proxy = fuchsia_fs::directory::open_directory_async(
154 &service_dir,
155 instance_name,
156 fio::PERM_READABLE,
157 )?;
158 Ok(S::Proxy::from_member_opener(Box::new(
159 fuchsia_component::client::ServiceInstanceDirectory(
160 directory_proxy,
161 instance_name.to_owned(),
162 ),
163 )))
164 }
165
166 pub fn clone_from_namespace(&self, directory_name: &str) -> Result<fio::DirectoryProxy, Error> {
179 let dir_proxy = self.namespace.get(&format!("/{}", directory_name)).ok_or_else(|| {
180 format_err!(
181 "the local component's namespace doesn't have a /{} directory",
182 directory_name
183 )
184 })?;
185 fuchsia_fs::directory::clone(&dir_proxy).context("clone")
186 }
187}
188
189type LocalComponentImplementations = HashMap<
190 String,
191 Arc<
192 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
193 + Sync
194 + Send
195 + 'static,
196 >,
197>;
198
199#[derive(Clone, Debug)]
200pub struct LocalComponentRunnerBuilder {
201 local_component_implementations: Arc<Mutex<Option<LocalComponentImplementations>>>,
202}
203
204impl LocalComponentRunnerBuilder {
205 pub fn new() -> Self {
206 Self { local_component_implementations: Arc::new(Mutex::new(Some(HashMap::new()))) }
207 }
208
209 pub(crate) async fn register_local_component<I>(
210 &self,
211 name: String,
212 implementation: I,
213 ) -> Result<(), ftest::RealmBuilderError>
214 where
215 I: Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
216 + Sync
217 + Send
218 + 'static,
219 {
220 self.local_component_implementations
221 .lock()
222 .await
223 .as_mut()
224 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?
225 .insert(name, Arc::new(implementation));
226 Ok(())
227 }
228
229 pub(crate) async fn build(
230 self,
231 ) -> Result<
232 (ClientEnd<fcrunner::ComponentRunnerMarker>, fasync::Task<()>),
233 ftest::RealmBuilderError,
234 > {
235 let local_component_implementations = self
236 .local_component_implementations
237 .lock()
238 .await
239 .take()
240 .ok_or(ftest::RealmBuilderError::BuildAlreadyCalled)?;
241 let (runner_client_end, runner_request_stream) =
242 create_request_stream::<fcrunner::ComponentRunnerMarker>();
243 let runner = LocalComponentRunner::new(local_component_implementations);
244 let runner_task = fasync::Task::spawn(async move {
245 if let Err(e) = runner.handle_stream(runner_request_stream).await {
246 error!("failed to run local component runner: {:?}", e);
247 }
248 });
249
250 Ok((runner_client_end, runner_task))
251 }
252}
253
254pub struct LocalComponentRunner {
255 execution_scope: ExecutionScope,
256 local_component_implementations: HashMap<
257 String,
258 Arc<
259 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
260 + Sync
261 + Send
262 + 'static,
263 >,
264 >,
265}
266
267impl Drop for LocalComponentRunner {
268 fn drop(&mut self) {
269 self.execution_scope.shutdown();
270 }
271}
272
273impl LocalComponentRunner {
274 fn new(
275 local_component_implementations: HashMap<
276 String,
277 Arc<
278 dyn Fn(LocalComponentHandles) -> BoxFuture<'static, Result<(), Error>>
279 + Sync
280 + Send
281 + 'static,
282 >,
283 >,
284 ) -> Self {
285 Self { local_component_implementations, execution_scope: ExecutionScope::new() }
286 }
287
288 async fn handle_stream(
289 &self,
290 mut runner_request_stream: fcrunner::ComponentRunnerRequestStream,
291 ) -> Result<(), Error> {
292 while let Some(req) = runner_request_stream.try_next().await? {
293 match req {
294 fcrunner::ComponentRunnerRequest::Start { start_info, controller, .. } => {
295 let program = start_info
296 .program
297 .ok_or_else(|| format_err!("program is missing from start_info"))?;
298 let namespace = start_info
299 .ns
300 .ok_or_else(|| format_err!("namespace is missing from start_info"))?;
301 let numbered_handles = start_info.numbered_handles.unwrap_or_default();
302 let outgoing_dir = start_info
303 .outgoing_dir
304 .ok_or_else(|| format_err!("outgoing_dir is missing from start_info"))?;
305 let _runtime_dir_server_end: ServerEnd<fio::DirectoryMarker> = start_info
306 .runtime_dir
307 .ok_or_else(|| format_err!("runtime_dir is missing from start_info"))?;
308
309 let local_component_name = extract_local_component_name(program)?;
310 let local_component_implementation = self
311 .local_component_implementations
312 .get(&local_component_name)
313 .ok_or_else(|| {
314 format_err!("no such local component: {:?}", local_component_name)
315 })?
316 .clone();
317 let (component_handles, stop_notifier) =
318 LocalComponentHandles::new(namespace, numbered_handles, outgoing_dir)?;
319
320 let mut controller_request_stream = controller.into_stream();
321 self.execution_scope.spawn(async move {
322 let mut local_component_implementation_fut =
323 (*local_component_implementation)(component_handles).fuse();
324 let controller_control_handle = controller_request_stream.control_handle();
325 let mut controller_request_fut =
326 controller_request_stream.try_next().fuse();
327 loop {
328 select! {
329 res = local_component_implementation_fut => {
330 let epitaph = match res {
331 Err(e) => {
332 error!(
333 "the local component {:?} returned an error: {:?}",
334 local_component_name,
335 e,
336 );
337 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32)
338 }
339 Ok(()) => zx::Status::OK,
340 };
341 controller_control_handle.shutdown_with_epitaph(epitaph);
342 return;
343 }
344 req_res = controller_request_fut => {
345 match req_res.expect("invalid controller request") {
346 Some(fcrunner::ComponentControllerRequest::Stop { .. }) => {
347 if let Some(stop_notifier) =
348 stop_notifier.lock().await.take()
349 {
350 let _ = stop_notifier.send(());
356
357 controller_request_fut = controller_request_stream.try_next().fuse();
360 } else {
361 controller_control_handle.shutdown_with_epitaph(
362 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
363 );
364 return;
365 }
366 }
367 Some(fcrunner::ComponentControllerRequest::Kill { .. }) => {
368 controller_control_handle.shutdown_with_epitaph(
369 zx::Status::from_raw(fcomponent::Error::InstanceDied.into_primitive() as i32),
370 );
371 return;
372 }
373 _ => return,
374 }
375 }
376 };
377 }
378 });
379 }
380 fcrunner::ComponentRunnerRequest::_UnknownMethod { ordinal, .. } => {
381 warn!(ordinal:%; "Unknown ComponentController request");
382 }
383 }
384 }
385 Ok(())
386 }
387}
388
389fn extract_local_component_name(dict: fdata::Dictionary) -> Result<String, Error> {
390 let entry_value = get_dictionary_value(&dict, ftest::LOCAL_COMPONENT_NAME_KEY)
391 .ok_or_else(|| format_err!("program section is missing component name"))?;
392 if let fdata::DictionaryValue::Str(s) = entry_value {
393 return Ok(s.clone());
394 } else {
395 return Err(format_err!("malformed program section"));
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::{Proxy as _, create_proxy};
    use futures::future::pending;
    use zx::AsHandleRef;

    /// A function registered with the builder can be fetched back from it and invoked.
    #[fuchsia::test]
    async fn runner_builder_correctly_stores_a_function() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel();
        // The implementation is a `Fn`, so the one-shot sender sits behind a shared slot.
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    let sender = sender.lock().await.take().expect("local component invoked twice");
                    sender.send(()).expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let (_, outgoing_dir) = create_proxy();
        let handles = LocalComponentHandles {
            namespace: HashMap::new(),
            numbered_handles: HashMap::new(),
            outgoing_dir,
            stop_notifier: Arc::new(Mutex::new(None)),
        };
        let local_component_implementation = runner_builder
            .local_component_implementations
            .lock()
            .await
            .as_ref()
            .unwrap()
            .get(&component_name)
            .expect("local component missing from runner builder")
            .clone();

        (*local_component_implementation)(handles)
            .await
            .expect("local component implementation failed");
        let () = receiver.await.expect("failed to receive");
    }

    /// Everything a test has to keep alive (or poke at) after starting a component.
    struct RunnerAndHandles {
        _runner_task: fasync::Task<()>,
        _component_runner_proxy: fcrunner::ComponentRunnerProxy,
        _runtime_dir_proxy: fio::DirectoryProxy,
        outgoing_dir_proxy: fio::DirectoryProxy,
        controller_proxy: fcrunner::ComponentControllerProxy,
    }

    /// Builds the runner and sends it a `Start` request for `component_to_start` with an empty
    /// namespace and no numbered handles.
    async fn build_and_start(
        runner_builder: LocalComponentRunnerBuilder,
        component_to_start: String,
    ) -> RunnerAndHandles {
        let (component_runner_client_end, runner_task) = runner_builder.build().await.unwrap();
        let component_runner_proxy = component_runner_client_end.into_proxy();

        let (runtime_dir_proxy, runtime_dir_server_end) = create_proxy();
        let (outgoing_dir_proxy, outgoing_dir_server_end) = create_proxy();
        let (controller_proxy, controller_server_end) = create_proxy();
        component_runner_proxy
            .start(
                fcrunner::ComponentStartInfo {
                    resolved_url: Some("test://test".to_string()),
                    program: Some(fdata::Dictionary {
                        entries: Some(vec![fdata::DictionaryEntry {
                            key: ftest::LOCAL_COMPONENT_NAME_KEY.to_string(),
                            value: Some(Box::new(fdata::DictionaryValue::Str(component_to_start))),
                        }]),
                        ..Default::default()
                    }),
                    ns: Some(vec![]),
                    outgoing_dir: Some(outgoing_dir_server_end),
                    runtime_dir: Some(runtime_dir_server_end),
                    numbered_handles: Some(vec![]),
                    ..Default::default()
                },
                controller_server_end,
            )
            .expect("failed to send start");

        RunnerAndHandles {
            _runner_task: runner_task,
            _component_runner_proxy: component_runner_proxy,
            _runtime_dir_proxy: runtime_dir_proxy,
            outgoing_dir_proxy,
            controller_proxy,
        }
    }

    /// A `Start` request causes the registered implementation to run.
    #[fuchsia::test]
    async fn the_runner_runs_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    let sender = sender.lock().await.take().expect("local component invoked twice");
                    sender.send(()).expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let _runner_and_handles = build_and_start(runner_builder, component_name).await;

        let () = receiver.await.expect("failed to receive");
    }

    /// The outgoing directory handed to the component is the peer of the one passed to `Start`.
    #[fuchsia::test]
    async fn the_runner_gives_the_component_its_outgoing_dir() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<ServerEnd<fio::DirectoryMarker>>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |handles| {
                let sender = sender.clone();
                async move {
                    // NOTE(review): looks like a leftover that forces the whole `handles` value
                    // to be captured by this block - confirm before removing.
                    let _ = &handles;
                    sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice")
                        .send(handles.outgoing_dir)
                        .expect("failed to send");
                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name.clone()).await;

        let outgoing_dir_server_end = receiver.await.expect("failed to receive");

        // The server end's koid must equal the related koid of the client end's channel.
        assert_eq!(
            outgoing_dir_server_end
                .into_channel()
                .as_handle_ref()
                .basic_info()
                .expect("failed to get basic info")
                .koid,
            runner_and_handles
                .outgoing_dir_proxy
                .into_channel()
                .expect("failed to convert to channel")
                .as_handle_ref()
                .basic_info()
                .expect("failed to get basic info")
                .related_koid,
        );
    }

    /// `Stop` without a registered stop notifier drops the component's future.
    #[fuchsia::test]
    async fn controller_stop_will_stop_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    // Hold the sender without ever using it; the receiver only sees
                    // `Canceled` once this future is dropped.
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        runner_and_handles.controller_proxy.stop().expect("failed to send stop");

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }

    /// `Kill` drops the component's future.
    #[fuchsia::test]
    async fn controller_kill_will_kill_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    // Hold the sender without ever using it; the receiver only sees
                    // `Canceled` once this future is dropped.
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        runner_and_handles.controller_proxy.kill().expect("failed to send kill");

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }

    /// `Stop` with a registered stop notifier signals the notifier and lets the component
    /// finish on its own.
    #[fuchsia::test]
    async fn stopping_a_component_calls_the_notifier() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (notifier_registered_sender, notifier_registered_receiver) = oneshot::channel::<()>();
        let notifier_registered_sender = Arc::new(Mutex::new(Some(notifier_registered_sender)));

        let (notifier_fired_sender, notifier_fired_receiver) = oneshot::channel::<()>();
        let notifier_fired_sender = Arc::new(Mutex::new(Some(notifier_fired_sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |handles| {
                let notifier_registered_sender = notifier_registered_sender.clone();
                let notifier_fired_sender = notifier_fired_sender.clone();
                async move {
                    let stop_notifier = handles.register_stop_notifier().await;

                    let sender = notifier_registered_sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice");
                    sender.send(()).expect("failed to send that the stop notifier was registered");

                    stop_notifier.await.expect("failed to wait for stop notification");

                    let sender = notifier_fired_sender
                        .lock()
                        .await
                        .take()
                        .expect("local component invoked twice");
                    sender
                        .send(())
                        .expect("failed to send that the stop notifier received a message");

                    Ok(())
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;

        // Wait until the component has registered its notifier before asking it to stop.
        assert_matches!(notifier_registered_receiver.await, Ok(()));

        runner_and_handles.controller_proxy.stop().expect("failed to send stop");

        // The component must have seen the stop notification.
        assert_matches!(notifier_fired_receiver.await, Ok(()));
    }

    /// Dropping the runner task and its handles drops the component's future.
    #[fuchsia::test]
    async fn dropping_the_runner_will_kill_a_component() {
        let runner_builder = LocalComponentRunnerBuilder::new();
        let (sender, receiver) = oneshot::channel::<()>();
        let sender = Arc::new(Mutex::new(Some(sender)));

        let component_name = "test".to_string();

        runner_builder
            .register_local_component(component_name.clone(), move |_handles| {
                let sender = sender.clone();
                async move {
                    // Hold the sender without ever using it; the receiver only sees
                    // `Canceled` once this future is dropped.
                    let _sender =
                        sender.lock().await.take().expect("local component invoked twice");
                    pending().await
                }
                .boxed()
            })
            .await
            .unwrap();

        let runner_and_handles = build_and_start(runner_builder, component_name).await;
        drop(runner_and_handles);

        assert_eq!(Err(oneshot::Canceled), receiver.await);
    }
}