1use anyhow::{Context, Result, bail, format_err};
6use async_utils::async_once::Once;
7use cm_types::Name;
8use fidl::AsHandleRef;
9use fidl::endpoints::ServerEnd;
10use fidl_fuchsia_data as fdata;
11use fidl_fuchsia_io as fio;
12use fidl_fuchsia_process as fprocess;
13use fidl_fuchsia_process_lifecycle as fpl;
14use fuchsia_component::directory::AsRefDirectory;
15use fuchsia_component::server::{ServiceFs, ServiceObj, ServiceObjTrait};
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use futures::prelude::*;
18use std::borrow::Cow;
19use std::path::Path;
20use std::sync::Arc;
21
/// Returns the textual form of a watcher event's filename.
///
/// The result always borrows from `path`; no allocation is performed. The
/// special entry "." is returned unchanged, so callers that want to skip it
/// must compare against "." themselves.
///
/// Returns `None` when `path` is not valid UTF-8.
fn extract_event_filename<'a>(path: &'a Path) -> Option<Cow<'a, str>> {
    // Every valid UTF-8 path (including ".") maps to the same borrowed text,
    // so no special-casing is needed.
    path.to_str().map(Cow::Borrowed)
}
35
36async fn wait_for_first_instance(svc: &fio::DirectoryProxy) -> Result<String> {
37 const INPUT_SERVICE: &str = "input";
38 let (service_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
39 svc.as_ref_directory().open(INPUT_SERVICE, fio::Flags::PROTOCOL_DIRECTORY, request.into())?;
40 let watcher = Watcher::new(&service_dir).await.context("failed to create watcher")?;
41
42 let mut stream =
43 watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
44 futures::future::ok(match msg.event {
45 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
46 let filename = extract_event_filename(msg.filename.as_path())
47 .expect("filename must be valid utf8");
48
49 if filename.as_ref() == "." { None } else { Some(filename.into_owned()) }
50 }
51 _ => None,
52 })
53 });
54
55 let first = stream.try_next().await?.ok_or_else(|| {
56 format_err!("Watcher stream closed unexpectedly before finding an instance")
57 })?;
58
59 Ok(format!("{INPUT_SERVICE}/{first}"))
60}
61
62async fn connect_request(
63 svc: &fio::DirectoryProxy,
64 request: zx::Channel,
65 protocol_name: &Name,
66 instance_dir: &str,
67) {
68 let target_path = format!("{instance_dir}/{}", protocol_name.as_str());
69
70 if let Err(e) = svc.as_ref_directory().open(&target_path, fio::Flags::PROTOCOL_SERVICE, request)
71 {
72 log::error!("[service-broker] Failed to forward connection to {target_path}: {e}");
73 }
74}
75
76async fn first_instance_to_protocol<'a>(
77 svc: fio::DirectoryProxy,
78 fs: &mut ServiceFs<ServiceObj<'a, ()>>,
79 protocol_name: Name,
80 scope: &'a fuchsia_async::Scope,
81) -> Result<()> {
82 let cached_instance: Arc<Once<String>> = Arc::new(Once::new());
83 let svc_arc = Arc::new(svc);
84
85 fs.dir("svc").add_service_at("output", move |request: zx::Channel| {
86 let svc = Arc::clone(&svc_arc);
87 let protocol_name = protocol_name.clone();
88 let cached_instance = Arc::clone(&cached_instance);
89
90 scope.spawn(async move {
91 let init_future = async || wait_for_first_instance(&svc).await;
94
95 match cached_instance.get_or_try_init(init_future).await {
96 Ok(instance_dir) => {
97 connect_request(&svc, request, &protocol_name, instance_dir).await;
98 }
99 Err(e) => {
100 log::error!(
101 "[service-broker] Failed to resolve first instance: {e}, {protocol_name}"
102 );
103 }
104 }
105 });
106
107 Some(())
108 });
109
110 Ok(())
111}
112
113async fn first_instance_to_default<T: ServiceObjTrait>(
114 svc: fio::DirectoryProxy,
115 fs: &mut ServiceFs<T>,
116) -> Result<()> {
117 let instance_dir_path = wait_for_first_instance(&svc).await?;
120 let (instance_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
121 svc.as_ref_directory().open(
122 &instance_dir_path,
123 fio::Flags::PROTOCOL_DIRECTORY,
124 request.into(),
125 )?;
126
127 fs.dir("svc").dir("output").add_remote("default", instance_dir);
128 Ok(())
129}
130
131async fn filter_and_rename<T: ServiceObjTrait>(
132 _svc: fio::DirectoryProxy,
133 _fs: &mut ServiceFs<T>,
134 _filter: &Vec<String>,
135 _rename: &Vec<String>,
136) -> Result<()> {
137 bail!("filter_and_rename policy is not yet implemented");
138 }
140
141fn get_value<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
142 match &dict.entries {
143 Some(entries) => {
144 for entry in entries {
145 if entry.key == key {
146 return entry.value.as_ref().map(|val| &**val);
147 }
148 }
149 None
150 }
151 _ => None,
152 }
153}
154
155fn get_program_string<'a>(program: &'a fdata::Dictionary, key: &str) -> Result<&'a str> {
156 if let Some(fdata::DictionaryValue::Str(value)) = get_value(program, key) {
157 Ok(value)
158 } else {
159 Err(format_err!("{key} not found in program or is not a string"))
160 }
161}
162
163fn get_program_strvec<'a>(
164 program: &'a fdata::Dictionary,
165 key: &str,
166) -> Result<Option<&'a Vec<String>>> {
167 match get_value(program, key) {
168 Some(args_value) => match args_value {
169 fdata::DictionaryValue::StrVec(vec) => Ok(Some(vec)),
170 _ => Err(format_err!(
171 "Expected {key} in program to be vector of strings, found something else"
172 )),
173 },
174 None => Ok(None),
175 }
176}
177
178pub async fn main(
179 ns_entries: Vec<fprocess::NameInfo>,
180 directory_request: ServerEnd<fio::DirectoryMarker>,
181 lifecycle: ServerEnd<fpl::LifecycleMarker>,
182 program: Option<fdata::Dictionary>,
183) -> Result<()> {
184 drop(lifecycle);
185 if directory_request.as_handle_ref().is_invalid() {
186 bail!("No valid handle found for outgoing directory");
187 }
188 let Some(svc) = ns_entries.into_iter().find(|e| e.path == "/svc") else {
189 bail!("No /svc in namespace");
190 };
191 let Some(program) = program else {
192 bail!("No program section provided");
193 };
194 let scope = fuchsia_async::Scope::new();
195 let svc = svc.directory.into_proxy();
196 let mut fs = ServiceFs::new();
197 match get_program_string(&program, "policy")? {
198 "first_instance_to_protocol" => {
199 let protocol_name_str = get_program_string(&program, "protocol_name")?;
200
201 let protocol_name = Name::new(protocol_name_str).map_err(|e| {
202 format_err!("Invalid protocol_name '{protocol_name_str}' in program dict: {e}")
203 })?;
204
205 first_instance_to_protocol(svc, &mut fs, protocol_name, &scope).await
206 }
207 "first_instance_to_default" => first_instance_to_default(svc, &mut fs).await,
208 "filter_and_rename" => {
209 let empty = vec![];
210 let filter = get_program_strvec(&program, "filter")?.unwrap_or(&empty);
211 let rename = get_program_strvec(&program, "rename")?.unwrap_or(&empty);
212 filter_and_rename(svc, &mut fs, filter, rename).await
213 }
214 policy => Err(format_err!("Unsupported policy specified: {policy}")),
215 }?;
216
217 log::debug!("[service-broker] Initialized.");
218
219 fs.serve_connection(directory_request).context("failed to serve outgoing namespace")?;
220 fs.collect::<()>().await;
221 Ok(())
222}
223
#[cfg(test)]
mod tests {
    //! Unit tests for the program-dictionary helpers and an end-to-end test of
    //! the `first_instance_to_protocol` policy.

    use super::*;
    use fidl::endpoints::{Proxy, create_endpoints, create_proxy};
    use fidl_fuchsia_data as fdata;
    use fuchsia_async as fasync;
    use futures::StreamExt;

    /// Builds a program dictionary from `(key, value)` pairs; every entry gets
    /// a boxed value.
    fn make_program_dict(entries: Vec<(&str, fdata::DictionaryValue)>) -> fdata::Dictionary {
        let entries = entries
            .into_iter()
            .map(|(k, v)| fdata::DictionaryEntry { key: k.to_string(), value: Some(Box::new(v)) })
            .collect();
        fdata::Dictionary { entries: Some(entries), ..Default::default() }
    }

    /// A present string key is returned; a missing key yields the documented
    /// error message.
    #[test]
    fn test_get_program_string() {
        let dict = make_program_dict(vec![(
            "policy",
            fdata::DictionaryValue::Str("first_instance_to_protocol".to_string()),
        )]);

        assert_eq!(get_program_string(&dict, "policy").unwrap(), "first_instance_to_protocol");

        let err = get_program_string(&dict, "missing_key").unwrap_err();
        assert_eq!(err.to_string(), "missing_key not found in program or is not a string");
    }

    /// A present string-vector key is returned; a missing key yields
    /// `Ok(None)` rather than an error.
    #[test]
    fn test_get_program_strvec() {
        let dict = make_program_dict(vec![(
            "filter",
            fdata::DictionaryValue::StrVec(vec!["fuchsia.foo.Bar".to_string()]),
        )]);

        let vec = get_program_strvec(&dict, "filter").unwrap().unwrap();
        assert_eq!(vec.len(), 1);
        assert_eq!(vec[0], "fuchsia.foo.Bar");

        assert!(get_program_strvec(&dict, "rename").unwrap().is_none());
    }

    /// The unimplemented policy reports an error instead of panicking.
    #[fasync::run_singlethreaded(test)]
    async fn test_filter_and_rename_graceful_failure() {
        // The server end is kept alive but never served; the stub does not use the proxy.
        let (dir_proxy, _server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let mut fs = ServiceFs::<ServiceObj<'_, ()>>::new();

        let result = filter_and_rename(dir_proxy, &mut fs, &vec![], &vec![]).await;
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            "filter_and_rename policy is not yet implemented"
        );
    }

    /// Runs the broker with the `first_instance_to_protocol` policy against a
    /// fake `/svc` and checks that two connections to `svc/output` both reach
    /// the fake protocol.
    #[fasync::run_singlethreaded(test)]
    async fn test_broker_caching_and_routing_end_to_end() {
        // Fake incoming namespace: serves `input/instance_123/my_protocol`.
        let (svc_dir, svc_server_end) = create_proxy::<fio::DirectoryMarker>();
        let mut fake_svc_fs = ServiceFs::new();

        fake_svc_fs.dir("input").dir("instance_123").add_service_at(
            "my_protocol",
            |req: zx::Channel| {
                // Write one byte so the client end becomes readable once a
                // connection arrives here.
                let _ = req.write(&[1], &mut []);
                Some(())
            },
        );

        fasync::Task::spawn(async move {
            fake_svc_fs.serve_connection(svc_server_end).unwrap();
            fake_svc_fs.collect::<()>().await;
        })
        .detach();

        // Hand the fake directory to the broker as its `/svc` namespace entry.
        let ns_entries = vec![fprocess::NameInfo {
            path: "/svc".to_string(),
            directory: svc_dir.into_channel().unwrap().into_zx_channel().into(),
        }];

        let (out_dir, out_server_end) = create_proxy::<fio::DirectoryMarker>();
        // The lifecycle client end is discarded right away; `main` drops the server end.
        let (_, lifecycle_server_end) = create_endpoints::<fpl::LifecycleMarker>();

        let program_dict = make_program_dict(vec![
            ("policy", fdata::DictionaryValue::Str("first_instance_to_protocol".to_string())),
            ("protocol_name", fdata::DictionaryValue::Str("my_protocol".to_string())),
        ]);

        // Run the broker in the background; it serves `out_server_end`.
        fasync::Task::spawn(async move {
            let res =
                main(ns_entries, out_server_end, lifecycle_server_end, Some(program_dict)).await;
            assert!(res.is_ok(), "Broker main task failed");
        })
        .detach();

        // First connection: the broker has to discover the instance before forwarding.
        let (client_end, server_end) = zx::Channel::create();

        out_dir
            .open("svc/output", fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end)
            .expect("Failed to send open request to broker");

        let signals = fasync::OnSignals::new(&client_end, zx::Signals::CHANNEL_READABLE)
            .await
            .expect("Failed waiting for signal. Routing may have dropped the channel.");

        assert!(signals.contains(zx::Signals::CHANNEL_READABLE));

        // Second connection: presumably exercises the already-resolved instance
        // path (per the test name); only checks that waiting for the signal succeeds.
        let (client_end2, server_end2) = zx::Channel::create();
        out_dir
            .open("svc/output", fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end2)
            .unwrap();

        let _ = fasync::OnSignals::new(&client_end2, zx::Signals::CHANNEL_READABLE).await.unwrap();
    }
}