1use anyhow::{Context, Result, format_err};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
/// Metadata describing a single device entry, as handed to the predicate of
/// [`wait_for_device_with`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo<'a> {
    /// Name of the device's entry within the directory being watched.
    pub filename: &'a str,
    /// Topological path reported by the device's `device_controller`.
    pub topological_path: String,
}
19
20pub async fn wait_for_device_with<T>(
23 dev_dir: &fio::DirectoryProxy,
24 predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26 let stream = watch_for_files(dev_dir).await?;
27 let stream = stream.try_filter_map(|filename| {
28 let predicate = &predicate;
29 async move {
30 let filename =
31 filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32 let controller_filename = filename.to_owned() + "/device_controller";
33
34 let (controller_proxy, server_end) =
35 fidl::endpoints::create_proxy::<ControllerMarker>();
36 #[cfg(fuchsia_api_level_at_least = "27")]
37 if dev_dir
38 .open(
39 &controller_filename,
40 fio::Flags::PROTOCOL_SERVICE,
41 &fio::Options::default(),
42 server_end.into_channel(),
43 )
44 .is_err()
45 {
46 return Ok(None);
47 }
48 #[cfg(not(fuchsia_api_level_at_least = "27"))]
49 if dev_dir
50 .open3(
51 &controller_filename,
52 fio::Flags::PROTOCOL_SERVICE,
53 &fio::Options::default(),
54 server_end.into_channel(),
55 )
56 .is_err()
57 {
58 return Ok(None);
59 }
60
61 let topological_path = controller_proxy.get_topological_path().await;
62 let topological_path = match topological_path {
63 Ok(topological_path) => topological_path,
64 Err(err) => match err {
67 fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68 err => {
69 return Err(err).with_context(|| {
70 format!("failed to send get_topological_path on \"{}\"", filename)
71 });
72 }
73 },
74 };
75 let topological_path = topological_path
76 .map_err(zx_status::Status::from_raw)
77 .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79 Ok(predicate(DeviceInfo { filename, topological_path }))
80 }
81 });
82 futures::pin_mut!(stream);
83 let item = stream.try_next().await?;
84 item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87pub async fn watch_for_files(
91 dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>> + use<>> {
93 let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94 Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95 futures::future::ok(match msg.event {
96 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97 if msg.filename == std::path::Path::new(".") { None } else { Some(msg.filename) }
98 }
99 _ => None,
100 })
101 }))
102}
103
104async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
105 let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
106 while let Some(msg) = watcher.try_next().await? {
107 if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
108 && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
109 {
110 continue;
111 }
112 if msg.filename.to_str().unwrap() == name {
113 return Ok(());
114 }
115 }
116 unreachable!();
117}
118
119async fn recursive_wait_and_open_with_flags<T, F>(
123 mut dir: fio::DirectoryProxy,
124 name: &str,
125 flags: fio::Flags,
126 op: F,
127) -> Result<T>
128where
129 F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
130{
131 let path = std::path::Path::new(name);
132 let mut components = path.components().peekable();
133 loop {
134 let component =
135 components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
136 let file = match component {
137 std::path::Component::Normal(file) => file,
138 std::path::Component::RootDir => continue,
145 component => {
146 return Err(format_err!("path contains non-normal component {:?}", component));
147 }
148 };
149 let file = file.to_str().unwrap();
150 let () = wait_for_file(&dir, file).await?;
151 if components.peek().is_some() {
152 dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
153 } else {
154 break Ok(op(&dir, file, flags));
155 }
156 }
157}
158
159pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
163 recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
164 .await
165}
166
167pub async fn recursive_wait_and_open_directory(
171 dir: &fio::DirectoryProxy,
172 name: &str,
173) -> Result<fio::DirectoryProxy> {
174 recursive_wait_and_open_with_flags(
175 Clone::clone(dir),
176 name,
177 fio::Flags::PROTOCOL_DIRECTORY,
178 fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
179 )
180 .await
181 .and_then(|res| res.map_err(Into::into))
182}
183
184pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
188 dir: &fio::DirectoryProxy,
189 name: &str,
190) -> Result<P::Proxy> {
191 recursive_wait_and_open_with_flags(
192 Clone::clone(dir),
193 name,
194 fio::Flags::empty(),
195 |dir, path, _flags| {
196 fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
198 },
199 )
200 .await
201 .and_then(|res| res.map_err(Into::into))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use fidl_fuchsia_device as fdev;
    use fuchsia_async as fasync;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::sync::Arc;
    use vfs::execution_scope::ExecutionScope;
    use vfs::file::vmo::read_only;

    /// Builds a fake controller service that answers one `GetTopologicalPath` request with
    /// `topo_path` and panics on anything else (including the stream ending).
    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
        vfs::service::host(move |mut requests: fdev::ControllerRequestStream| async move {
            let request = requests.try_next().await.unwrap();
            match request {
                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
                    let _ = responder.send(Ok(topo_path));
                }
                e => panic!("Unexpected request: {:?}", e),
            }
        })
    }

    /// The device whose controller reports the requested topological path is found among
    /// several sibling devices.
    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_by_topological_path() {
        let devices = vfs::pseudo_directory! {
            "a" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/a/dev"),
            },
            "1" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/1/dev"),
            },
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/x/dev"),
            },
            "y" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/y/dev"),
            },
        };
        let proxy = vfs::directory::serve_read_only(devices, ExecutionScope::new());
        let found = wait_for_device_with(&proxy, |info| {
            if info.topological_path == "/dev/test2/x/dev" {
                Some(info.filename.to_string())
            } else {
                None
            }
        })
        .await
        .unwrap();
        assert_eq!("x", found);
    }

    /// Both pre-existing files are reported by the watch stream, in either order.
    #[fasync::run_singlethreaded(test)]
    async fn watch_for_two_files() {
        let files = vfs::pseudo_directory! {
            "a" => read_only(b"/a"),
            "b" => read_only(b"/b"),
        };
        let proxy = vfs::directory::serve_read_only(files, ExecutionScope::new());

        let stream = watch_for_files(&proxy).await.unwrap();
        futures::pin_mut!(stream);
        let mut actual: HashSet<PathBuf> = HashSet::new();
        for _ in 0..2 {
            actual.insert(stream.next().await.unwrap().unwrap());
        }
        let expected: HashSet<PathBuf> = ["a", "b"].into_iter().map(PathBuf::from).collect();
        assert_eq!(actual, expected);
    }

    /// Plain files and directories without a controller do not prevent the real device from
    /// being found.
    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_topo_path_allows_files_and_dirs() {
        let mixed = vfs::pseudo_directory! {
            "1" => vfs::pseudo_directory! {
                "test" => read_only("test file 1"),
                "test2" => read_only("test file 2"),
            },
            "2" => read_only("file 2"),
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/x/dev"),
            },
            "3" => read_only("file 3"),
        };
        let proxy = vfs::directory::serve_read_only(mixed, ExecutionScope::new());
        let found = wait_for_device_with(&proxy, |info| {
            if info.topological_path == "/dev/test2/x/dev" {
                Some(info.filename.to_string())
            } else {
                None
            }
        })
        .await
        .unwrap();
        assert_eq!("x", found);
    }

    /// A two-level relative path is walked and its final directory opened.
    #[fasync::run_singlethreaded(test)]
    async fn open_two_directories() {
        let root = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {
                "dir" => vfs::pseudo_directory! {},
            },
        };
        let client = vfs::directory::serve_read_only(root, ExecutionScope::new());
        let opened = recursive_wait_and_open_directory(&client, "test/dir").await.unwrap();
        opened.close().await.unwrap().unwrap();
    }

    /// A leading slash in the path is tolerated.
    #[fasync::run_singlethreaded(test)]
    async fn open_directory_with_leading_slash() {
        let root = vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {},
        };
        let client = vfs::directory::serve_read_only(root, ExecutionScope::new());
        let opened = recursive_wait_and_open_directory(&client, "/test").await.unwrap();
        opened.close().await.unwrap().unwrap();
    }
}