fuchsia_fs/directory/
watcher.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Stream-based Fuchsia VFS directory watcher
6
7#![deny(missing_docs)]
8
9use flex_client::{MessageBuf, ProxyHasDomain};
10use flex_fuchsia_io as fio;
11use futures::stream::{FusedStream, Stream};
12use std::ffi::OsStr;
13use std::os::unix::ffi::OsStrExt;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use thiserror::Error;
18use zx_status::assoc_values;
19
20#[cfg(not(feature = "fdomain"))]
21use fuchsia_async as fasync;
22
23#[derive(Debug, Error, Clone)]
24#[allow(missing_docs)]
25pub enum WatcherCreateError {
26    #[error("while sending watch request: {0}")]
27    SendWatchRequest(#[source] fidl::Error),
28
29    #[error("watch failed with status: {0}")]
30    WatchError(#[source] zx_status::Status),
31
32    #[error("while converting client end to fasync channel: {0}")]
33    ChannelConversion(#[source] zx_status::Status),
34}
35
36#[derive(Debug, Error)]
37#[cfg_attr(not(feature = "fdomain"), derive(Eq, PartialEq))]
38#[allow(missing_docs)]
39pub enum WatcherStreamError {
40    #[cfg(not(feature = "fdomain"))]
41    #[error("read from watch channel failed with status: {0}")]
42    ChannelRead(#[from] zx_status::Status),
43    #[cfg(feature = "fdomain")]
44    #[error("read from watch channel failed: {0}")]
45    ChannelRead(#[from] flex_client::Error),
46}
47
48/// Describes the type of event that occurred in the directory being watched.
49#[repr(C)]
50#[derive(Copy, Clone, Eq, PartialEq)]
51pub struct WatchEvent(fio::WatchEvent);
52
53assoc_values!(WatchEvent, [
54    /// The directory being watched has been deleted. The name returned for this event
55    /// will be `.` (dot), as it is referring to the directory itself.
56    DELETED     = fio::WatchEvent::Deleted;
57    /// A file was added.
58    ADD_FILE    = fio::WatchEvent::Added;
59    /// A file was removed.
60    REMOVE_FILE = fio::WatchEvent::Removed;
61    /// A file existed at the time the Watcher was created.
62    EXISTING    = fio::WatchEvent::Existing;
63    /// All existing files have been enumerated.
64    IDLE        = fio::WatchEvent::Idle;
65]);
66
67/// A message containing a `WatchEvent` and the filename (relative to the directory being watched)
68/// that triggered the event.
69#[derive(Debug, Eq, PartialEq)]
70pub struct WatchMessage {
71    /// The event that occurred.
72    pub event: WatchEvent,
73    /// The filename that triggered the message.
74    pub filename: PathBuf,
75}
76
77#[derive(Debug, Eq, PartialEq)]
78enum WatcherState {
79    Watching,
80    TerminateOnNextPoll,
81    Terminated,
82}
83
84/// Provides a Stream of WatchMessages corresponding to filesystem events for a given directory.
85/// After receiving an error, the stream will return the error, and then will terminate. After it's
86/// terminated, the stream is fused and will continue to return None when polled.
87#[derive(Debug)]
88#[must_use = "futures/streams must be polled"]
89pub struct Watcher {
90    ch: flex_client::AsyncChannel,
91    // If idx >= buf.bytes().len(), you must call reset_buf() before get_next_msg().
92    buf: MessageBuf,
93    idx: usize,
94    state: WatcherState,
95}
96
97impl Unpin for Watcher {}
98
99impl Watcher {
100    /// Creates a new `Watcher` for the directory given by `dir`.
101    pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
102        let (client_end, server_end) = dir.domain().create_endpoints();
103        let options = 0u32;
104        let status = dir
105            .watch(fio::WatchMask::all(), options, server_end)
106            .await
107            .map_err(WatcherCreateError::SendWatchRequest)?;
108        zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
109        let mut buf = MessageBuf::new();
110        buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
111        Ok(Watcher {
112            #[cfg(not(feature = "fdomain"))]
113            ch: fasync::Channel::from_channel(client_end.into_channel()),
114            #[cfg(feature = "fdomain")]
115            ch: client_end.into_channel(),
116            buf,
117            idx: 0,
118            state: WatcherState::Watching,
119        })
120    }
121
122    fn reset_buf(&mut self) {
123        self.idx = 0;
124        self.buf.clear();
125    }
126
127    fn get_next_msg(&mut self) -> WatchMessage {
128        assert!(self.idx < self.buf.bytes().len());
129        let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
130            .expect("Invalid buffer received by Watcher!");
131        self.idx += next_msg.len();
132
133        let mut pathbuf = PathBuf::new();
134        pathbuf.push(OsStr::from_bytes(next_msg.name()));
135        let event = next_msg.event();
136        WatchMessage { event, filename: pathbuf }
137    }
138}
139
140impl FusedStream for Watcher {
141    fn is_terminated(&self) -> bool {
142        self.state == WatcherState::Terminated
143    }
144}
145
146impl Stream for Watcher {
147    type Item = Result<WatchMessage, WatcherStreamError>;
148
149    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
150        let this = &mut *self;
151        // Once this stream has hit an error, it's likely unrecoverable at this level and should be
152        // closed. Clients can attempt to recover by creating a new Watcher.
153        if this.state == WatcherState::TerminateOnNextPoll {
154            this.state = WatcherState::Terminated;
155        }
156        if this.state == WatcherState::Terminated {
157            return Poll::Ready(None);
158        }
159        if this.idx >= this.buf.bytes().len() {
160            this.reset_buf();
161        }
162        if this.idx == 0 {
163            match this.ch.recv_from(cx, &mut this.buf) {
164                Poll::Ready(Ok(())) => {}
165                Poll::Ready(Err(e)) => {
166                    self.state = WatcherState::TerminateOnNextPoll;
167                    return Poll::Ready(Some(Err(e.into())));
168                }
169                Poll::Pending => return Poll::Pending,
170            }
171        }
172        Poll::Ready(Some(Ok(this.get_next_msg())))
173    }
174}
175
176#[repr(C)]
177#[derive(Default)]
178struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);
179impl<T> IncompleteArrayField<T> {
180    #[inline]
181    pub unsafe fn as_ptr(&self) -> *const T {
182        ::std::mem::transmute(self)
183    }
184    #[inline]
185    pub unsafe fn as_slice(&self, len: usize) -> &[T] {
186        ::std::slice::from_raw_parts(self.as_ptr(), len)
187    }
188}
189impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
190    fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
191        fmt.write_str("IncompleteArrayField")
192    }
193}
194
195#[repr(C)]
196#[derive(Debug)]
197struct vfs_watch_msg_t {
198    event: fio::WatchEvent,
199    len: u8,
200    name: IncompleteArrayField<u8>,
201}
202
203#[derive(Debug)]
204struct VfsWatchMsg<'a> {
205    inner: &'a vfs_watch_msg_t,
206}
207
208impl<'a> VfsWatchMsg<'a> {
209    fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
210        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
211            return None;
212        }
213        // This is safe as long as the buffer is at least as large as a vfs_watch_msg_t, which we
214        // just verified. Further, we verify that the buffer has enough bytes to hold the
215        // "incomplete array field" member.
216        let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
217        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
218            return None;
219        }
220        Some(m)
221    }
222
223    fn len(&self) -> usize {
224        ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
225    }
226
227    fn event(&self) -> WatchEvent {
228        WatchEvent(self.inner.event)
229    }
230
231    fn namelen(&self) -> usize {
232        self.inner.len as usize
233    }
234
235    fn name(&self) -> &'a [u8] {
236        // This is safe because we verified during construction that the inner name field has at
237        // least namelen() bytes in it.
238        unsafe { self.inner.name.as_slice(self.namelen()) }
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use super::*;
245    use assert_matches::assert_matches;
246    use fuchsia_async::{DurationExt, TimeoutExt};
247
248    use futures::prelude::*;
249    use std::fmt::Debug;
250    use std::fs::File;
251    use std::path::Path;
252    use std::sync::Arc;
253    use tempfile::tempdir;
254    use vfs::directory::dirents_sink;
255    use vfs::directory::entry::{EntryInfo, GetEntryInfo};
256    use vfs::directory::entry_container::{Directory, DirectoryWatcher};
257    use vfs::directory::immutable::connection::ImmutableConnection;
258    use vfs::directory::traversal_position::TraversalPosition;
259    use vfs::execution_scope::ExecutionScope;
260    use vfs::node::Node;
261    use vfs::ObjectRequestRef;
262
263    fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
264    where
265        S: Stream<Item = Result<OK, ERR>> + Unpin,
266        ERR: Debug,
267    {
268        let f = s.next();
269        let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
270            panic!("timeout waiting for watcher")
271        });
272        f.map(|next| {
273            next.expect("the stream yielded no next item")
274                .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
275        })
276    }
277
278    #[fuchsia::test]
279    async fn test_existing() {
280        let tmp_dir = tempdir().unwrap();
281        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
282
283        let dir = crate::directory::open_in_namespace(
284            tmp_dir.path().to_str().unwrap(),
285            fio::PERM_READABLE,
286        )
287        .unwrap();
288        let mut w = Watcher::new(&dir).await.unwrap();
289
290        let msg = one_step(&mut w).await;
291        assert_eq!(WatchEvent::EXISTING, msg.event);
292        assert_eq!(Path::new("."), msg.filename);
293
294        let msg = one_step(&mut w).await;
295        assert_eq!(WatchEvent::EXISTING, msg.event);
296        assert_eq!(Path::new("file1"), msg.filename);
297
298        let msg = one_step(&mut w).await;
299        assert_eq!(WatchEvent::IDLE, msg.event);
300    }
301
302    #[fuchsia::test]
303    async fn test_add() {
304        let tmp_dir = tempdir().unwrap();
305
306        let dir = crate::directory::open_in_namespace(
307            tmp_dir.path().to_str().unwrap(),
308            fio::PERM_READABLE,
309        )
310        .unwrap();
311        let mut w = Watcher::new(&dir).await.unwrap();
312
313        loop {
314            let msg = one_step(&mut w).await;
315            match msg.event {
316                WatchEvent::EXISTING => continue,
317                WatchEvent::IDLE => break,
318                _ => panic!("Unexpected watch event!"),
319            }
320        }
321
322        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
323        let msg = one_step(&mut w).await;
324        assert_eq!(WatchEvent::ADD_FILE, msg.event);
325        assert_eq!(Path::new("file1"), msg.filename);
326    }
327
328    #[fuchsia::test]
329    async fn test_remove() {
330        let tmp_dir = tempdir().unwrap();
331
332        let filename = "file1";
333        let filepath = tmp_dir.path().join(filename);
334        let _ = File::create(&filepath).unwrap();
335
336        let dir = crate::directory::open_in_namespace(
337            tmp_dir.path().to_str().unwrap(),
338            fio::PERM_READABLE,
339        )
340        .unwrap();
341        let mut w = Watcher::new(&dir).await.unwrap();
342
343        loop {
344            let msg = one_step(&mut w).await;
345            match msg.event {
346                WatchEvent::EXISTING => continue,
347                WatchEvent::IDLE => break,
348                _ => panic!("Unexpected watch event!"),
349            }
350        }
351
352        ::std::fs::remove_file(&filepath).unwrap();
353        let msg = one_step(&mut w).await;
354        assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
355        assert_eq!(Path::new(filename), msg.filename);
356    }
357
358    struct MockDirectory;
359
360    impl MockDirectory {
361        fn new() -> Arc<Self> {
362            Arc::new(Self)
363        }
364    }
365
366    impl GetEntryInfo for MockDirectory {
367        fn entry_info(&self) -> EntryInfo {
368            EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
369        }
370    }
371
372    impl Node for MockDirectory {
373        async fn get_attributes(
374            &self,
375            _query: fio::NodeAttributesQuery,
376        ) -> Result<fio::NodeAttributes2, zx::Status> {
377            unimplemented!();
378        }
379
380        fn close(self: Arc<Self>) {}
381    }
382
383    impl Directory for MockDirectory {
384        fn open(
385            self: Arc<Self>,
386            scope: ExecutionScope,
387            _path: vfs::path::Path,
388            flags: fio::Flags,
389            object_request: ObjectRequestRef<'_>,
390        ) -> Result<(), zx::Status> {
391            object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
392                scope,
393                self.clone(),
394                flags,
395            );
396            Ok(())
397        }
398
399        async fn read_dirents<'a>(
400            &'a self,
401            _pos: &'a TraversalPosition,
402            _sink: Box<dyn dirents_sink::Sink>,
403        ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
404            unimplemented!("Not implemented");
405        }
406
407        fn register_watcher(
408            self: Arc<Self>,
409            _scope: ExecutionScope,
410            _mask: fio::WatchMask,
411            _watcher: DirectoryWatcher,
412        ) -> Result<(), zx::Status> {
413            // Don't do anything, just throw out the watcher, which should close the channel, to
414            // generate a PEER_CLOSED error.
415            Ok(())
416        }
417
418        fn unregister_watcher(self: Arc<Self>, _key: usize) {
419            unimplemented!("Not implemented");
420        }
421    }
422
423    #[fuchsia::test]
424    async fn test_error() {
425        let test_dir = MockDirectory::new();
426        let client = vfs::directory::serve_read_only(test_dir);
427        let mut w = Watcher::new(&client).await.unwrap();
428        let msg = w.next().await.expect("the stream yielded no next item");
429        assert!(!w.is_terminated());
430        assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
431        assert!(!w.is_terminated());
432        assert_matches!(w.next().await, None);
433        assert!(w.is_terminated());
434    }
435}