vfs/directory/
watchers.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
//! Manages the list of watcher connections attached to a directory.  "Watchers" here are the
//! directory watchers described in fuchsia.io.
7
8pub mod event_producers;
9
10mod watcher;
11pub use watcher::Controller;
12
13use crate::directory::entry_container::{Directory, DirectoryWatcher};
14use crate::directory::watchers::event_producers::EventProducer;
15use crate::execution_scope::ExecutionScope;
16use fidl_fuchsia_io as fio;
17use slab::Slab;
18use std::sync::Arc;
19
20/// Wraps all watcher connections observing one directory.  The directory is responsible for
21/// calling [`Self::add()`] and [`Self::send_event()`] method when appropriate to make sure
22/// watchers are observing a consistent view.
23pub struct Watchers(Slab<Controller>);
24
25impl Watchers {
26    /// Constructs a new Watchers instance with no connected watchers.
27    pub fn new() -> Self {
28        Watchers(Slab::new())
29    }
30
31    /// Connects a new watcher (connected over the `channel`) to the list of watchers.  It is the
32    /// responsibility of the caller to also send `WATCH_EVENT_EXISTING` and `WatchMask::IDLE`
33    /// events on the returned [`Controller`] to the newly connected watcher using the
34    /// [`Self::send_event`] methods.  This `mask` is the event mask this watcher has requested.
35    ///
36    /// Return value of `None` means the executor did not accept a new task, so the watcher has
37    /// been dropped.
38    ///
39    /// NOTE The reason `add` can not send both events on its own by consuming an
40    /// [`EventProducer`] is because a lazy directory needs async context to generate a list of
41    /// it's entries.  Meaning we need a async version of the [`EventProducer`] - and that is a lot
42    /// of additional managing of functions and state.  Traits do not support async methods yet, so
43    /// we would need to manage futures returned by the [`EventProducer`] methods explicitly.
44    /// Plus, for the [`crate::directory::immutable::Simple`] directory it is all unnecessary.
45    #[must_use = "Caller of add() must send WATCH_EVENT_EXISTING and fio::WatchMask::IDLE on the \
46                  returned controller"]
47    pub fn add(
48        &mut self,
49        scope: ExecutionScope,
50        directory: Arc<dyn Directory>,
51        mask: fio::WatchMask,
52        watcher: DirectoryWatcher,
53    ) -> &Controller {
54        let entry = self.0.vacant_entry();
55        let key = entry.key();
56        let done = move || directory.unregister_watcher(key);
57
58        entry.insert(Controller::new(scope, mask, watcher, done))
59    }
60
61    /// Informs all the connected watchers about the specified event.  While `mask` and `event`
62    /// carry the same information, as they are represented by `WatchMask::*` and `WATCH_EVENT_*`
63    /// constants in fuchsia.io, it is easier when both forms are provided.  `mask` is used to
64    /// filter out those watchers that did not request for observation of this event and `event` is
65    /// used to construct the event object.  The method will operate correctly only if `mask` and
66    /// `event` match.
67    ///
68    /// In case of a communication error with any of the watchers, connection to this watcher is
69    /// closed.
70    pub fn send_event(&mut self, producer: &mut dyn EventProducer) {
71        while producer.prepare_for_next_buffer() {
72            let mut consumed_any = false;
73
74            for (_key, controller) in self.0.iter() {
75                controller.send_buffer(producer.mask(), || {
76                    consumed_any = true;
77                    producer.buffer()
78                });
79            }
80
81            if !consumed_any {
82                break;
83            }
84        }
85    }
86
87    /// Disconnects a watcher with the specified key.  A directory will use this method during the
88    /// `unregister_watcher` call.
89    pub fn remove(&mut self, key: usize) {
90        self.0.remove(key);
91    }
92}
93
#[cfg(all(test, target_os = "fuchsia"))]
mod tests {
    use super::*;
    use crate::directory::dirents_sink::Sealed;
    use crate::directory::entry::{EntryInfo, GetEntryInfo};
    use crate::directory::traversal_position::TraversalPosition;
    use crate::node::Node;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use zx_status::Status;

    /// Mutable state of a [`FakeDirectory`], kept behind its mutex.
    struct Inner {
        /// Becomes `true` once `unregister_watcher` has been called on the directory.
        remove_called: bool,
        /// The watcher collection under test.
        watchers: Watchers,
    }

    /// A [`Directory`] whose only working operations are registering and unregistering watchers;
    /// every other method is `unimplemented!()`.
    struct FakeDirectory(Mutex<Inner>);

    impl FakeDirectory {
        /// Builds a directory with no watchers and `remove_called` cleared.
        fn new() -> Arc<Self> {
            let initial = Inner { remove_called: false, watchers: Watchers::new() };
            Arc::new(Self(Mutex::new(initial)))
        }

        /// Reports whether `unregister_watcher` has been invoked so far.
        fn remove_called(&self) -> bool {
            self.0.lock().remove_called
        }

        /// Number of watcher connections currently stored in the slab.
        fn watcher_count(&self) -> usize {
            self.0.lock().watchers.0.len()
        }
    }

    impl Directory for FakeDirectory {
        fn open(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _path: crate::Path,
            _flags: fio::Flags,
            _object_request: crate::ObjectRequestRef<'_>,
        ) -> Result<(), Status> {
            unimplemented!()
        }

        async fn read_dirents<'a>(
            &'a self,
            _pos: &'a TraversalPosition,
            _sink: Box<dyn crate::directory::dirents_sink::Sink>,
        ) -> Result<(TraversalPosition, Box<dyn Sealed>), Status> {
            unimplemented!()
        }

        fn register_watcher(
            self: Arc<Self>,
            scope: ExecutionScope,
            mask: fio::WatchMask,
            watcher: DirectoryWatcher,
        ) -> Result<(), Status> {
            let observed: Arc<dyn Directory> = self.clone();
            // These tests never send the initial events, so the returned controller is
            // deliberately discarded.
            let _ = self.0.lock().watchers.add(scope, observed, mask, watcher);
            Ok(())
        }

        fn unregister_watcher(self: Arc<Self>, key: usize) {
            let mut inner = self.0.lock();
            inner.remove_called = true;
            inner.watchers.remove(key);
        }
    }

    impl Node for FakeDirectory {
        async fn get_attributes(
            &self,
            _requested_attributes: fio::NodeAttributesQuery,
        ) -> Result<fio::NodeAttributes2, Status> {
            unimplemented!()
        }
    }

    impl GetEntryInfo for FakeDirectory {
        fn entry_info(&self) -> EntryInfo {
            unimplemented!()
        }
    }

    /// Registers `watcher` on `directory` for `EXISTING` events and checks that exactly one
    /// watcher is now connected and that nothing has been unregistered yet.
    #[track_caller]
    fn register_existing_watcher(directory: &Arc<FakeDirectory>, watcher: DirectoryWatcher) {
        let registration = Arc::clone(directory).register_watcher(
            ExecutionScope::new(),
            fio::WatchMask::EXISTING,
            watcher,
        );
        registration.expect("Failed to register watcher");
        assert!(!directory.remove_called());
        assert_eq!(directory.watcher_count(), 1);
    }

    /// Drives `executor` until it stalls, giving the Controller's task a chance to react to the
    /// signal raised on its channel.
    fn run_until_idle(executor: &mut fasync::TestExecutor) {
        let mut never_ready = std::future::pending::<()>();
        let _ = executor.run_until_stalled(&mut never_ready);
    }

    /// Checks that the directory was asked to unregister its watcher and that none remain.
    #[track_caller]
    fn assert_unregistered(directory: &FakeDirectory) {
        assert!(directory.remove_called());
        assert_eq!(directory.watcher_count(), 0);
    }

    #[fuchsia::test]
    fn test_unregister_watcher_on_peer_closed() {
        let mut executor = fasync::TestExecutor::new();
        let directory = FakeDirectory::new();
        let (client, server) = fidl::endpoints::create_endpoints::<fio::DirectoryWatcherMarker>();
        register_existing_watcher(&directory, server.into());

        // Dropping the client end signals PEER_CLOSED on the server end.
        drop(client);
        // Let the Controller's task handle the PEER_CLOSED signal.
        run_until_idle(&mut executor);
        assert_unregistered(&directory);
    }

    #[fuchsia::test]
    fn test_unregister_watcher_on_message() {
        let mut executor = fasync::TestExecutor::new();
        let directory = FakeDirectory::new();
        let (client, server) = fidl::endpoints::create_endpoints::<fio::DirectoryWatcherMarker>();
        register_existing_watcher(&directory, server.into());

        // A client is not supposed to send anything over the channel; the Controller's task
        // terminates if it receives a message.
        client.channel().write(&[1, 2, 3], &mut []).expect("Failed to write to channel");
        // Let the Controller's task handle the CHANNEL_READABLE signal.
        run_until_idle(&mut executor);
        assert_unregistered(&directory);
    }
}