vfs/directory/watchers/
watcher.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! A task that is run to process communication with an individual watcher.

use crate::directory::entry_container::DirectoryWatcher;
use crate::directory::watchers::event_producers::EventProducer;
use crate::execution_scope::ExecutionScope;

use fidl_fuchsia_io as fio;
use futures::channel::mpsc::{self, UnboundedSender};
use futures::{select, FutureExt};

#[cfg(not(target_os = "fuchsia"))]
use fuchsia_async::emulated_handle::MessageBuf;
#[cfg(target_os = "fuchsia")]
use zx::MessageBuf;

/// `done` is not guaranteed to be called if the task failed to start.  It should only happen
/// in case the return value is an `Err`.  Unfortunately, there is no way to return the `done`
/// object itself, as the [`futures::Spawn::spawn_obj`] does not return the ownership in case
/// of a failure.
pub(crate) fn new(
    scope: ExecutionScope,
    mask: fio::WatchMask,
    watcher: DirectoryWatcher,
    done: impl FnOnce() + Send + 'static,
) -> Controller {
    use futures::StreamExt as _;

    let (sender, mut receiver) = mpsc::unbounded::<Vec<u8>>();

    let task = async move {
        let _done = CallOnDrop(Some(done));
        let mut buf = MessageBuf::new();
        let mut recv_msg = watcher.channel().recv_msg(&mut buf).fuse();
        loop {
            select! {
                command = receiver.next() => match command {
                    Some(message) => {
                        let result = watcher.channel().write(&*message, &mut []);
                        if result.is_err() {
                            break;
                        }
                    },
                    None => break,
                },
                _ = recv_msg => {
                    // We do not expect any messages to be received over the watcher connection.
                    // Should we receive a message we will close the connection to indicate an
                    // error.  If any error occurs, we also close the connection.  And if the
                    // connection is closed, we just stop the command processing as well.
                    break;
                },
            }
        }
    };

    scope.spawn(task);
    Controller { mask, messages: sender }
}

pub struct Controller {
    mask: fio::WatchMask,
    messages: UnboundedSender<Vec<u8>>,
}

impl Controller {
    /// Sends a buffer to the connected watcher.  `mask` specifies the type of the event the buffer
    /// is for.  If the watcher mask does not include the event specified by the `mask` then the
    /// buffer is not sent and `buffer` is not even invoked.
    pub(crate) fn send_buffer(&self, mask: fio::WatchMask, buffer: impl FnOnce() -> Vec<u8>) {
        if !self.mask.intersects(mask) {
            return;
        }

        if self.messages.unbounded_send(buffer()).is_ok() {
            return;
        }

        // An error to send indicates the execution task has been disconnected.  Controller should
        // always be removed from the watchers list before it is destroyed.  So this is some
        // logical bug.
        debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
    }

    /// Uses a `producer` to generate one or more buffers and send them all to the connected
    /// watcher.  `producer.mask()` is used to determine the type of the event - in case the
    /// watcher mask does not specify that it needs to receive this event, then the producer is not
    /// used and `false` is returned.  If the producers mask and the watcher mask overlap, then
    /// `true` is returned (even if the producer did not generate a single buffer).
    pub fn send_event(&self, producer: &mut dyn EventProducer) -> bool {
        if !self.mask.intersects(producer.mask()) {
            return false;
        }

        while producer.prepare_for_next_buffer() {
            let buffer = producer.buffer();
            if self.messages.unbounded_send(buffer).is_ok() {
                continue;
            }

            // An error to send indicates the execution task has been disconnected.  Controller
            // should always be removed from the watchers list before it is destroyed.  So this is
            // some logical bug.
            debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
        }

        return true;
    }
}

/// Calls the function when this object is dropped.
struct CallOnDrop<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        self.0.take().unwrap()();
    }
}