device_watcher/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context, Result, format_err};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
12// Device metadata.
13pub struct DeviceInfo<'a> {
14    // The device's file name within the directory in which it was found.
15    pub filename: &'a str,
16    // The device's topological path.
17    pub topological_path: String,
18}
19
20/// Watches the directory for a device for which the predicate returns `Some(t)`
21/// and returns `t`.
22pub async fn wait_for_device_with<T>(
23    dev_dir: &fio::DirectoryProxy,
24    predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26    let stream = watch_for_files(dev_dir).await?;
27    let stream = stream.try_filter_map(|filename| {
28        let predicate = &predicate;
29        async move {
30            let filename =
31                filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32            let controller_filename = filename.to_owned() + "/device_controller";
33
34            let (controller_proxy, server_end) =
35                fidl::endpoints::create_proxy::<ControllerMarker>();
36            #[cfg(fuchsia_api_level_at_least = "27")]
37            if dev_dir
38                .open(
39                    &controller_filename,
40                    fio::Flags::PROTOCOL_SERVICE,
41                    &fio::Options::default(),
42                    server_end.into_channel(),
43                )
44                .is_err()
45            {
46                return Ok(None);
47            }
48            #[cfg(not(fuchsia_api_level_at_least = "27"))]
49            if dev_dir
50                .open3(
51                    &controller_filename,
52                    fio::Flags::PROTOCOL_SERVICE,
53                    &fio::Options::default(),
54                    server_end.into_channel(),
55                )
56                .is_err()
57            {
58                return Ok(None);
59            }
60
61            let topological_path = controller_proxy.get_topological_path().await;
62            let topological_path = match topological_path {
63                Ok(topological_path) => topological_path,
64                // Special case PEER_CLOSED; the peer is expected to close the
65                // connection if it doesn't implement the controller protocol.
66                Err(err) => match err {
67                    fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68                    err => {
69                        return Err(err).with_context(|| {
70                            format!("failed to send get_topological_path on \"{}\"", filename)
71                        });
72                    }
73                },
74            };
75            let topological_path = topological_path
76                .map_err(zx_status::Status::from_raw)
77                .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79            Ok(predicate(DeviceInfo { filename, topological_path }))
80        }
81    });
82    futures::pin_mut!(stream);
83    let item = stream.try_next().await?;
84    item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87/// Returns a stream that contains the paths of any existing files and
88/// directories in `dir` and any new files or directories created after this
89/// function was invoked. These paths are relative to `dir`.
90pub async fn watch_for_files(
91    dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>> + use<>> {
93    let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94    Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95        futures::future::ok(match msg.event {
96            WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97                if msg.filename == std::path::Path::new(".") { None } else { Some(msg.filename) }
98            }
99            _ => None,
100        })
101    }))
102}
103
104async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
105    let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
106    while let Some(msg) = watcher.try_next().await? {
107        if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
108            && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
109        {
110            continue;
111        }
112        if msg.filename.to_str().unwrap() == name {
113            return Ok(());
114        }
115    }
116    unreachable!();
117}
118
119/// Open the path `name` within `dir`. This function waits for each directory to
120/// be available before it opens it. If the path never appears this function
121/// will wait forever.
122async fn recursive_wait_and_open_with_flags<T, F>(
123    mut dir: fio::DirectoryProxy,
124    name: &str,
125    flags: fio::Flags,
126    op: F,
127) -> Result<T>
128where
129    F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
130{
131    let path = std::path::Path::new(name);
132    let mut components = path.components().peekable();
133    loop {
134        let component =
135            components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
136        let file = match component {
137            std::path::Component::Normal(file) => file,
138            // Per fuchsia.io/Directory.Open[0]:
139            //
140            // A leading '/' is allowed (and is treated the same way as if not present, i.e.
141            // "/foo/bar' and "foo/bar" are the same).
142            //
143            // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=211-237;drc=02426e16b637b25a21b1e53f9861855d476aaf49
144            std::path::Component::RootDir => continue,
145            component => {
146                return Err(format_err!("path contains non-normal component {:?}", component));
147            }
148        };
149        let file = file.to_str().unwrap();
150        let () = wait_for_file(&dir, file).await?;
151        if components.peek().is_some() {
152            dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
153        } else {
154            break Ok(op(&dir, file, flags));
155        }
156    }
157}
158
159/// Wait for `name` to be available in `dir`. This function waits for each directory along
160/// the path and returns once it has waited on the final component in the path. If the path
161/// never appears this function will wait forever.
162pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
163    recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
164        .await
165}
166
167/// Open the path `name` within `dir`. This function waits for each directory to
168/// be available before it opens it. If the path never appears this function
169/// will wait forever.
170pub async fn recursive_wait_and_open_directory(
171    dir: &fio::DirectoryProxy,
172    name: &str,
173) -> Result<fio::DirectoryProxy> {
174    recursive_wait_and_open_with_flags(
175        Clone::clone(dir),
176        name,
177        fio::Flags::PROTOCOL_DIRECTORY,
178        fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
179    )
180    .await
181    .and_then(|res| res.map_err(Into::into))
182}
183
184/// Connect to an instance of FIDL protocol hosted at `name` within `dir`. This function waits for
185/// each directory to be available before it opens it. If the path never appears this function will
186/// wait forever.
187pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
188    dir: &fio::DirectoryProxy,
189    name: &str,
190) -> Result<P::Proxy> {
191    recursive_wait_and_open_with_flags(
192        Clone::clone(dir),
193        name,
194        fio::Flags::empty(),
195        |dir, path, _flags| {
196            // Cannot open services with other flags.
197            fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
198        },
199    )
200    .await
201    .and_then(|res| res.map_err(Into::into))
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207    use futures::StreamExt;
208    use std::collections::HashSet;
209    use std::str::FromStr;
210    use std::sync::Arc;
211    use vfs::file::vmo::read_only;
212    use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};
213
214    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
215        vfs::service::host(move |mut stream: fdev::ControllerRequestStream| async move {
216            match stream.try_next().await.unwrap() {
217                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
218                    let _ = responder.send(Ok(topo_path));
219                }
220                e => panic!("Unexpected request: {:?}", e),
221            }
222        })
223    }
224
225    #[fasync::run_singlethreaded(test)]
226    async fn wait_for_device_by_topological_path() {
227        let dir = vfs::pseudo_directory! {
228          "a" => vfs::pseudo_directory! {
229            "device_controller" => create_controller_service("/dev/test2/a/dev"),
230          },
231          "1" => vfs::pseudo_directory! {
232            "device_controller" => create_controller_service("/dev/test2/1/dev"),
233          },
234          "x" => vfs::pseudo_directory! {
235            "device_controller" => create_controller_service("/dev/test2/x/dev"),
236          },
237          "y" => vfs::pseudo_directory! {
238            "device_controller" => create_controller_service("/dev/test2/y/dev"),
239          },
240        };
241        let dir_proxy = vfs::directory::serve_read_only(dir);
242        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
243            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
244        })
245        .await
246        .unwrap();
247        assert_eq!("x", path);
248    }
249
250    #[fasync::run_singlethreaded(test)]
251    async fn watch_for_two_files() {
252        let dir = vfs::pseudo_directory! {
253          "a" => read_only(b"/a"),
254          "b" => read_only(b"/b"),
255        };
256
257        let dir_proxy = vfs::directory::serve_read_only(dir);
258
259        let stream = watch_for_files(&dir_proxy).await.unwrap();
260        futures::pin_mut!(stream);
261        let actual: HashSet<PathBuf> =
262            vec![stream.next().await.unwrap().unwrap(), stream.next().await.unwrap().unwrap()]
263                .into_iter()
264                .collect();
265        let expected: HashSet<PathBuf> =
266            vec![PathBuf::from_str("a").unwrap(), PathBuf::from_str("b").unwrap()]
267                .into_iter()
268                .collect();
269        assert_eq!(actual, expected);
270    }
271
272    #[fasync::run_singlethreaded(test)]
273    async fn wait_for_device_topo_path_allows_files_and_dirs() {
274        let dir = vfs::pseudo_directory! {
275          "1" => vfs::pseudo_directory! {
276            "test" => read_only("test file 1"),
277            "test2" => read_only("test file 2"),
278          },
279          "2" => read_only("file 2"),
280          "x" => vfs::pseudo_directory! {
281            "device_controller" => create_controller_service("/dev/test2/x/dev"),
282          },
283          "3" => read_only("file 3"),
284        };
285
286        let dir_proxy = vfs::directory::serve_read_only(dir);
287        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
288            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
289        })
290        .await
291        .unwrap();
292        assert_eq!("x", path);
293    }
294
295    #[fasync::run_singlethreaded(test)]
296    async fn open_two_directories() {
297        let root = vfs::pseudo_directory! {
298            "test" => vfs::pseudo_directory! {
299                "dir" => vfs::pseudo_directory! {},
300            },
301        };
302        let client = vfs::directory::serve_read_only(root);
303        let directory = recursive_wait_and_open_directory(&client, "test/dir").await.unwrap();
304        let () = directory.close().await.unwrap().unwrap();
305    }
306
307    #[fasync::run_singlethreaded(test)]
308    async fn open_directory_with_leading_slash() {
309        let root = vfs::pseudo_directory! {
310            "test" => vfs::pseudo_directory! {},
311        };
312        let client = vfs::directory::serve_read_only(root);
313        let directory = recursive_wait_and_open_directory(&client, "/test").await.unwrap();
314        let () = directory.close().await.unwrap().unwrap();
315    }
316}