Skip to main content

service_broker/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context, Result, bail, format_err};
6use async_utils::async_once::Once;
7use cm_types::Name;
8use fidl::AsHandleRef;
9use fidl::endpoints::ServerEnd;
10use fidl_fuchsia_data as fdata;
11use fidl_fuchsia_io as fio;
12use fidl_fuchsia_process as fprocess;
13use fidl_fuchsia_process_lifecycle as fpl;
14use fuchsia_component::directory::AsRefDirectory;
15use fuchsia_component::server::{ServiceFs, ServiceObj, ServiceObjTrait};
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use futures::prelude::*;
18use std::borrow::Cow;
19use std::path::Path;
20use std::sync::Arc;
21
/// Returns the UTF-8 view of a watcher event's filename without allocating.
///
/// The result always borrows from `path`; callers that need ownership can call
/// `into_owned()` on it. Returns `None` when `path` is not valid UTF-8.
fn extract_event_filename<'a>(path: &'a Path) -> Option<Cow<'a, str>> {
    // "." and ordinary names are both handed back as the same borrowed slice, so no
    // special-casing is needed here; filtering out "." is left to the caller.
    path.to_str().map(Cow::Borrowed)
}
35
36async fn wait_for_first_instance(svc: &fio::DirectoryProxy) -> Result<String> {
37    const INPUT_SERVICE: &str = "input";
38    let (service_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
39    svc.as_ref_directory().open(INPUT_SERVICE, fio::Flags::PROTOCOL_DIRECTORY, request.into())?;
40    let watcher = Watcher::new(&service_dir).await.context("failed to create watcher")?;
41
42    let mut stream =
43        watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
44            futures::future::ok(match msg.event {
45                WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
46                    let filename = extract_event_filename(msg.filename.as_path())
47                        .expect("filename must be valid utf8");
48
49                    if filename.as_ref() == "." { None } else { Some(filename.into_owned()) }
50                }
51                _ => None,
52            })
53        });
54
55    let first = stream.try_next().await?.ok_or_else(|| {
56        format_err!("Watcher stream closed unexpectedly before finding an instance")
57    })?;
58
59    Ok(format!("{INPUT_SERVICE}/{first}"))
60}
61
62async fn connect_request(
63    svc: &fio::DirectoryProxy,
64    request: zx::Channel,
65    protocol_name: &Name,
66    instance_dir: &str,
67) {
68    let target_path = format!("{instance_dir}/{}", protocol_name.as_str());
69
70    if let Err(e) = svc.as_ref_directory().open(&target_path, fio::Flags::PROTOCOL_SERVICE, request)
71    {
72        log::error!("[service-broker] Failed to forward connection to {target_path}: {e}");
73    }
74}
75
76async fn first_instance_to_protocol<'a>(
77    svc: fio::DirectoryProxy,
78    fs: &mut ServiceFs<ServiceObj<'a, ()>>,
79    protocol_name: Name,
80    scope: &'a fuchsia_async::Scope,
81) -> Result<()> {
82    let cached_instance: Arc<Once<String>> = Arc::new(Once::new());
83    let svc_arc = Arc::new(svc);
84
85    fs.dir("svc").add_service_at("output", move |request: zx::Channel| {
86        let svc = Arc::clone(&svc_arc);
87        let protocol_name = protocol_name.clone();
88        let cached_instance = Arc::clone(&cached_instance);
89
90        scope.spawn(async move {
91            // Safely initializes the path once. All subsequent connection requests
92            // will instantly resolve the string without hitting the filesystem.
93            let init_future = async || wait_for_first_instance(&svc).await;
94
95            match cached_instance.get_or_try_init(init_future).await {
96                Ok(instance_dir) => {
97                    connect_request(&svc, request, &protocol_name, instance_dir).await;
98                }
99                Err(e) => {
100                    log::error!(
101                        "[service-broker] Failed to resolve first instance: {e}, {protocol_name}"
102                    );
103                }
104            }
105        });
106
107        Some(())
108    });
109
110    Ok(())
111}
112
113async fn first_instance_to_default<T: ServiceObjTrait>(
114    svc: fio::DirectoryProxy,
115    fs: &mut ServiceFs<T>,
116) -> Result<()> {
117    // TODO(surajmalhotra): Do this wait every time we get a connection request to handle cases
118    // where the instance goes away and comes back.
119    let instance_dir_path = wait_for_first_instance(&svc).await?;
120    let (instance_dir, request) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
121    svc.as_ref_directory().open(
122        &instance_dir_path,
123        fio::Flags::PROTOCOL_DIRECTORY,
124        request.into(),
125    )?;
126
127    fs.dir("svc").dir("output").add_remote("default", instance_dir);
128    Ok(())
129}
130
131async fn filter_and_rename<T: ServiceObjTrait>(
132    _svc: fio::DirectoryProxy,
133    _fs: &mut ServiceFs<T>,
134    _filter: &Vec<String>,
135    _rename: &Vec<String>,
136) -> Result<()> {
137    bail!("filter_and_rename policy is not yet implemented");
138    // Add a bunch of directories which forward requests?
139}
140
141fn get_value<'a>(dict: &'a fdata::Dictionary, key: &str) -> Option<&'a fdata::DictionaryValue> {
142    match &dict.entries {
143        Some(entries) => {
144            for entry in entries {
145                if entry.key == key {
146                    return entry.value.as_ref().map(|val| &**val);
147                }
148            }
149            None
150        }
151        _ => None,
152    }
153}
154
155fn get_program_string<'a>(program: &'a fdata::Dictionary, key: &str) -> Result<&'a str> {
156    if let Some(fdata::DictionaryValue::Str(value)) = get_value(program, key) {
157        Ok(value)
158    } else {
159        Err(format_err!("{key} not found in program or is not a string"))
160    }
161}
162
163fn get_program_strvec<'a>(
164    program: &'a fdata::Dictionary,
165    key: &str,
166) -> Result<Option<&'a Vec<String>>> {
167    match get_value(program, key) {
168        Some(args_value) => match args_value {
169            fdata::DictionaryValue::StrVec(vec) => Ok(Some(vec)),
170            _ => Err(format_err!(
171                "Expected {key} in program to be vector of strings, found something else"
172            )),
173        },
174        None => Ok(None),
175    }
176}
177
178pub async fn main(
179    ns_entries: Vec<fprocess::NameInfo>,
180    directory_request: ServerEnd<fio::DirectoryMarker>,
181    lifecycle: ServerEnd<fpl::LifecycleMarker>,
182    program: Option<fdata::Dictionary>,
183) -> Result<()> {
184    drop(lifecycle);
185    if directory_request.as_handle_ref().is_invalid() {
186        bail!("No valid handle found for outgoing directory");
187    }
188    let Some(svc) = ns_entries.into_iter().find(|e| e.path == "/svc") else {
189        bail!("No /svc in namespace");
190    };
191    let Some(program) = program else {
192        bail!("No program section provided");
193    };
194    let scope = fuchsia_async::Scope::new();
195    let svc = svc.directory.into_proxy();
196    let mut fs = ServiceFs::new();
197    match get_program_string(&program, "policy")? {
198        "first_instance_to_protocol" => {
199            let protocol_name_str = get_program_string(&program, "protocol_name")?;
200
201            let protocol_name = Name::new(protocol_name_str).map_err(|e| {
202                format_err!("Invalid protocol_name '{protocol_name_str}' in program dict: {e}")
203            })?;
204
205            first_instance_to_protocol(svc, &mut fs, protocol_name, &scope).await
206        }
207        "first_instance_to_default" => first_instance_to_default(svc, &mut fs).await,
208        "filter_and_rename" => {
209            let empty = vec![];
210            let filter = get_program_strvec(&program, "filter")?.unwrap_or(&empty);
211            let rename = get_program_strvec(&program, "rename")?.unwrap_or(&empty);
212            filter_and_rename(svc, &mut fs, filter, rename).await
213        }
214        policy => Err(format_err!("Unsupported policy specified: {policy}")),
215    }?;
216
217    log::debug!("[service-broker] Initialized.");
218
219    fs.serve_connection(directory_request).context("failed to serve outgoing namespace")?;
220    fs.collect::<()>().await;
221    Ok(())
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::{Proxy, create_endpoints, create_proxy};
    use fidl_fuchsia_data as fdata;
    use fuchsia_async as fasync;
    use futures::StreamExt;

    /// Builds a program dictionary from `(key, value)` pairs, keeping their order.
    fn make_program_dict(entries: Vec<(&str, fdata::DictionaryValue)>) -> fdata::Dictionary {
        let mut dict_entries = Vec::with_capacity(entries.len());
        for (key, value) in entries {
            dict_entries
                .push(fdata::DictionaryEntry { key: key.to_string(), value: Some(Box::new(value)) });
        }
        fdata::Dictionary { entries: Some(dict_entries), ..Default::default() }
    }

    /// A present string key is returned; an absent key produces the documented error text.
    #[test]
    fn test_get_program_string() {
        let policy = fdata::DictionaryValue::Str("first_instance_to_protocol".to_string());
        let dict = make_program_dict(vec![("policy", policy)]);

        let found = get_program_string(&dict, "policy").unwrap();
        assert_eq!(found, "first_instance_to_protocol");

        let missing = get_program_string(&dict, "missing_key").unwrap_err();
        assert_eq!(missing.to_string(), "missing_key not found in program or is not a string");
    }

    /// A present string-vector key is returned; an absent key yields `Ok(None)`.
    #[test]
    fn test_get_program_strvec() {
        let filter_value = fdata::DictionaryValue::StrVec(vec!["fuchsia.foo.Bar".to_string()]);
        let dict = make_program_dict(vec![("filter", filter_value)]);

        let filter = get_program_strvec(&dict, "filter").unwrap().unwrap();
        assert_eq!(filter.len(), 1);
        assert_eq!(filter[0], "fuchsia.foo.Bar");

        let rename = get_program_strvec(&dict, "rename").unwrap();
        assert!(rename.is_none());
    }

    /// The unimplemented policy reports an error instead of panicking.
    #[fasync::run_singlethreaded(test)]
    async fn test_filter_and_rename_graceful_failure() {
        let (dir_proxy, _server_end) = create_proxy::<fio::DirectoryMarker>();
        let mut fs = ServiceFs::<ServiceObj<'_, ()>>::new();
        let no_entries: Vec<String> = Vec::new();

        let outcome = filter_and_rename(dir_proxy, &mut fs, &no_entries, &no_entries).await;
        let failure = outcome.unwrap_err();
        assert_eq!(failure.to_string(), "filter_and_rename policy is not yet implemented");
    }

    /// Runs the broker against a fake `/svc` and checks that two consecutive connections to
    /// `svc/output` both reach the fake protocol.
    #[fasync::run_singlethreaded(test)]
    async fn test_broker_caching_and_routing_end_to_end() {
        // Fake incoming namespace: input/instance_123/my_protocol writes one byte to every
        // channel it receives so the client side becomes readable.
        let (svc_dir, svc_server_end) = create_proxy::<fio::DirectoryMarker>();
        let mut fake_svc_fs = ServiceFs::new();
        fake_svc_fs.dir("input").dir("instance_123").add_service_at(
            "my_protocol",
            |req: zx::Channel| {
                let _ = req.write(&[1], &mut []);
                Some(())
            },
        );
        fasync::Task::spawn(async move {
            fake_svc_fs.serve_connection(svc_server_end).unwrap();
            fake_svc_fs.collect::<()>().await;
        })
        .detach();

        // Inputs for the broker's `main`.
        let svc_entry = fprocess::NameInfo {
            path: "/svc".to_string(),
            directory: svc_dir.into_channel().unwrap().into_zx_channel().into(),
        };
        let ns_entries = vec![svc_entry];
        let (out_dir, out_server_end) = create_proxy::<fio::DirectoryMarker>();
        let (_, lifecycle_server_end) = create_endpoints::<fpl::LifecycleMarker>();
        let program_dict = make_program_dict(vec![
            ("policy", fdata::DictionaryValue::Str("first_instance_to_protocol".to_string())),
            ("protocol_name", fdata::DictionaryValue::Str("my_protocol".to_string())),
        ]);

        fasync::Task::spawn(async move {
            let res =
                main(ns_entries, out_server_end, lifecycle_server_end, Some(program_dict)).await;
            assert!(res.is_ok(), "Broker main task failed");
        })
        .detach();

        // First connection: the broker has to resolve the instance before forwarding.
        let (client_end, server_end) = zx::Channel::create();
        out_dir
            .open("svc/output", fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end)
            .expect("Failed to send open request to broker");
        let signals = fasync::OnSignals::new(&client_end, zx::Signals::CHANNEL_READABLE)
            .await
            .expect("Failed waiting for signal. Routing may have dropped the channel.");
        assert!(signals.contains(zx::Signals::CHANNEL_READABLE));

        // Second connection: must be routed as well.
        let (client_end2, server_end2) = zx::Channel::create();
        out_dir
            .open("svc/output", fio::Flags::PROTOCOL_SERVICE, &fio::Options::default(), server_end2)
            .unwrap();
        let _ = fasync::OnSignals::new(&client_end2, zx::Signals::CHANNEL_READABLE).await.unwrap();
    }
}