vfs/directory/
watchers.rs1pub mod event_producers;
9
10mod watcher;
11pub use watcher::Controller;
12
13use crate::directory::entry_container::{Directory, DirectoryWatcher};
14use crate::directory::watchers::event_producers::EventProducer;
15use crate::execution_scope::ExecutionScope;
16use fidl_fuchsia_io as fio;
17use slab::Slab;
18use std::sync::Arc;
19
20pub struct Watchers(Slab<Controller>);
24
25impl Watchers {
26 pub fn new() -> Self {
28 Watchers(Slab::new())
29 }
30
31 #[must_use = "Caller of add() must send WATCH_EVENT_EXISTING and fio::WatchMask::IDLE on the \
46 returned controller"]
47 pub fn add(
48 &mut self,
49 scope: ExecutionScope,
50 directory: Arc<dyn Directory>,
51 mask: fio::WatchMask,
52 watcher: DirectoryWatcher,
53 ) -> &Controller {
54 let entry = self.0.vacant_entry();
55 let key = entry.key();
56 let done = move || directory.unregister_watcher(key);
57
58 entry.insert(Controller::new(scope, mask, watcher, done))
59 }
60
61 pub fn send_event(&mut self, producer: &mut dyn EventProducer) {
71 while producer.prepare_for_next_buffer() {
72 let mut consumed_any = false;
73
74 for (_key, controller) in self.0.iter() {
75 controller.send_buffer(producer.mask(), || {
76 consumed_any = true;
77 producer.buffer()
78 });
79 }
80
81 if !consumed_any {
82 break;
83 }
84 }
85 }
86
87 pub fn remove(&mut self, key: usize) {
90 self.0.remove(key);
91 }
92}
93
94#[cfg(all(test, target_os = "fuchsia"))]
95mod tests {
96 use super::*;
97 use crate::directory::dirents_sink::Sealed;
98 use crate::directory::entry::{EntryInfo, GetEntryInfo};
99 use crate::directory::traversal_position::TraversalPosition;
100 use crate::node::Node;
101 use fuchsia_async as fasync;
102 use fuchsia_sync::Mutex;
103 use zx_status::Status;
104
105 struct FakeDirectory(Mutex<Inner>);
106
107 impl FakeDirectory {
108 fn new() -> Arc<Self> {
109 Arc::new(FakeDirectory(Mutex::new(Inner {
110 remove_called: false,
111 watchers: Watchers::new(),
112 })))
113 }
114 }
115 struct Inner {
116 remove_called: bool,
117 watchers: Watchers,
118 }
119
120 impl Directory for FakeDirectory {
121 fn open(
122 self: Arc<Self>,
123 _scope: ExecutionScope,
124 _path: crate::Path,
125 _flags: fio::Flags,
126 _object_request: crate::ObjectRequestRef<'_>,
127 ) -> Result<(), Status> {
128 unimplemented!()
129 }
130
131 async fn read_dirents<'a>(
132 &'a self,
133 _pos: &'a TraversalPosition,
134 _sink: Box<dyn crate::directory::dirents_sink::Sink>,
135 ) -> Result<(TraversalPosition, Box<dyn Sealed>), Status> {
136 unimplemented!()
137 }
138
139 fn register_watcher(
140 self: Arc<Self>,
141 scope: ExecutionScope,
142 mask: fio::WatchMask,
143 watcher: DirectoryWatcher,
144 ) -> Result<(), Status> {
145 let _ = self.0.lock().watchers.add(scope, self.clone(), mask, watcher);
146 Ok(())
147 }
148
149 fn unregister_watcher(self: Arc<Self>, key: usize) {
150 let mut this = self.0.lock();
151 this.remove_called = true;
152 this.watchers.remove(key);
153 }
154 }
155
156 impl Node for FakeDirectory {
157 async fn get_attributes(
158 &self,
159 _requested_attributes: fio::NodeAttributesQuery,
160 ) -> Result<fio::NodeAttributes2, Status> {
161 unimplemented!()
162 }
163 }
164
165 impl GetEntryInfo for FakeDirectory {
166 fn entry_info(&self) -> EntryInfo {
167 unimplemented!()
168 }
169 }
170
171 #[fuchsia::test]
172 fn test_unregister_watcher_on_peer_closed() {
173 let mut executor = fasync::TestExecutor::new();
174 let directory = FakeDirectory::new();
175 let (client, server) = fidl::endpoints::create_endpoints::<fio::DirectoryWatcherMarker>();
176 directory
177 .clone()
178 .register_watcher(ExecutionScope::new(), fio::WatchMask::EXISTING, server.into())
179 .expect("Failed to register watcher");
180 assert!(!directory.0.lock().remove_called);
181 assert_eq!(directory.0.lock().watchers.0.len(), 1);
182
183 std::mem::drop(client);
185 let _ = executor.run_until_stalled(&mut std::future::pending::<()>());
187 assert!(directory.0.lock().remove_called);
188 assert_eq!(directory.0.lock().watchers.0.len(), 0);
189 }
190
191 #[fuchsia::test]
192 fn test_unregister_watcher_on_message() {
193 let mut executor = fasync::TestExecutor::new();
194 let directory = FakeDirectory::new();
195 let (client, server) = fidl::endpoints::create_endpoints::<fio::DirectoryWatcherMarker>();
196 directory
197 .clone()
198 .register_watcher(ExecutionScope::new(), fio::WatchMask::EXISTING, server.into())
199 .expect("Failed to register watcher");
200 assert!(!directory.0.lock().remove_called);
201 assert_eq!(directory.0.lock().watchers.0.len(), 1);
202
203 client.channel().write(&[1, 2, 3], &mut []).expect("Failed to write to channel");
206 let _ = executor.run_until_stalled(&mut std::future::pending::<()>());
208 assert!(directory.0.lock().remove_called);
209 assert_eq!(directory.0.lock().watchers.0.len(), 0);
210 }
211}