1#![deny(missing_docs)]
8
9use flex_client::{MessageBuf, ProxyHasDomain};
10use flex_fuchsia_io as fio;
11use futures::stream::{FusedStream, Stream};
12use std::ffi::OsStr;
13use std::os::unix::ffi::OsStrExt;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use thiserror::Error;
18use zx_status::assoc_values;
19
20#[cfg(not(feature = "fdomain"))]
21use fuchsia_async as fasync;
22
23#[derive(Debug, Error, Clone)]
24#[allow(missing_docs)]
25pub enum WatcherCreateError {
26 #[error("while sending watch request: {0}")]
27 SendWatchRequest(#[source] fidl::Error),
28
29 #[error("watch failed with status: {0}")]
30 WatchError(#[source] zx_status::Status),
31
32 #[error("while converting client end to fasync channel: {0}")]
33 ChannelConversion(#[source] zx_status::Status),
34}
35
36#[derive(Debug, Error)]
37#[cfg_attr(not(feature = "fdomain"), derive(Eq, PartialEq))]
38#[allow(missing_docs)]
39pub enum WatcherStreamError {
40 #[cfg(not(feature = "fdomain"))]
41 #[error("read from watch channel failed with status: {0}")]
42 ChannelRead(#[from] zx_status::Status),
43 #[cfg(feature = "fdomain")]
44 #[error("read from watch channel failed: {0}")]
45 ChannelRead(#[from] flex_client::Error),
46}
47
48#[repr(C)]
50#[derive(Copy, Clone, Eq, PartialEq)]
51pub struct WatchEvent(fio::WatchEvent);
52
53assoc_values!(WatchEvent, [
54 DELETED = fio::WatchEvent::Deleted;
57 ADD_FILE = fio::WatchEvent::Added;
59 REMOVE_FILE = fio::WatchEvent::Removed;
61 EXISTING = fio::WatchEvent::Existing;
63 IDLE = fio::WatchEvent::Idle;
65]);
66
67#[derive(Debug, Eq, PartialEq)]
70pub struct WatchMessage {
71 pub event: WatchEvent,
73 pub filename: PathBuf,
75}
76
77#[derive(Debug, Eq, PartialEq)]
78enum WatcherState {
79 Watching,
80 TerminateOnNextPoll,
81 Terminated,
82}
83
84#[derive(Debug)]
88#[must_use = "futures/streams must be polled"]
89pub struct Watcher {
90 ch: flex_client::AsyncChannel,
91 buf: MessageBuf,
93 idx: usize,
94 state: WatcherState,
95}
96
97impl Unpin for Watcher {}
98
99impl Watcher {
100 pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
102 let (client_end, server_end) = dir.domain().create_endpoints();
103 let options = 0u32;
104 let status = dir
105 .watch(fio::WatchMask::all(), options, server_end)
106 .await
107 .map_err(WatcherCreateError::SendWatchRequest)?;
108 zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
109 let mut buf = MessageBuf::new();
110 buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
111 Ok(Watcher {
112 #[cfg(not(feature = "fdomain"))]
113 ch: fasync::Channel::from_channel(client_end.into_channel()),
114 #[cfg(feature = "fdomain")]
115 ch: client_end.into_channel(),
116 buf,
117 idx: 0,
118 state: WatcherState::Watching,
119 })
120 }
121
122 fn reset_buf(&mut self) {
123 self.idx = 0;
124 self.buf.clear();
125 }
126
127 fn get_next_msg(&mut self) -> WatchMessage {
128 assert!(self.idx < self.buf.bytes().len());
129 let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
130 .expect("Invalid buffer received by Watcher!");
131 self.idx += next_msg.len();
132
133 let mut pathbuf = PathBuf::new();
134 pathbuf.push(OsStr::from_bytes(next_msg.name()));
135 let event = next_msg.event();
136 WatchMessage { event, filename: pathbuf }
137 }
138}
139
140impl FusedStream for Watcher {
141 fn is_terminated(&self) -> bool {
142 self.state == WatcherState::Terminated
143 }
144}
145
146impl Stream for Watcher {
147 type Item = Result<WatchMessage, WatcherStreamError>;
148
149 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
150 let this = &mut *self;
151 if this.state == WatcherState::TerminateOnNextPoll {
154 this.state = WatcherState::Terminated;
155 }
156 if this.state == WatcherState::Terminated {
157 return Poll::Ready(None);
158 }
159 if this.idx >= this.buf.bytes().len() {
160 this.reset_buf();
161 }
162 if this.idx == 0 {
163 match this.ch.recv_from(cx, &mut this.buf) {
164 Poll::Ready(Ok(())) => {}
165 Poll::Ready(Err(e)) => {
166 self.state = WatcherState::TerminateOnNextPoll;
167 return Poll::Ready(Some(Err(e.into())));
168 }
169 Poll::Pending => return Poll::Pending,
170 }
171 }
172 Poll::Ready(Some(Ok(this.get_next_msg())))
173 }
174}
175
176#[repr(C)]
177#[derive(Default)]
178struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);
179impl<T> IncompleteArrayField<T> {
180 #[inline]
181 pub unsafe fn as_ptr(&self) -> *const T {
182 ::std::mem::transmute(self)
183 }
184 #[inline]
185 pub unsafe fn as_slice(&self, len: usize) -> &[T] {
186 ::std::slice::from_raw_parts(self.as_ptr(), len)
187 }
188}
189impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
190 fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
191 fmt.write_str("IncompleteArrayField")
192 }
193}
194
195#[repr(C)]
196#[derive(Debug)]
197struct vfs_watch_msg_t {
198 event: fio::WatchEvent,
199 len: u8,
200 name: IncompleteArrayField<u8>,
201}
202
203#[derive(Debug)]
204struct VfsWatchMsg<'a> {
205 inner: &'a vfs_watch_msg_t,
206}
207
208impl<'a> VfsWatchMsg<'a> {
209 fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
210 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
211 return None;
212 }
213 let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
217 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
218 return None;
219 }
220 Some(m)
221 }
222
223 fn len(&self) -> usize {
224 ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
225 }
226
227 fn event(&self) -> WatchEvent {
228 WatchEvent(self.inner.event)
229 }
230
231 fn namelen(&self) -> usize {
232 self.inner.len as usize
233 }
234
235 fn name(&self) -> &'a [u8] {
236 unsafe { self.inner.name.as_slice(self.namelen()) }
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245 use assert_matches::assert_matches;
246 use fuchsia_async::{DurationExt, TimeoutExt};
247
248 use futures::prelude::*;
249 use std::fmt::Debug;
250 use std::fs::File;
251 use std::path::Path;
252 use std::sync::Arc;
253 use tempfile::tempdir;
254 use vfs::directory::dirents_sink;
255 use vfs::directory::entry::{EntryInfo, GetEntryInfo};
256 use vfs::directory::entry_container::{Directory, DirectoryWatcher};
257 use vfs::directory::immutable::connection::ImmutableConnection;
258 use vfs::directory::traversal_position::TraversalPosition;
259 use vfs::execution_scope::ExecutionScope;
260 use vfs::node::Node;
261 use vfs::ObjectRequestRef;
262
263 fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
264 where
265 S: Stream<Item = Result<OK, ERR>> + Unpin,
266 ERR: Debug,
267 {
268 let f = s.next();
269 let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
270 panic!("timeout waiting for watcher")
271 });
272 f.map(|next| {
273 next.expect("the stream yielded no next item")
274 .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
275 })
276 }
277
278 #[fuchsia::test]
279 async fn test_existing() {
280 let tmp_dir = tempdir().unwrap();
281 let _ = File::create(tmp_dir.path().join("file1")).unwrap();
282
283 let dir = crate::directory::open_in_namespace(
284 tmp_dir.path().to_str().unwrap(),
285 fio::PERM_READABLE,
286 )
287 .unwrap();
288 let mut w = Watcher::new(&dir).await.unwrap();
289
290 let msg = one_step(&mut w).await;
291 assert_eq!(WatchEvent::EXISTING, msg.event);
292 assert_eq!(Path::new("."), msg.filename);
293
294 let msg = one_step(&mut w).await;
295 assert_eq!(WatchEvent::EXISTING, msg.event);
296 assert_eq!(Path::new("file1"), msg.filename);
297
298 let msg = one_step(&mut w).await;
299 assert_eq!(WatchEvent::IDLE, msg.event);
300 }
301
302 #[fuchsia::test]
303 async fn test_add() {
304 let tmp_dir = tempdir().unwrap();
305
306 let dir = crate::directory::open_in_namespace(
307 tmp_dir.path().to_str().unwrap(),
308 fio::PERM_READABLE,
309 )
310 .unwrap();
311 let mut w = Watcher::new(&dir).await.unwrap();
312
313 loop {
314 let msg = one_step(&mut w).await;
315 match msg.event {
316 WatchEvent::EXISTING => continue,
317 WatchEvent::IDLE => break,
318 _ => panic!("Unexpected watch event!"),
319 }
320 }
321
322 let _ = File::create(tmp_dir.path().join("file1")).unwrap();
323 let msg = one_step(&mut w).await;
324 assert_eq!(WatchEvent::ADD_FILE, msg.event);
325 assert_eq!(Path::new("file1"), msg.filename);
326 }
327
328 #[fuchsia::test]
329 async fn test_remove() {
330 let tmp_dir = tempdir().unwrap();
331
332 let filename = "file1";
333 let filepath = tmp_dir.path().join(filename);
334 let _ = File::create(&filepath).unwrap();
335
336 let dir = crate::directory::open_in_namespace(
337 tmp_dir.path().to_str().unwrap(),
338 fio::PERM_READABLE,
339 )
340 .unwrap();
341 let mut w = Watcher::new(&dir).await.unwrap();
342
343 loop {
344 let msg = one_step(&mut w).await;
345 match msg.event {
346 WatchEvent::EXISTING => continue,
347 WatchEvent::IDLE => break,
348 _ => panic!("Unexpected watch event!"),
349 }
350 }
351
352 ::std::fs::remove_file(&filepath).unwrap();
353 let msg = one_step(&mut w).await;
354 assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
355 assert_eq!(Path::new(filename), msg.filename);
356 }
357
358 struct MockDirectory;
359
360 impl MockDirectory {
361 fn new() -> Arc<Self> {
362 Arc::new(Self)
363 }
364 }
365
366 impl GetEntryInfo for MockDirectory {
367 fn entry_info(&self) -> EntryInfo {
368 EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
369 }
370 }
371
372 impl Node for MockDirectory {
373 async fn get_attributes(
374 &self,
375 _query: fio::NodeAttributesQuery,
376 ) -> Result<fio::NodeAttributes2, zx::Status> {
377 unimplemented!();
378 }
379
380 fn close(self: Arc<Self>) {}
381 }
382
383 impl Directory for MockDirectory {
384 fn open(
385 self: Arc<Self>,
386 scope: ExecutionScope,
387 _path: vfs::path::Path,
388 flags: fio::Flags,
389 object_request: ObjectRequestRef<'_>,
390 ) -> Result<(), zx::Status> {
391 object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
392 scope,
393 self.clone(),
394 flags,
395 );
396 Ok(())
397 }
398
399 async fn read_dirents<'a>(
400 &'a self,
401 _pos: &'a TraversalPosition,
402 _sink: Box<dyn dirents_sink::Sink>,
403 ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
404 unimplemented!("Not implemented");
405 }
406
407 fn register_watcher(
408 self: Arc<Self>,
409 _scope: ExecutionScope,
410 _mask: fio::WatchMask,
411 _watcher: DirectoryWatcher,
412 ) -> Result<(), zx::Status> {
413 Ok(())
416 }
417
418 fn unregister_watcher(self: Arc<Self>, _key: usize) {
419 unimplemented!("Not implemented");
420 }
421 }
422
423 #[fuchsia::test]
424 async fn test_error() {
425 let test_dir = MockDirectory::new();
426 let client = vfs::directory::serve_read_only(test_dir);
427 let mut w = Watcher::new(&client).await.unwrap();
428 let msg = w.next().await.expect("the stream yielded no next item");
429 assert!(!w.is_terminated());
430 assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
431 assert!(!w.is_terminated());
432 assert_matches!(w.next().await, None);
433 assert!(w.is_terminated());
434 }
435}