1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        runtime::{EHandle, PacketReceiver, ReceiverRegistration},
        OnSignalsRef,
    },
    fuchsia_zircon::{self as zx, AsHandleRef},
    std::{
        sync::{Arc, Mutex},
        task::{ready, Context, Poll, Waker},
    },
};

const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;

/// State of an object when it is ready for reading.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ReadableState {
    /// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
    Readable,
    /// Received `OBJECT_PEER_CLOSED`.
    Closed,
    /// Both `Readable` and `Closed` apply.
    ReadableAndClosed,
}

/// State of an object when it is ready for writing.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WritableState {
    /// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
    Writable,
    /// Received `OBJECT_PEER_CLOSED`.
    Closed,
}

/// A `Handle` that receives notifications when it is readable.
///
/// # Examples
///
/// ```
/// loop {
///     ready!(self.poll_readable(cx))?;
///     match /* make read syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
///         status => return Poll::Ready(status),
///     }
/// }
/// ```
pub trait ReadableHandle {
    /// If the object is ready for reading, returns `Ready` with the readable
    /// state. If the implementor returns Pending, it should first ensure that
    /// `need_readable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
    /// object is readable.
    ///
    /// The returned `ReadableState` does not necessarily reflect an observed
    /// `OBJECT_READABLE` signal. We optimistically assume the object remains
    /// readable until `need_readable` is called.
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal.  This can return
    /// Poll::Ready if the object has already been signaled in which case the
    /// waker *will* not be woken and it is the caller's responsibility to not
    /// lose the signal.
    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}

/// A `Handle` that receives notifications when it is writable.
///
/// # Examples
///
/// ```
/// loop {
///     ready!(self.poll_writable(cx))?;
///     match /* make write syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
///         status => Poll::Ready(status),
///     }
/// }
/// ```
pub trait WritableHandle {
    /// If the object is ready for writing, returns `Ready` with the writable
    /// state. If the implementor returns Pending, it should first ensure that
    /// `need_writable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
    /// object is writable.
    ///
    /// The returned `WritableState` does not necessarily reflect an observed
    /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
    /// writable until `need_writable` is called.
    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
    /// Poll::Ready if the object has already been signaled in which case the
    /// waker *will* not be woken and it is the caller's responsibility to not
    /// lose the signal.
    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}

struct RWPacketReceiver(Mutex<Inner>);

struct Inner {
    signals: zx::Signals,
    read_task: Option<Waker>,
    write_task: Option<Waker>,
}

impl PacketReceiver for RWPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

        // We wake the tasks when the lock isn't held in case the wakers need the same lock.
        let mut read_task = None;
        let mut write_task = None;
        {
            let mut inner = self.0.lock().unwrap();
            let old = inner.signals;
            inner.signals |= new;

            let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
            let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
            let became_closed =
                new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);

            if became_readable || became_closed {
                read_task = inner.read_task.take();
            }
            if became_writable || became_closed {
                write_task = inner.write_task.take();
            }
        }
        // *NOTE*: This is the only safe place to wake wakers.  In any other location, there is a
        // risk that locks are held which might be required when the waker is woken.  It is safe to
        // wake here because this is called from the executor when no locks are held.
        if let Some(read_task) = read_task {
            read_task.wake();
        }
        if let Some(write_task) = write_task {
            write_task.wake();
        }
    }
}

/// A `Handle` that receives notifications when it is readable/writable.
pub struct RWHandle<T> {
    handle: T,
    receiver: ReceiverRegistration<RWPacketReceiver>,
}

impl<T> RWHandle<T>
where
    T: AsHandleRef,
{
    /// Creates a new `RWHandle` object which will receive notifications when
    /// the underlying handle becomes readable, writable, or closes.
    ///
    /// # Panics
    ///
    /// If called outside the context of an active async executor.
    pub fn new(handle: T) -> Self {
        let ehandle = EHandle::local();

        let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
        let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
            // Optimistically assume that the handle is readable and writable.
            // Reads and writes will be attempted before queueing a packet.
            // This makes handles slightly faster to read/write the first time
            // they're accessed after being created, provided they start off as
            // readable or writable. In return, there will be an extra wasted
            // syscall per read/write if the handle is not readable or writable.
            signals: initial_signals,
            read_task: None,
            write_task: None,
        }))));

        RWHandle { handle, receiver }
    }

    /// Returns a reference to the underlying handle.
    pub fn get_ref(&self) -> &T {
        &self.handle
    }

    /// Returns a mutable reference to the underlying handle.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.handle
    }

    /// Consumes `self` and returns the underlying handle.
    pub fn into_inner(self) -> T {
        self.handle
    }

    /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
    pub fn is_closed(&self) -> bool {
        let signals = self.receiver().0.lock().unwrap().signals;
        if signals.contains(OBJECT_PEER_CLOSED) {
            return true;
        }

        // The signals bitset might not be updated if we haven't gotten around to processing the
        // packet telling us that yet. To provide an up-to-date response, we query the current
        // state of the signal.
        //
        // Note: we _could_ update the bitset with what we find here, if we're careful to also
        // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
        // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
        // we just leave the bitset as-is and let the regular notification mechanism get around to
        // it when it gets around to it.
        match self.handle.wait_handle(OBJECT_PEER_CLOSED, zx::Time::INFINITE_PAST) {
            Ok(_) => true,
            Err(zx::Status::TIMED_OUT) => false,
            Err(status) => {
                // None of the other documented error statuses should be possible, either the type
                // system doesn't allow it or the wait from `RWHandle::new()` would have already
                // failed.
                unreachable!("status: {status}")
            }
        }
    }

    /// Returns a future that completes when `is_closed()` is true.
    pub fn on_closed(&self) -> OnSignalsRef<'_> {
        OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
    }

    fn receiver(&self) -> &RWPacketReceiver {
        self.receiver.receiver()
    }

    fn need_signal(
        &self,
        cx: &mut Context<'_>,
        for_read: bool,
        signal: zx::Signals,
    ) -> Poll<Result<(), zx::Status>> {
        let mut inner = self.receiver.0.lock().unwrap();
        let old = inner.signals;
        if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
            // We don't want to return an error here because even though the peer has closed, the
            // object could still have queued messages that can be read.
            Poll::Ready(Ok(()))
        } else {
            let waker = cx.waker().clone();
            if for_read {
                inner.read_task = Some(waker);
            } else {
                inner.write_task = Some(waker);
            }
            if old.contains(signal) {
                inner.signals &= !signal;
                std::mem::drop(inner);
                self.handle.wait_async_handle(
                    self.receiver.port(),
                    self.receiver.key(),
                    signal | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )?;
            }
            Poll::Pending
        }
    }
}

impl<T> ReadableHandle for RWHandle<T>
where
    T: AsHandleRef,
{
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
        loop {
            let signals = self.receiver().0.lock().unwrap().signals;
            match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
                (true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
                (false, true) => return Poll::Ready(Ok(ReadableState::Closed)),
                (true, true) => return Poll::Ready(Ok(ReadableState::ReadableAndClosed)),
                (false, false) => {
                    ready!(self.need_signal(cx, true, OBJECT_READABLE)?)
                }
            }
        }
    }

    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        self.need_signal(cx, true, OBJECT_READABLE)
    }
}

impl<T> WritableHandle for RWHandle<T>
where
    T: AsHandleRef,
{
    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
        loop {
            let signals = self.receiver().0.lock().unwrap().signals;
            match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
                (_, true) => return Poll::Ready(Ok(WritableState::Closed)),
                (true, _) => return Poll::Ready(Ok(WritableState::Writable)),
                (false, false) => {
                    ready!(self.need_signal(cx, false, OBJECT_WRITABLE)?)
                }
            }
        }
    }

    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        self.need_signal(cx, false, OBJECT_WRITABLE)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;
    use fuchsia_zircon as zx;

    #[test]
    fn is_closed_immediately_after_close() {
        let mut exec = TestExecutor::new();
        let (tx, rx) = zx::Channel::create();
        let rx_rw_handle = RWHandle::new(rx);
        let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
        // Clear optimistic readable state
        assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
        // Starting state: the channel is not closed (because we haven't closed it yet)
        assert_eq!(rx_rw_handle.is_closed(), false);
        // we will never set readable, so this should be Pending until we close
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);

        drop(tx);

        // Implementation note: the cached state will not be updated yet
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // But is_closed should return true immediately
        assert_eq!(rx_rw_handle.is_closed(), true);
        // Still not updated, and won't be until we let the executor process port packets
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // So we do
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        // And now it is updated, so we observe Closed
        assert_eq!(
            rx_rw_handle.poll_readable(&mut noop_ctx),
            Poll::Ready(Ok(ReadableState::Closed))
        );
        // And is_closed should still be true, of course.
        assert_eq!(rx_rw_handle.is_closed(), true);
    }
}