Skip to main content

vfs/directory/
watchers.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
//! Watchers handles a list of watcher connections attached to a directory.  Watchers behave as
//! described in fuchsia.io.
7
8pub mod event_producers;
9
10mod watcher;
11pub use watcher::Controller;
12
13use crate::directory::entry_container::{Directory, DirectoryWatcher};
14use crate::directory::watchers::event_producers::EventProducer;
15use crate::execution_scope::ExecutionScope;
16use flex_fuchsia_io as fio;
17use slab::Slab;
18use std::sync::Arc;
19
20/// Wraps all watcher connections observing one directory.  The directory is responsible for
21/// calling [`Self::add()`] and [`Self::send_event()`] method when appropriate to make sure
22/// watchers are observing a consistent view.
23pub struct Watchers(Slab<Controller>);
24
25impl Watchers {
26    /// Constructs a new Watchers instance with no connected watchers.
27    pub fn new() -> Self {
28        Watchers(Slab::new())
29    }
30
31    /// Connects a new watcher (connected over the `channel`) to the list of watchers.  It is the
32    /// responsibility of the caller to also send `WATCH_EVENT_EXISTING` and `WatchMask::IDLE`
33    /// events on the returned [`Controller`] to the newly connected watcher using the
34    /// [`Self::send_event`] methods.  This `mask` is the event mask this watcher has requested.
35    ///
36    /// Return value of `None` means the executor did not accept a new task, so the watcher has
37    /// been dropped.
38    ///
39    /// NOTE The reason `add` can not send both events on its own by consuming an
40    /// [`EventProducer`] is because a lazy directory needs async context to generate a list of
41    /// it's entries.  Meaning we need a async version of the [`EventProducer`] - and that is a lot
42    /// of additional managing of functions and state.  Traits do not support async methods yet, so
43    /// we would need to manage futures returned by the [`EventProducer`] methods explicitly.
44    /// Plus, for the [`crate::directory::immutable::Simple`] directory it is all unnecessary.
45    #[must_use = "Caller of add() must send WATCH_EVENT_EXISTING and fio::WatchMask::IDLE on the \
46                  returned controller"]
47    pub fn add(
48        &mut self,
49        scope: ExecutionScope,
50        directory: Arc<dyn Directory>,
51        mask: fio::WatchMask,
52        watcher: DirectoryWatcher,
53    ) -> &Controller {
54        let entry = self.0.vacant_entry();
55        let key = entry.key();
56        let done = move || directory.unregister_watcher(key);
57
58        entry.insert(Controller::new(scope, mask, watcher, done))
59    }
60
61    /// Informs all the connected watchers about the specified event.  While `mask` and `event`
62    /// carry the same information, as they are represented by `WatchMask::*` and `WATCH_EVENT_*`
63    /// constants in fuchsia.io, it is easier when both forms are provided.  `mask` is used to
64    /// filter out those watchers that did not request for observation of this event and `event` is
65    /// used to construct the event object.  The method will operate correctly only if `mask` and
66    /// `event` match.
67    ///
68    /// In case of a communication error with any of the watchers, connection to this watcher is
69    /// closed.
70    pub fn send_event(&mut self, producer: &mut dyn EventProducer) {
71        while producer.prepare_for_next_buffer() {
72            let mut consumed_any = false;
73
74            for (_key, controller) in self.0.iter() {
75                controller.send_buffer(producer.mask(), || {
76                    consumed_any = true;
77                    producer.buffer()
78                });
79            }
80
81            if !consumed_any {
82                break;
83            }
84        }
85    }
86
87    /// Disconnects a watcher with the specified key.  A directory will use this method during the
88    /// `unregister_watcher` call.
89    pub fn remove(&mut self, key: usize) {
90        self.0.remove(key);
91    }
92}
93
#[cfg(all(test, target_os = "fuchsia"))]
mod tests {
    use super::*;
    use crate::directory::dirents_sink::Sealed;
    use crate::directory::entry::{EntryInfo, GetEntryInfo};
    use crate::directory::traversal_position::TraversalPosition;
    use crate::node::Node;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use zx_status::Status;

    /// Mutable state of a [`FakeDirectory`].
    struct Inner {
        /// Becomes `true` once `unregister_watcher` has been invoked.
        remove_called: bool,
        watchers: Watchers,
    }

    /// A directory that only knows how to register and unregister watchers.  Every other
    /// operation is left unimplemented.
    struct FakeDirectory(Mutex<Inner>);

    impl FakeDirectory {
        fn new() -> Arc<Self> {
            let inner = Inner { remove_called: false, watchers: Watchers::new() };
            Arc::new(FakeDirectory(Mutex::new(inner)))
        }
    }

    impl Directory for FakeDirectory {
        fn open(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _path: crate::Path,
            _flags: fio::Flags,
            _object_request: crate::ObjectRequestRef<'_>,
        ) -> Result<(), Status> {
            unimplemented!()
        }

        async fn read_dirents(
            &self,
            _pos: &TraversalPosition,
            _sink: Box<dyn crate::directory::dirents_sink::Sink>,
        ) -> Result<(TraversalPosition, Box<dyn Sealed>), Status> {
            unimplemented!()
        }

        fn register_watcher(
            self: Arc<Self>,
            scope: ExecutionScope,
            mask: fio::WatchMask,
            watcher: DirectoryWatcher,
        ) -> Result<(), Status> {
            // The tests never send the initial events, so the returned controller is discarded.
            let directory = self.clone();
            let _ = self.0.lock().watchers.add(scope, directory, mask, watcher);
            Ok(())
        }

        fn unregister_watcher(self: Arc<Self>, key: usize) {
            let mut inner = self.0.lock();
            inner.remove_called = true;
            inner.watchers.remove(key);
        }
    }

    impl Node for FakeDirectory {
        async fn get_attributes(
            &self,
            _requested_attributes: fio::NodeAttributesQuery,
        ) -> Result<fio::NodeAttributes2, Status> {
            unimplemented!()
        }
    }

    impl GetEntryInfo for FakeDirectory {
        fn entry_info(&self) -> EntryInfo {
            unimplemented!()
        }
    }

    /// Checks that one watcher is connected and `unregister_watcher` has not been called.
    #[track_caller]
    fn assert_watcher_registered(directory: &FakeDirectory) {
        let inner = directory.0.lock();
        assert!(!inner.remove_called);
        assert_eq!(inner.watchers.0.len(), 1);
    }

    /// Checks that `unregister_watcher` has been called and no watcher is left.
    #[track_caller]
    fn assert_watcher_unregistered(directory: &FakeDirectory) {
        let inner = directory.0.lock();
        assert!(inner.remove_called);
        assert_eq!(inner.watchers.0.len(), 0);
    }

    /// Runs the executor until no task can make progress any more.
    fn run_until_stalled(executor: &mut fasync::TestExecutor) {
        let mut never = std::future::pending::<()>();
        let _ = executor.run_until_stalled(&mut never);
    }

    #[fuchsia::test]
    fn test_unregister_watcher_on_peer_closed() {
        let mut executor = fasync::TestExecutor::new();
        let directory = FakeDirectory::new();
        #[cfg(feature = "fdomain")]
        let client_domain = fdomain_local::local_client_empty();

        #[cfg(feature = "fdomain")]
        let (client, server) = client_domain.create_endpoints::<fio::DirectoryWatcherMarker>();
        #[cfg(not(feature = "fdomain"))]
        let (client, server) = flex_client::fidl::create_endpoints::<fio::DirectoryWatcherMarker>();

        #[cfg(feature = "fdomain")]
        let scope = ExecutionScope::new(client_domain.clone());
        #[cfg(not(feature = "fdomain"))]
        let scope = ExecutionScope::new();

        Arc::clone(&directory)
            .register_watcher(scope, fio::WatchMask::EXISTING, server.into())
            .expect("Failed to register watcher");
        assert_watcher_registered(&directory);

        // Closing the client end raises PEER_CLOSED on the server end.
        drop(client);
        // Give the Controller's task a chance to observe PEER_CLOSED.
        run_until_stalled(&mut executor);
        assert_watcher_unregistered(&directory);
    }

    #[fuchsia::test]
    fn test_unregister_watcher_on_message() {
        let mut executor = fasync::TestExecutor::new();
        let directory = FakeDirectory::new();
        #[cfg(feature = "fdomain")]
        let client_domain = fdomain_local::local_client_empty();

        #[cfg(feature = "fdomain")]
        let (client, server) = client_domain.create_endpoints::<fio::DirectoryWatcherMarker>();
        #[cfg(not(feature = "fdomain"))]
        let (client, server) = flex_client::fidl::create_endpoints::<fio::DirectoryWatcherMarker>();

        #[cfg(feature = "fdomain")]
        let scope = ExecutionScope::new(client_domain.clone());
        #[cfg(not(feature = "fdomain"))]
        let scope = ExecutionScope::new();

        Arc::clone(&directory)
            .register_watcher(scope, fio::WatchMask::EXISTING, server.into())
            .expect("Failed to register watcher");
        assert_watcher_registered(&directory);

        // A well-behaved client never writes to this channel; the Controller's task is expected
        // to terminate as soon as any message arrives.
        client.channel().write(&[1, 2, 3], &mut []).expect("Failed to write to channel");
        // Give the Controller's task a chance to observe CHANNEL_READABLE.
        run_until_stalled(&mut executor);
        assert_watcher_unregistered(&directory);
    }
}
233}