1use anyhow::{format_err, Context, Result};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
12pub struct DeviceInfo<'a> {
14 pub filename: &'a str,
16 pub topological_path: String,
18}
19
20pub async fn wait_for_device_with<T>(
23 dev_dir: &fio::DirectoryProxy,
24 predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26 let stream = watch_for_files(dev_dir).await?;
27 let stream = stream.try_filter_map(|filename| {
28 let predicate = &predicate;
29 async move {
30 let filename =
31 filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32 let controller_filename = filename.to_owned() + "/device_controller";
33
34 let (controller_proxy, server_end) =
35 fidl::endpoints::create_proxy::<ControllerMarker>();
36 #[cfg(fuchsia_api_level_at_least = "NEXT")]
37 if dev_dir
38 .open(
39 &controller_filename,
40 fio::Flags::PROTOCOL_SERVICE,
41 &fio::Options::default(),
42 server_end.into_channel(),
43 )
44 .is_err()
45 {
46 return Ok(None);
47 }
48 #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
49 if dev_dir
50 .open3(
51 &controller_filename,
52 fio::Flags::PROTOCOL_SERVICE,
53 &fio::Options::default(),
54 server_end.into_channel(),
55 )
56 .is_err()
57 {
58 return Ok(None);
59 }
60
61 let topological_path = controller_proxy.get_topological_path().await;
62 let topological_path = match topological_path {
63 Ok(topological_path) => topological_path,
64 Err(err) => match err {
67 fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68 err => {
69 return Err(err).with_context(|| {
70 format!("failed to send get_topological_path on \"{}\"", filename)
71 })
72 }
73 },
74 };
75 let topological_path = topological_path
76 .map_err(zx_status::Status::from_raw)
77 .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79 Ok(predicate(DeviceInfo { filename, topological_path }))
80 }
81 });
82 futures::pin_mut!(stream);
83 let item = stream.try_next().await?;
84 item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87pub async fn watch_for_files(
91 dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>>> {
93 let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94 Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95 futures::future::ok(match msg.event {
96 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97 if msg.filename == std::path::Path::new(".") {
98 None
99 } else {
100 Some(msg.filename)
101 }
102 }
103 _ => None,
104 })
105 }))
106}
107
108async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
109 let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
110 while let Some(msg) = watcher.try_next().await? {
111 if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
112 && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
113 {
114 continue;
115 }
116 if msg.filename.to_str().unwrap() == name {
117 return Ok(());
118 }
119 }
120 unreachable!();
121}
122
123async fn recursive_wait_and_open_with_flags<T, F>(
127 mut dir: fio::DirectoryProxy,
128 name: &str,
129 flags: fio::Flags,
130 op: F,
131) -> Result<T>
132where
133 F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
134{
135 let path = std::path::Path::new(name);
136 let mut components = path.components().peekable();
137 loop {
138 let component =
139 components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
140 let file = match component {
141 std::path::Component::Normal(file) => file,
142 std::path::Component::RootDir => continue,
149 component => {
150 return Err(format_err!("path contains non-normal component {:?}", component))
151 }
152 };
153 let file = file.to_str().unwrap();
154 let () = wait_for_file(&dir, file).await?;
155 if components.peek().is_some() {
156 dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
157 } else {
158 break Ok(op(&dir, file, flags));
159 }
160 }
161}
162
163pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
167 recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
168 .await
169}
170
171pub async fn recursive_wait_and_open_directory(
175 dir: &fio::DirectoryProxy,
176 name: &str,
177) -> Result<fio::DirectoryProxy> {
178 recursive_wait_and_open_with_flags(
179 Clone::clone(dir),
180 name,
181 fio::Flags::PROTOCOL_DIRECTORY,
182 fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
183 )
184 .await
185 .and_then(|res| res.map_err(Into::into))
186}
187
188pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
192 dir: &fio::DirectoryProxy,
193 name: &str,
194) -> Result<P::Proxy> {
195 recursive_wait_and_open_with_flags(
196 Clone::clone(dir),
197 name,
198 fio::Flags::empty(),
199 |dir, path, _flags| {
200 fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
202 },
203 )
204 .await
205 .and_then(|res| res.map_err(Into::into))
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211 use futures::StreamExt;
212 use std::collections::HashSet;
213 use std::str::FromStr;
214 use std::sync::Arc;
215 use vfs::directory::entry_container::Directory;
216 use vfs::file::vmo::read_only;
217 use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};
218
219 fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
220 vfs::service::host(move |mut stream: fdev::ControllerRequestStream| async move {
221 match stream.try_next().await.unwrap() {
222 Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
223 let _ = responder.send(Ok(topo_path));
224 }
225 e => panic!("Unexpected request: {:?}", e),
226 }
227 })
228 }
229
230 #[fasync::run_singlethreaded(test)]
231 async fn wait_for_device_by_topological_path() {
232 let dir = vfs::pseudo_directory! {
233 "a" => vfs::pseudo_directory! {
234 "device_controller" => create_controller_service("/dev/test2/a/dev"),
235 },
236 "1" => vfs::pseudo_directory! {
237 "device_controller" => create_controller_service("/dev/test2/1/dev"),
238 },
239 "x" => vfs::pseudo_directory! {
240 "device_controller" => create_controller_service("/dev/test2/x/dev"),
241 },
242 "y" => vfs::pseudo_directory! {
243 "device_controller" => create_controller_service("/dev/test2/y/dev"),
244 },
245 };
246 let dir_proxy = vfs::directory::serve_read_only(dir);
247 let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
248 (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
249 })
250 .await
251 .unwrap();
252 assert_eq!("x", path);
253 }
254
255 #[fasync::run_singlethreaded(test)]
256 async fn watch_for_two_files() {
257 let dir = vfs::pseudo_directory! {
258 "a" => read_only(b"/a"),
259 "b" => read_only(b"/b"),
260 };
261
262 let dir_proxy = vfs::directory::serve_read_only(dir);
263
264 let stream = watch_for_files(&dir_proxy).await.unwrap();
265 futures::pin_mut!(stream);
266 let actual: HashSet<PathBuf> =
267 vec![stream.next().await.unwrap().unwrap(), stream.next().await.unwrap().unwrap()]
268 .into_iter()
269 .collect();
270 let expected: HashSet<PathBuf> =
271 vec![PathBuf::from_str("a").unwrap(), PathBuf::from_str("b").unwrap()]
272 .into_iter()
273 .collect();
274 assert_eq!(actual, expected);
275 }
276
277 #[fasync::run_singlethreaded(test)]
278 async fn wait_for_device_topo_path_allows_files_and_dirs() {
279 let dir = vfs::pseudo_directory! {
280 "1" => vfs::pseudo_directory! {
281 "test" => read_only("test file 1"),
282 "test2" => read_only("test file 2"),
283 },
284 "2" => read_only("file 2"),
285 "x" => vfs::pseudo_directory! {
286 "device_controller" => create_controller_service("/dev/test2/x/dev"),
287 },
288 "3" => read_only("file 3"),
289 };
290
291 let dir_proxy = vfs::directory::serve_read_only(dir);
292 let path = wait_for_device_with(&dir_proxy, |DeviceInfo { filename, topological_path }| {
293 (topological_path == "/dev/test2/x/dev").then(|| filename.to_string())
294 })
295 .await
296 .unwrap();
297 assert_eq!("x", path);
298 }
299
300 #[fasync::run_singlethreaded(test)]
301 async fn open_two_directories() {
302 let (client, server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
303
304 let root = vfs::pseudo_directory! {
305 "test" => vfs::pseudo_directory! {
306 "dir" => vfs::pseudo_directory! {},
307 },
308 };
309 let flags: fio::Flags =
310 fio::Flags::PROTOCOL_DIRECTORY | fio::PERM_READABLE | fio::Flags::PERM_EXECUTE;
311 let object_request =
312 vfs::ObjectRequest::new(flags, &Default::default(), server.into_channel());
313 object_request.handle(|object_request| {
314 root.clone().open3(
315 vfs::execution_scope::ExecutionScope::new(),
316 vfs::path::Path::dot(),
317 fio::Flags::PROTOCOL_DIRECTORY | fio::PERM_READABLE | fio::Flags::PERM_EXECUTE,
318 object_request,
319 )
320 });
321
322 let directory = recursive_wait_and_open_directory(&client, "test/dir").await.unwrap();
323 let () = directory.close().await.unwrap().unwrap();
324 }
325
326 #[fasync::run_singlethreaded(test)]
327 async fn open_directory_with_leading_slash() {
328 let (client, server) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
329
330 let root = vfs::pseudo_directory! {
331 "test" => vfs::pseudo_directory! {},
332 };
333 let flags: fio::Flags =
334 fio::Flags::PROTOCOL_DIRECTORY | fio::PERM_READABLE | fio::Flags::PERM_EXECUTE;
335 let object_request =
336 vfs::ObjectRequest::new(flags, &Default::default(), server.into_channel());
337 object_request.handle(|object_request| {
338 root.open3(
339 vfs::execution_scope::ExecutionScope::new(),
340 vfs::path::Path::dot(),
341 flags,
342 object_request,
343 )
344 });
345
346 let directory = recursive_wait_and_open_directory(&client, "/test").await.unwrap();
347 let () = directory.close().await.unwrap().unwrap();
348 }
349}