1use anyhow::{Context, Result, format_err};
6use fidl_fuchsia_device::ControllerMarker;
7use fidl_fuchsia_io as fio;
8use fuchsia_fs::directory::{WatchEvent, Watcher};
9use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
10use std::path::PathBuf;
11
/// A device directory entry paired with the topological path reported for it.
///
/// Values of this type are handed to the predicate given to
/// [`wait_for_device_with`], which decides whether the entry is the device
/// being waited for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo<'a> {
    /// Name of the entry inside the watched directory.
    pub filename: &'a str,
    /// Topological path returned by the entry's `device_controller` protocol.
    pub topological_path: String,
}
19
20pub async fn wait_for_device_with<T>(
23 dev_dir: &fio::DirectoryProxy,
24 predicate: impl Fn(DeviceInfo<'_>) -> Option<T>,
25) -> Result<T, anyhow::Error> {
26 let stream = watch_for_files(dev_dir).await?;
27 let stream = stream.try_filter_map(|filename| {
28 let predicate = &predicate;
29 async move {
30 let filename =
31 filename.to_str().ok_or_else(|| format_err!("to_str for filename failed"))?;
32 let controller_filename = filename.to_owned() + "/device_controller";
33
34 let (controller_proxy, server_end) =
35 fidl::endpoints::create_proxy::<ControllerMarker>();
36 #[cfg(fuchsia_api_level_at_least = "27")]
37 if dev_dir
38 .open(
39 &controller_filename,
40 fio::Flags::PROTOCOL_SERVICE,
41 &fio::Options::default(),
42 server_end.into_channel(),
43 )
44 .is_err()
45 {
46 return Ok(None);
47 }
48 #[cfg(not(fuchsia_api_level_at_least = "27"))]
49 if dev_dir
50 .open3(
51 &controller_filename,
52 fio::Flags::PROTOCOL_SERVICE,
53 &fio::Options::default(),
54 server_end.into_channel(),
55 )
56 .is_err()
57 {
58 return Ok(None);
59 }
60
61 let topological_path = controller_proxy.get_topological_path().await;
62 let topological_path = match topological_path {
63 Ok(topological_path) => topological_path,
64 Err(err) => match err {
67 fidl::Error::ClientChannelClosed { .. } => return Ok(None),
68 err => {
69 return Err(err).with_context(|| {
70 format!("failed to send get_topological_path on \"{}\"", filename)
71 });
72 }
73 },
74 };
75 let topological_path = topological_path
76 .map_err(zx_status::Status::from_raw)
77 .with_context(|| format!("failed to get topological path on \"{}\"", filename))?;
78
79 Ok(predicate(DeviceInfo { filename, topological_path }))
80 }
81 });
82 futures::pin_mut!(stream);
83 let item = stream.try_next().await?;
84 item.ok_or_else(|| format_err!("stream ended prematurely"))
85}
86
87pub async fn watch_for_files(
91 dir: &fio::DirectoryProxy,
92) -> Result<impl Stream<Item = Result<PathBuf>> + use<>> {
93 let watcher = Watcher::new(dir).await.context("failed to create watcher")?;
94 Ok(watcher.map(|result| result.context("failed to get watcher event")).try_filter_map(|msg| {
95 futures::future::ok(match msg.event {
96 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
97 if msg.filename == std::path::Path::new(".") { None } else { Some(msg.filename) }
98 }
99 _ => None,
100 })
101 }))
102}
103
104async fn wait_for_file(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
105 let mut watcher = fuchsia_fs::directory::Watcher::new(dir).await?;
106 while let Some(msg) = watcher.try_next().await? {
107 if msg.event != fuchsia_fs::directory::WatchEvent::EXISTING
108 && msg.event != fuchsia_fs::directory::WatchEvent::ADD_FILE
109 {
110 continue;
111 }
112 if msg.filename.to_str().unwrap() == name {
113 return Ok(());
114 }
115 }
116 unreachable!();
117}
118
119async fn recursive_wait_and_open_with_flags<T, F>(
123 mut dir: fio::DirectoryProxy,
124 name: &str,
125 flags: fio::Flags,
126 op: F,
127) -> Result<T>
128where
129 F: FnOnce(&fio::DirectoryProxy, &str, fio::Flags) -> T,
130{
131 let path = std::path::Path::new(name);
132 let mut components = path.components().peekable();
133 loop {
134 let component =
135 components.next().ok_or_else(|| format_err!("cannot wait for empty path"))?;
136 let file = match component {
137 std::path::Component::Normal(file) => file,
138 std::path::Component::RootDir => continue,
145 component => {
146 return Err(format_err!("path contains non-normal component {:?}", component));
147 }
148 };
149 let file = file.to_str().unwrap();
150 let () = wait_for_file(&dir, file).await?;
151 if components.peek().is_some() {
152 dir = fuchsia_fs::directory::open_directory_async(&dir, file, flags)?;
153 } else {
154 break Ok(op(&dir, file, flags));
155 }
156 }
157}
158
159pub async fn recursive_wait(dir: &fio::DirectoryProxy, name: &str) -> Result<()> {
163 recursive_wait_and_open_with_flags(Clone::clone(dir), name, fio::Flags::empty(), |_, _, _| ())
164 .await
165}
166
167pub async fn recursive_wait_and_open_directory(
171 dir: &fio::DirectoryProxy,
172 name: &str,
173) -> Result<fio::DirectoryProxy> {
174 recursive_wait_and_open_with_flags(
175 Clone::clone(dir),
176 name,
177 fio::Flags::PROTOCOL_DIRECTORY,
178 fuchsia_fs::directory::open_async::<fio::DirectoryMarker>,
179 )
180 .await
181 .and_then(|res| res.map_err(Into::into))
182}
183
184pub async fn recursive_wait_and_open<P: fidl::endpoints::ProtocolMarker>(
188 dir: &fio::DirectoryProxy,
189 name: &str,
190) -> Result<P::Proxy> {
191 recursive_wait_and_open_with_flags(
192 Clone::clone(dir),
193 name,
194 fio::Flags::empty(),
195 |dir, path, _flags| {
196 fuchsia_fs::directory::open_async::<P>(dir, path, fio::Flags::PROTOCOL_SERVICE)
198 },
199 )
200 .await
201 .and_then(|res| res.map_err(Into::into))
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::sync::Arc;
    use vfs::file::vmo::read_only;
    use {fidl_fuchsia_device as fdev, fuchsia_async as fasync};

    /// Topological path served by the "x" device that the lookup tests search for.
    const X_TOPO_PATH: &str = "/dev/test2/x/dev";

    /// Builds a fake `device_controller` service that answers a single
    /// `GetTopologicalPath` request with `topo_path` and panics on anything else.
    fn create_controller_service(topo_path: &'static str) -> Arc<vfs::service::Service> {
        vfs::service::host(move |mut stream: fdev::ControllerRequestStream| async move {
            let request = stream.try_next().await.unwrap();
            match request {
                Some(fdev::ControllerRequest::GetTopologicalPath { responder }) => {
                    let _ = responder.send(Ok(topo_path));
                }
                unexpected => panic!("Unexpected request: {:?}", unexpected),
            }
        })
    }

    /// Waits for the device whose topological path equals `wanted` and returns
    /// its filename.
    async fn filename_with_topo_path(dev_dir: &fio::DirectoryProxy, wanted: &str) -> String {
        wait_for_device_with(dev_dir, |DeviceInfo { filename, topological_path }| {
            (topological_path == wanted).then(|| filename.to_string())
        })
        .await
        .unwrap()
    }

    /// Waits for `path` below `client`, opens it as a directory, then closes it.
    async fn open_then_close(client: &fio::DirectoryProxy, path: &str) {
        let directory = recursive_wait_and_open_directory(client, path).await.unwrap();
        directory.close().await.unwrap().unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_by_topological_path() {
        let dir_proxy = vfs::directory::serve_read_only(vfs::pseudo_directory! {
            "a" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/a/dev"),
            },
            "1" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/1/dev"),
            },
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service(X_TOPO_PATH),
            },
            "y" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service("/dev/test2/y/dev"),
            },
        });

        let path = filename_with_topo_path(&dir_proxy, X_TOPO_PATH).await;
        assert_eq!("x", path);
    }

    #[fasync::run_singlethreaded(test)]
    async fn watch_for_two_files() {
        let dir_proxy = vfs::directory::serve_read_only(vfs::pseudo_directory! {
            "a" => read_only(b"/a"),
            "b" => read_only(b"/b"),
        });

        let stream = watch_for_files(&dir_proxy).await.unwrap();
        futures::pin_mut!(stream);

        // The order in which the two entries are reported is not asserted.
        let mut actual: HashSet<PathBuf> = HashSet::new();
        for _ in 0..2 {
            actual.insert(stream.next().await.unwrap().unwrap());
        }
        let expected: HashSet<PathBuf> = HashSet::from([PathBuf::from("a"), PathBuf::from("b")]);
        assert_eq!(actual, expected);
    }

    #[fasync::run_singlethreaded(test)]
    async fn wait_for_device_topo_path_allows_files_and_dirs() {
        // Plain files and directories without a controller must be skipped.
        let dir_proxy = vfs::directory::serve_read_only(vfs::pseudo_directory! {
            "1" => vfs::pseudo_directory! {
                "test" => read_only("test file 1"),
                "test2" => read_only("test file 2"),
            },
            "2" => read_only("file 2"),
            "x" => vfs::pseudo_directory! {
                "device_controller" => create_controller_service(X_TOPO_PATH),
            },
            "3" => read_only("file 3"),
        });

        let path = filename_with_topo_path(&dir_proxy, X_TOPO_PATH).await;
        assert_eq!("x", path);
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_two_directories() {
        let client = vfs::directory::serve_read_only(vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {
                "dir" => vfs::pseudo_directory! {},
            },
        });
        open_then_close(&client, "test/dir").await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_directory_with_leading_slash() {
        let client = vfs::directory::serve_read_only(vfs::pseudo_directory! {
            "test" => vfs::pseudo_directory! {},
        });
        open_then_close(&client, "/test").await;
    }
}