vfs/directory/watchers/watcher.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A task that is run to process communication with an individual watcher.
6
7use crate::directory::entry_container::DirectoryWatcher;
8use crate::directory::watchers::event_producers::EventProducer;
9use crate::execution_scope::ExecutionScope;
10
11use flex_fuchsia_io as fio;
12use futures::channel::mpsc::{self, UnboundedSender};
13use futures::{FutureExt, select};
14
15#[cfg(all(not(target_os = "fuchsia"), not(feature = "fdomain")))]
16use fuchsia_async::emulated_handle::MessageBuf;
17#[cfg(all(target_os = "fuchsia", not(feature = "fdomain")))]
18use zx::MessageBuf;
19
20#[derive(Clone)]
21pub struct Controller {
22 mask: fio::WatchMask,
23 messages: UnboundedSender<Vec<u8>>,
24}
25
26impl Controller {
27 /// `done` is not guaranteed to be called if the task failed to start. It should only happen
28 /// in case the return value is an `Err`. Unfortunately, there is no way to return the `done`
29 /// object itself, as the [`futures::Spawn::spawn_obj`] does not return the ownership in case
30 /// of a failure.
31 pub(crate) fn new(
32 scope: ExecutionScope,
33 mask: fio::WatchMask,
34 watcher: DirectoryWatcher,
35 done: impl FnOnce() + Send + 'static,
36 ) -> Controller {
37 use futures::StreamExt as _;
38
39 let (sender, mut receiver) = mpsc::unbounded::<Vec<u8>>();
40
41 let task = async move {
42 let _done = CallOnDrop(Some(done));
43 #[cfg(not(feature = "fdomain"))]
44 let mut buf = MessageBuf::new();
45 #[cfg(not(feature = "fdomain"))]
46 let mut recv_msg = watcher.channel().recv_msg(&mut buf).fuse();
47 #[cfg(feature = "fdomain")]
48 let mut recv_msg = watcher.channel().recv_msg().fuse();
49 loop {
50 select! {
51 command = receiver.next() => match command {
52 Some(message) => {
53 #[cfg(not(feature = "fdomain"))]
54 let result = watcher.channel().write(&*message, &mut []);
55 #[cfg(feature = "fdomain")]
56 let result = watcher.channel().write(&*message, std::vec::Vec::new());
57 if result.is_err() {
58 break;
59 }
60 },
61 None => break,
62 },
63 _ = recv_msg => {
64 // We do not expect any messages to be received over the watcher connection.
65 // Should we receive a message we will close the connection to indicate an
66 // error. If any error occurs, we also close the connection. And if the
67 // connection is closed, we just stop the command processing as well.
68 break;
69 },
70 }
71 }
72 };
73
74 scope.spawn(task);
75 Controller { mask, messages: sender }
76 }
77
78 /// Sends a buffer to the connected watcher. `mask` specifies the type of the event the buffer
79 /// is for. If the watcher mask does not include the event specified by the `mask` then the
80 /// buffer is not sent and `buffer` is not even invoked.
81 pub(crate) fn send_buffer(&self, mask: fio::WatchMask, buffer: impl FnOnce() -> Vec<u8>) {
82 if !self.mask.intersects(mask) {
83 return;
84 }
85
86 if self.messages.unbounded_send(buffer()).is_ok() {
87 return;
88 }
89
90 // An error to send indicates the execution task has been disconnected. Controller should
91 // always be removed from the watchers list before it is destroyed. So this is some
92 // logical bug.
93 debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
94 }
95
96 /// Uses a `producer` to generate one or more buffers and send them all to the connected
97 /// watcher. `producer.mask()` is used to determine the type of the event - in case the
98 /// watcher mask does not specify that it needs to receive this event, then the producer is not
99 /// used and `false` is returned. If the producers mask and the watcher mask overlap, then
100 /// `true` is returned (even if the producer did not generate a single buffer).
101 pub fn send_event(&self, producer: &mut dyn EventProducer) -> bool {
102 if !self.mask.intersects(producer.mask()) {
103 return false;
104 }
105
106 while producer.prepare_for_next_buffer() {
107 let buffer = producer.buffer();
108 if self.messages.unbounded_send(buffer).is_ok() {
109 continue;
110 }
111
112 // An error to send indicates the execution task has been disconnected. Controller
113 // should always be removed from the watchers list before it is destroyed. So this is
114 // some logical bug.
115 debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
116 }
117
118 return true;
119 }
120}
121
122/// Calls the function when this object is dropped.
123struct CallOnDrop<F: FnOnce()>(Option<F>);
124
125impl<F: FnOnce()> Drop for CallOnDrop<F> {
126 fn drop(&mut self) {
127 self.0.take().unwrap()();
128 }
129}