1use anyhow::{format_err, Context, Result};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
/// Information about a device entry, handed to the predicate of [`wait_for_device_with`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo<'a> {
    /// Name of the entry, as reported by the watcher on the device directory.
    pub filename: &'a str,
    /// Topological path returned by the entry's `device_controller`.
    pub topological_path: String,
}
19
20pub async fn wait_for_device_with<T>(
23 dev_dir: &fio::DirectoryProxy,
24 predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26 let stream = watch_for_files(dev_dir).await?;
27 let stream = stream.try_filter_map(|filename| {
28 let predicate = &predicate;
29 async move {
30 let filename =
31 filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32 let controller_filename = filename.to_owned() + "/device_controller";
33
34 let (controller_proxy, server_end) =
35 fidl::endpoints::create_proxy::<ControllerMarker>();
36 #[cfg(fuchsia_api_level_at_least = "NEXT")]
37 if dev_dir
38 .open(
39 &controller_filename,
40 fio::Flags::PROTOCOL_SERVICE,
41 &fio::Options::default(),
42 server_end.into_channel(),
43 )
44 .is_err()
45 {
46 return Ok(None);
47 }
48 #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
49 if dev_dir
50 .open3(
51 &controller_filename,
52 fio::Flags::PROTOCOL_SERVICE,
53 &fio::Options::default(),
54 server_end.into_channel(),
55 )
56 .is_err()
57 {
58 return Ok(None);
59 }
60
61 let topological_path = controller_proxy.get_topological_path().await;
62 let topological_path = match topological_path {
63 Ok(topological_path) => topological_path,
64 Err(err) => match err {
67 fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68 err => {
69 return Err(err).with_context(|| {
70 format!("failed to send get_topological_path on \"{}\"", filename)
71 })
72 }
73 },
74 };
75 let topological_path = topological_path
76 .map_err(zx_status::Status::from_raw)
77 .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79 Ok(predicate(DeviceInfo { filename, topological_path }))
80 }
81 });
82 futures::pin_mut!(stream);
83 let item = stream.try_next().await?;
84 item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87pub async fn watch_for_files(
91 dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>>> {
93 let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94 Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95 futures::future::ok(match msg.event {
96 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97 if msg.filename == std::path::Path::new(".") {
98 None
99 } else {
100 Some(msg.filename)
101 }
102 }
103 _ => None,
104 })
105 }))
106}
107
108async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
109 let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
110 while let Some(msg) = watcher.try_next().await? {
111 if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
112 && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
113 {
114 continue;
115 }
116 if msg.filename.to_str().unwrap() == name {
117 return Ok(());
118 }
119 }
120 unreachable!();
121}
122
123async fn recursive_wait_and_open_with_flags<T, F>(
127 mut dir: fio::DirectoryProxy,
128 name: &str,
129 flags: fio::Flags,
130 op: F,
131) -> Result<T>
132where
133 F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
134{
135 let path = std::path::Path::new(name);
136 let mut components = path.components().peekable();
137 loop {
138 let component =
139 components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
140 let file = match component {
141 std::path::Component::Normal(file) => file,
142 std::path::Component::RootDir => continue,
149 component => {
150 return Err(format_err!("path contains non-normal component {:?}", component))
151 }
152 };
153 let file = file.to_str().unwrap();
154 let () = wait_for_file(&dir, file).await?;
155 if components.peek().is_some() {
156 dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
157 } else {
158 break Ok(op(&dir, file, flags));
159 }
160 }
161}
162
163pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
167 recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
168 .await
169}
170
171pub async fn recursive_wait_and_open_directory(
175 dir: &fio::DirectoryProxy,
176 name: &str,
177) -> Result<fio::DirectoryProxy> {
178 recursive_wait_and_open_with_flags(
179 Clone::clone(dir),
180 name,
181 fio::Flags::PROTOCOL_DIRECTORY,
182 fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
183 )
184 .await
185 .and_then(|res| res.map_err(Into::into))
186}
187
188pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
192 dir: &fio::DirectoryProxy,
193 name: &str,
194) -> Result<P::Proxy> {
195 recursive_wait_and_open_with_flags(
196 Clone::clone(dir),
197 name,
198 fio::Flags::empty(),
199 |dir, path, _flags| {
200 fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
202 },
203 )
204 .await
205 .and_then(|res| res.map_err(Into::into))
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::str::FromStr;
    use std::sync::Arc;
    use vfs::file::vmo::read_only;
    use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};

    /// Builds a fake `device_controller` service that answers a single
    /// `GetTopologicalPath` request with `topo_path` and panics on anything else.
    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
        vfs::service::host(move |mut requests: fdev::ControllerRequestStream| async move {
            match requests.try_next().await.unwrap() {
                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
                    let _ = responder.send(Ok(topo_path));
                }
                unexpected => panic!("Unexpected request: {:?}", unexpected),
            }
        })
    }

    /// Waits for the device whose topological path is `/dev/test2/x/dev` and returns the
    /// name of its entry.
    async fn find_x_device(dev: &fio::DirectoryProxy) -> String {
        wait_for_device_with(dev, |info| {
            (info.topological_path == "/dev/test2/x/dev").then(|| info.filename.to_string())
        })
        .await
        .unwrap()
    }

    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_by_topological_path() {
        let devices = vfs::pseudo_directory! {
            "a" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/a/dev"),
            },
            "1" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/1/dev"),
            },
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/x/dev"),
            },
            "y" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/y/dev"),
            },
        };
        let dev_proxy = vfs::directory::serve_read_only(devices);
        let found = find_x_device(&dev_proxy).await;
        assert_eq!("x", found);
    }

    #[fasync::run_singlethreaded(test)]
    async fn watch_for_two_files() {
        let files = vfs::pseudo_directory! {
            "a" => read_only(b"/a"),
            "b" => read_only(b"/b"),
        };
        let files_proxy = vfs::directory::serve_read_only(files);

        let stream = watch_for_files(&files_proxy).await.unwrap();
        futures::pin_mut!(stream);

        // The order in which the two entries are reported is not asserted, so compare sets.
        let mut actual: HashSet<PathBuf> = HashSet::new();
        for _ in 0..2 {
            let path = stream.next().await.unwrap().unwrap();
            actual.insert(path);
        }

        let mut expected: HashSet<PathBuf> = HashSet::new();
        expected.insert(PathBuf::from_str("a").unwrap());
        expected.insert(PathBuf::from_str("b").unwrap());
        assert_eq!(actual, expected);
    }

    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_topo_path_allows_files_and_dirs() {
        // Only "x" serves a controller; the plain files and the directory without a
        // controller sit alongside it.
        let devices = vfs::pseudo_directory! {
            "1" => vfs::pseudo_directory! {
                "test" => read_only("test file 1"),
                "test2" => read_only("test file 2"),
            },
            "2" => read_only("file 2"),
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/x/dev"),
            },
            "3" => read_only("file 3"),
        };
        let dev_proxy = vfs::directory::serve_read_only(devices);
        let found = find_x_device(&dev_proxy).await;
        assert_eq!("x", found);
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_two_directories() {
        let tree = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {
                "dir" => vfs::pseudo_directory! {},
            },
        };
        let root_proxy = vfs::directory::serve_read_only(tree);
        let opened = recursive_wait_and_open_directory(&root_proxy, "test/dir").await.unwrap();
        let () = opened.close().await.unwrap().unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_directory_with_leading_slash() {
        let tree = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {},
        };
        let root_proxy = vfs::directory::serve_read_only(tree);
        let opened = recursive_wait_and_open_directory(&root_proxy, "/test").await.unwrap();
        let () = opened.close().await.unwrap().unwrap();
    }
}