device_watcher/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Context, Result};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
12// Device metadata.
13pub struct DeviceInfo<'a> {
14    // The device's file name within the directory in which it was found.
15    pub filename: &'a str,
16    // The device's topological path.
17    pub topological_path: String,
18}
19
20/// Watches the directory for a device for which the predicate returns `Some(t)`
21/// and returns `t`.
22pub async fn wait_for_device_with<T>(
23    dev_dir: &fio::DirectoryProxy,
24    predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26    let stream = watch_for_files(dev_dir).await?;
27    let stream = stream.try_filter_map(|filename| {
28        let predicate = &predicate;
29        async move {
30            let filename =
31                filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32            let controller_filename = filename.to_owned() + "/device_controller";
33
34            let (controller_proxy, server_end) =
35                fidl::endpoints::create_proxy::<ControllerMarker>();
36            #[cfg(fuchsia_api_level_at_least = "NEXT")]
37            if dev_dir
38                .open(
39                    &controller_filename,
40                    fio::Flags::PROTOCOL_SERVICE,
41                    &fio::Options::default(),
42                    server_end.into_channel(),
43                )
44                .is_err()
45            {
46                return Ok(None);
47            }
48            #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
49            if dev_dir
50                .open3(
51                    &controller_filename,
52                    fio::Flags::PROTOCOL_SERVICE,
53                    &fio::Options::default(),
54                    server_end.into_channel(),
55                )
56                .is_err()
57            {
58                return Ok(None);
59            }
60
61            let topological_path = controller_proxy.get_topological_path().await;
62            let topological_path = match topological_path {
63                Ok(topological_path) => topological_path,
64                // Special case PEER_CLOSED; the peer is expected to close the
65                // connection if it doesn't implement the controller protocol.
66                Err(err) => match err {
67                    fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68                    err => {
69                        return Err(err).with_context(|| {
70                            format!("failed to send get_topological_path on \"{}\"", filename)
71                        })
72                    }
73                },
74            };
75            let topological_path = topological_path
76                .map_err(zx_status::Status::from_raw)
77                .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79            Ok(predicate(DeviceInfo { filename, topological_path }))
80        }
81    });
82    futures::pin_mut!(stream);
83    let item = stream.try_next().await?;
84    item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87/// Returns a stream that contains the paths of any existing files and
88/// directories in `dir` and any new files or directories created after this
89/// function was invoked. These paths are relative to `dir`.
90pub async fn watch_for_files(
91    dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>>> {
93    let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94    Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95        futures::future::ok(match msg.event {
96            WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97                if msg.filename == std::path::Path::new(".") {
98                    None
99                } else {
100                    Some(msg.filename)
101                }
102            }
103            _ => None,
104        })
105    }))
106}
107
108async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
109    let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
110    while let Some(msg) = watcher.try_next().await? {
111        if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
112            && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
113        {
114            continue;
115        }
116        if msg.filename.to_str().unwrap() == name {
117            return Ok(());
118        }
119    }
120    unreachable!();
121}
122
123/// Open the path `name` within `dir`. This function waits for each directory to
124/// be available before it opens it. If the path never appears this function
125/// will wait forever.
126async fn recursive_wait_and_open_with_flags<T, F>(
127    mut dir: fio::DirectoryProxy,
128    name: &str,
129    flags: fio::Flags,
130    op: F,
131) -> Result<T>
132where
133    F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
134{
135    let path = std::path::Path::new(name);
136    let mut components = path.components().peekable();
137    loop {
138        let component =
139            components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
140        let file = match component {
141            std::path::Component::Normal(file) => file,
142            // Per fuchsia.io/Directory.Open[0]:
143            //
144            // A leading '/' is allowed (and is treated the same way as if not present, i.e.
145            // "/foo/bar' and "foo/bar" are the same).
146            //
147            // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=211-237;drc=02426e16b637b25a21b1e53f9861855d476aaf49
148            std::path::Component::RootDir => continue,
149            component => {
150                return Err(format_err!("path contains non-normal component {:?}", component))
151            }
152        };
153        let file = file.to_str().unwrap();
154        let () = wait_for_file(&dir, file).await?;
155        if components.peek().is_some() {
156            dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
157        } else {
158            break Ok(op(&dir, file, flags));
159        }
160    }
161}
162
163/// Wait for `name` to be available in `dir`. This function waits for each directory along
164/// the path and returns once it has waited on the final component in the path. If the path
165/// never appears this function will wait forever.
166pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
167    recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
168        .await
169}
170
171/// Open the path `name` within `dir`. This function waits for each directory to
172/// be available before it opens it. If the path never appears this function
173/// will wait forever.
174pub async fn recursive_wait_and_open_directory(
175    dir: &fio::DirectoryProxy,
176    name: &str,
177) -> Result<fio::DirectoryProxy> {
178    recursive_wait_and_open_with_flags(
179        Clone::clone(dir),
180        name,
181        fio::Flags::PROTOCOL_DIRECTORY,
182        fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
183    )
184    .await
185    .and_then(|res| res.map_err(Into::into))
186}
187
188/// Connect to an instance of FIDL protocol hosted at `name` within `dir`. This function waits for
189/// each directory to be available before it opens it. If the path never appears this function will
190/// wait forever.
191pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
192    dir: &fio::DirectoryProxy,
193    name: &str,
194) -> Result<P::Proxy> {
195    recursive_wait_and_open_with_flags(
196        Clone::clone(dir),
197        name,
198        fio::Flags::empty(),
199        |dir, path, _flags| {
200            // Cannot open services with other flags.
201            fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
202        },
203    )
204    .await
205    .and_then(|res| res.map_err(Into::into))
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use futures::StreamExt;
212    use std::collections::HashSet;
213    use std::str::FromStr;
214    use std::sync::Arc;
215    use vfs::file::vmo::read_only;
216    use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};
217
218    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
219        vfs::service::host(move |mut stream: fdev::ControllerRequestStream| async move {
220            match stream.try_next().await.unwrap() {
221                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
222                    let _ = responder.send(Ok(topo_path));
223                }
224                e => panic!("Unexpected request: {:?}", e),
225            }
226        })
227    }
228
229    #[fasync::run_singlethreaded(test)]
230    async fn wait_for_device_by_topological_path() {
231        let dir = vfs::pseudo_directory! {
232          "a" => vfs::pseudo_directory! {
233            "device_controller" => create_controller_service("/dev/test2/a/dev"),
234          },
235          "1" => vfs::pseudo_directory! {
236            "device_controller" => create_controller_service("/dev/test2/1/dev"),
237          },
238          "x" => vfs::pseudo_directory! {
239            "device_controller" => create_controller_service("/dev/test2/x/dev"),
240          },
241          "y" => vfs::pseudo_directory! {
242            "device_controller" => create_controller_service("/dev/test2/y/dev"),
243          },
244        };
245        let dir_proxy = vfs::directory::serve_read_only(dir);
246        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
247            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
248        })
249        .await
250        .unwrap();
251        assert_eq!("x", path);
252    }
253
254    #[fasync::run_singlethreaded(test)]
255    async fn watch_for_two_files() {
256        let dir = vfs::pseudo_directory! {
257          "a" => read_only(b"/a"),
258          "b" => read_only(b"/b"),
259        };
260
261        let dir_proxy = vfs::directory::serve_read_only(dir);
262
263        let stream = watch_for_files(&dir_proxy).await.unwrap();
264        futures::pin_mut!(stream);
265        let actual: HashSet<PathBuf> =
266            vec![stream.next().await.unwrap().unwrap(), stream.next().await.unwrap().unwrap()]
267                .into_iter()
268                .collect();
269        let expected: HashSet<PathBuf> =
270            vec![PathBuf::from_str("a").unwrap(), PathBuf::from_str("b").unwrap()]
271                .into_iter()
272                .collect();
273        assert_eq!(actual, expected);
274    }
275
276    #[fasync::run_singlethreaded(test)]
277    async fn wait_for_device_topo_path_allows_files_and_dirs() {
278        let dir = vfs::pseudo_directory! {
279          "1" => vfs::pseudo_directory! {
280            "test" => read_only("test file 1"),
281            "test2" => read_only("test file 2"),
282          },
283          "2" => read_only("file 2"),
284          "x" => vfs::pseudo_directory! {
285            "device_controller" => create_controller_service("/dev/test2/x/dev"),
286          },
287          "3" => read_only("file 3"),
288        };
289
290        let dir_proxy = vfs::directory::serve_read_only(dir);
291        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
292            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
293        })
294        .await
295        .unwrap();
296        assert_eq!("x", path);
297    }
298
299    #[fasync::run_singlethreaded(test)]
300    async fn open_two_directories() {
301        let root = vfs::pseudo_directory! {
302            "test" => vfs::pseudo_directory! {
303                "dir" => vfs::pseudo_directory! {},
304            },
305        };
306        let client = vfs::directory::serve_read_only(root);
307        let directory = recursive_wait_and_open_directory(&client, "test/dir").await.unwrap();
308        let () = directory.close().await.unwrap().unwrap();
309    }
310
311    #[fasync::run_singlethreaded(test)]
312    async fn open_directory_with_leading_slash() {
313        let root = vfs::pseudo_directory! {
314            "test" => vfs::pseudo_directory! {},
315        };
316        let client = vfs::directory::serve_read_only(root);
317        let directory = recursive_wait_and_open_directory(&client, "/test").await.unwrap();
318        let () = directory.close().await.unwrap().unwrap();
319    }
320}