device_watcher/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Context, Result};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
/// Metadata describing a device, as handed to the predicate of
/// [`wait_for_device_with`].
///
/// `filename` borrows from the directory entry being inspected, so a value of
/// this type cannot outlive that entry; copy the name out (for example with
/// `to_string()`) if it must be kept.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo<'a> {
    /// The device's file name within the directory in which it was found.
    pub filename: &'a str,
    /// The device's topological path.
    pub topological_path: String,
}
19
20/// Watches the directory for a device for which the predicate returns `Some(t)`
21/// and returns `t`.
22pub async fn wait_for_device_with<T>(
23    dev_dir: &fio::DirectoryProxy,
24    predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26    let stream = watch_for_files(dev_dir).await?;
27    let stream = stream.try_filter_map(|filename| {
28        let predicate = &predicate;
29        async move {
30            let filename =
31                filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32            let controller_filename = filename.to_owned() + "/device_controller";
33
34            let (controller_proxy, server_end) =
35                fidl::endpoints::create_proxy::<ControllerMarker>();
36            #[cfg(fuchsia_api_level_at_least = "NEXT")]
37            if dev_dir
38                .open(
39                    &controller_filename,
40                    fio::Flags::PROTOCOL_SERVICE,
41                    &fio::Options::default(),
42                    server_end.into_channel(),
43                )
44                .is_err()
45            {
46                return Ok(None);
47            }
48            #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
49            if dev_dir
50                .open3(
51                    &controller_filename,
52                    fio::Flags::PROTOCOL_SERVICE,
53                    &fio::Options::default(),
54                    server_end.into_channel(),
55                )
56                .is_err()
57            {
58                return Ok(None);
59            }
60
61            let topological_path = controller_proxy.get_topological_path().await;
62            let topological_path = match topological_path {
63                Ok(topological_path) => topological_path,
64                // Special case PEER_CLOSED; the peer is expected to close the
65                // connection if it doesn't implement the controller protocol.
66                Err(err) => match err {
67                    fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68                    err => {
69                        return Err(err).with_context(|| {
70                            format!("failed to send get_topological_path on \"{}\"", filename)
71                        })
72                    }
73                },
74            };
75            let topological_path = topological_path
76                .map_err(zx_status::Status::from_raw)
77                .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79            Ok(predicate(DeviceInfo { filename, topological_path }))
80        }
81    });
82    futures::pin_mut!(stream);
83    let item = stream.try_next().await?;
84    item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87/// Returns a stream that contains the paths of any existing files and
88/// directories in `dir` and any new files or directories created after this
89/// function was invoked. These paths are relative to `dir`.
90pub async fn watch_for_files(
91    dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>>> {
93    let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94    Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95        futures::future::ok(match msg.event {
96            WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97                if msg.filename == std::path::Path::new(".") {
98                    None
99                } else {
100                    Some(msg.filename)
101                }
102            }
103            _ => None,
104        })
105    }))
106}
107
108async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
109    let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
110    while let Some(msg) = watcher.try_next().await? {
111        if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
112            && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
113        {
114            continue;
115        }
116        if msg.filename.to_str().unwrap() == name {
117            return Ok(());
118        }
119    }
120    unreachable!();
121}
122
123/// Open the path `name` within `dir`. This function waits for each directory to
124/// be available before it opens it. If the path never appears this function
125/// will wait forever.
126async fn recursive_wait_and_open_with_flags<T, F>(
127    mut dir: fio::DirectoryProxy,
128    name: &str,
129    flags: fio::Flags,
130    op: F,
131) -> Result<T>
132where
133    F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
134{
135    let path = std::path::Path::new(name);
136    let mut components = path.components().peekable();
137    loop {
138        let component =
139            components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
140        let file = match component {
141            std::path::Component::Normal(file) => file,
142            // Per fuchsia.io/Directory.Open[0]:
143            //
144            // A leading '/' is allowed (and is treated the same way as if not present, i.e.
145            // "/foo/bar' and "foo/bar" are the same).
146            //
147            // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=211-237;drc=02426e16b637b25a21b1e53f9861855d476aaf49
148            std::path::Component::RootDir => continue,
149            component => {
150                return Err(format_err!("path contains non-normal component {:?}", component))
151            }
152        };
153        let file = file.to_str().unwrap();
154        let () = wait_for_file(&dir, file).await?;
155        if components.peek().is_some() {
156            dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
157        } else {
158            break Ok(op(&dir, file, flags));
159        }
160    }
161}
162
163/// Wait for `name` to be available in `dir`. This function waits for each directory along
164/// the path and returns once it has waited on the final component in the path. If the path
165/// never appears this function will wait forever.
166pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
167    recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
168        .await
169}
170
171/// Open the path `name` within `dir`. This function waits for each directory to
172/// be available before it opens it. If the path never appears this function
173/// will wait forever.
174pub async fn recursive_wait_and_open_directory(
175    dir: &fio::DirectoryProxy,
176    name: &str,
177) -> Result<fio::DirectoryProxy> {
178    recursive_wait_and_open_with_flags(
179        Clone::clone(dir),
180        name,
181        fio::Flags::PROTOCOL_DIRECTORY,
182        fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
183    )
184    .await
185    .and_then(|res| res.map_err(Into::into))
186}
187
188/// Connect to an instance of FIDL protocol hosted at `name` within `dir`. This function waits for
189/// each directory to be available before it opens it. If the path never appears this function will
190/// wait forever.
191pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
192    dir: &fio::DirectoryProxy,
193    name: &str,
194) -> Result<P::Proxy> {
195    recursive_wait_and_open_with_flags(
196        Clone::clone(dir),
197        name,
198        fio::Flags::empty(),
199        |dir, path, _flags| {
200            // Cannot open services with other flags.
201            fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
202        },
203    )
204    .await
205    .and_then(|res| res.map_err(Into::into))
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::str::FromStr;
    use std::sync::Arc;
    use vfs::directory::entry_container::Directory;
    use vfs::file::vmo::read_only;
    use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};

    /// Builds a fake `device_controller` service that answers a single
    /// `GetTopologicalPath` request with `topo_path` and panics on any other
    /// request.
    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
        vfs::service::host(move |mut stream: fdev::ControllerRequestStream| async move {
            match stream.try_next().await.unwrap() {
                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
                    let _ = responder.send(Ok(topo_path));
                }
                e => panic!("Unexpected request: {:?}", e),
            }
        })
    }

    /// The device whose controller reports the requested topological path is
    /// picked out of several candidate devices.
    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_by_topological_path() {
        let dir = vfs::pseudo_directory! {
          "a" => vfs::pseudo_directory! {
            "device_controller" => create_controller_service("/dev/test2/a/dev"),
          },
          "1" => vfs::pseudo_directory! {
            "device_controller" => create_controller_service("/dev/test2/1/dev"),
          },
          "x" => vfs::pseudo_directory! {
            "device_controller" => create_controller_service("/dev/test2/x/dev"),
          },
          "y" => vfs::pseudo_directory! {
            "device_controller" => create_controller_service("/dev/test2/y/dev"),
          },
        };
        let dir_proxy = vfs::directory::serve_read_only(dir);
        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
        })
        .await
        .unwrap();
        assert_eq!("x", path);
    }

    /// Both pre-existing files are reported by the watcher stream; a set is
    /// compared so the test does not depend on the order they arrive in.
    #[fasync::run_singlethreaded(test)]
    async fn watch_for_two_files() {
        let dir = vfs::pseudo_directory! {
          "a" => read_only(b"/a"),
          "b" => read_only(b"/b"),
        };

        let dir_proxy = vfs::directory::serve_read_only(dir);

        let stream = watch_for_files(&dir_proxy).await.unwrap();
        futures::pin_mut!(stream);
        let actual: HashSet<PathBuf> =
            vec![stream.next().await.unwrap().unwrap(), stream.next().await.unwrap().unwrap()]
                .into_iter()
                .collect();
        let expected: HashSet<PathBuf> =
            vec![PathBuf::from_str("a").unwrap(), PathBuf::from_str("b").unwrap()]
                .into_iter()
                .collect();
        assert_eq!(actual, expected);
    }

    /// Entries that are plain files, or directories without a
    /// `device_controller`, do not prevent the matching device from being
    /// found.
    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_topo_path_allows_files_and_dirs() {
        let dir = vfs::pseudo_directory! {
          "1" => vfs::pseudo_directory! {
            "test" => read_only("test file 1"),
            "test2" => read_only("test file 2"),
          },
          "2" => read_only("file 2"),
          "x" => vfs::pseudo_directory! {
            "device_controller" => create_controller_service("/dev/test2/x/dev"),
          },
          "3" => read_only("file 3"),
        };

        let dir_proxy = vfs::directory::serve_read_only(dir);
        let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
            (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
        })
        .await
        .unwrap();
        assert_eq!("x", path);
    }

    /// A two-component path is resolved by descending through the
    /// intermediate directory.
    #[fasync::run_singlethreaded(test)]
    async fn open_two_directories() {
        let (client, server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let root = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {
                "dir" => vfs::pseudo_directory! {},
            },
        };
        let flags: fio::Flags =
            fio::Flags::PROTOCOL_DIRECTORY | fio::PERM_READABLE | fio::Flags::PERM_EXECUTE;
        let object_request =
            vfs::ObjectRequest::new(flags, &Default::default(), server.into_channel());
        object_request.handle(|object_request| {
            // Reuse `flags` so the request and the open call cannot disagree.
            root.open3(
                vfs::execution_scope::ExecutionScope::new(),
                vfs::path::Path::dot(),
                flags,
                object_request,
            )
        });

        let directory = recursive_wait_and_open_directory(&client, "test/dir").await.unwrap();
        let () = directory.close().await.unwrap().unwrap();
    }

    /// A leading `/` in the path is accepted and ignored.
    #[fasync::run_singlethreaded(test)]
    async fn open_directory_with_leading_slash() {
        let (client, server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let root = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {},
        };
        let flags: fio::Flags =
            fio::Flags::PROTOCOL_DIRECTORY | fio::PERM_READABLE | fio::Flags::PERM_EXECUTE;
        let object_request =
            vfs::ObjectRequest::new(flags, &Default::default(), server.into_channel());
        object_request.handle(|object_request| {
            root.open3(
                vfs::execution_scope::ExecutionScope::new(),
                vfs::path::Path::dot(),
                flags,
                object_request,
            )
        });

        let directory = recursive_wait_and_open_directory(&client, "/test").await.unwrap();
        let () = directory.close().await.unwrap().unwrap();
    }
}