fuchsia_async/handle/zircon/rwhandle.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::sync::{Arc, Mutex};
8use std::task::{ready, Context, Poll, Waker};
9use zx::{self as zx, AsHandleRef};
10
11const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
12const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
13const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
14
/// What a caller learns when an object is reported as ready for reading.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadableState {
    /// `OBJECT_READABLE` was observed, or the object is optimistically
    /// assumed to be readable.
    Readable,
    /// `OBJECT_PEER_CLOSED` was observed. Data may still be available to
    /// read.
    MaybeReadableAndClosed,
}
23
/// What a caller learns when an object is reported as ready for writing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WritableState {
    /// `OBJECT_WRITABLE` was observed, or the object is optimistically
    /// assumed to be writable.
    Writable,
    /// `OBJECT_PEER_CLOSED` was observed.
    Closed,
}
32
33/// A `Handle` that receives notifications when it is readable.
34///
35/// # Examples
36///
37/// ```
38/// loop {
39/// ready!(self.poll_readable(cx))?;
40/// match /* make read syscall */ {
41/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
42/// status => return Poll::Ready(status),
43/// }
44/// }
45/// ```
46pub trait ReadableHandle {
47 /// If the object is ready for reading, returns `Ready` with the readable
48 /// state. If the implementor returns Pending, it should first ensure that
49 /// `need_readable` is called.
50 ///
51 /// This should be called in a poll function. If the syscall returns
52 /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
53 /// object is readable.
54 ///
55 /// The returned `ReadableState` does not necessarily reflect an observed
56 /// `OBJECT_READABLE` signal. We optimistically assume the object remains
57 /// readable until `need_readable` is called.
58 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
59
60 /// Arranges for the current task to be woken when the object receives an
61 /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal. This can return
62 /// Poll::Ready if the object has already been signaled in which case the
63 /// waker *will* not be woken and it is the caller's responsibility to not
64 /// lose the signal.
65 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
66}
67
68/// A `Handle` that receives notifications when it is writable.
69///
70/// # Examples
71///
72/// ```
73/// loop {
74/// ready!(self.poll_writable(cx))?;
75/// match /* make write syscall */ {
76/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
77/// status => Poll::Ready(status),
78/// }
79/// }
80/// ```
81pub trait WritableHandle {
82 /// If the object is ready for writing, returns `Ready` with the writable
83 /// state. If the implementor returns Pending, it should first ensure that
84 /// `need_writable` is called.
85 ///
86 /// This should be called in a poll function. If the syscall returns
87 /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
88 /// object is writable.
89 ///
90 /// The returned `WritableState` does not necessarily reflect an observed
91 /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
92 /// writable until `need_writable` is called.
93 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
94
95 /// Arranges for the current task to be woken when the object receives an
96 /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
97 /// Poll::Ready if the object has already been signaled in which case the
98 /// waker *will* not be woken and it is the caller's responsibility to not
99 /// lose the signal.
100 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
101}
102
103struct RWPacketReceiver(Mutex<Inner>);
104
105struct Inner {
106 signals: zx::Signals,
107 read_task: Option<Waker>,
108 write_task: Option<Waker>,
109}
110
111impl PacketReceiver for RWPacketReceiver {
112 fn receive_packet(&self, packet: zx::Packet) {
113 let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
114 p.observed()
115 } else {
116 return;
117 };
118
119 // We wake the tasks when the lock isn't held in case the wakers need the same lock.
120 let mut read_task = None;
121 let mut write_task = None;
122 {
123 let mut inner = self.0.lock().unwrap();
124 let old = inner.signals;
125 inner.signals |= new;
126
127 let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
128 let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
129 let became_closed =
130 new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
131
132 if became_readable || became_closed {
133 read_task = inner.read_task.take();
134 }
135 if became_writable || became_closed {
136 write_task = inner.write_task.take();
137 }
138 }
139 // *NOTE*: This is the only safe place to wake wakers. In any other location, there is a
140 // risk that locks are held which might be required when the waker is woken. It is safe to
141 // wake here because this is called from the executor when no locks are held.
142 if let Some(read_task) = read_task {
143 read_task.wake();
144 }
145 if let Some(write_task) = write_task {
146 write_task.wake();
147 }
148 }
149}
150
151/// A `Handle` that receives notifications when it is readable/writable.
152pub struct RWHandle<T> {
153 handle: T,
154 receiver: ReceiverRegistration<RWPacketReceiver>,
155}
156
157impl<T> RWHandle<T>
158where
159 T: AsHandleRef,
160{
161 /// Creates a new `RWHandle` object which will receive notifications when
162 /// the underlying handle becomes readable, writable, or closes.
163 ///
164 /// # Panics
165 ///
166 /// If called outside the context of an active async executor.
167 pub fn new(handle: T) -> Self {
168 let ehandle = EHandle::local();
169
170 let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
171 let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
172 // Optimistically assume that the handle is readable and writable.
173 // Reads and writes will be attempted before queueing a packet.
174 // This makes handles slightly faster to read/write the first time
175 // they're accessed after being created, provided they start off as
176 // readable or writable. In return, there will be an extra wasted
177 // syscall per read/write if the handle is not readable or writable.
178 signals: initial_signals,
179 read_task: None,
180 write_task: None,
181 }))));
182
183 RWHandle { handle, receiver }
184 }
185
186 /// Returns a reference to the underlying handle.
187 pub fn get_ref(&self) -> &T {
188 &self.handle
189 }
190
191 /// Returns a mutable reference to the underlying handle.
192 pub fn get_mut(&mut self) -> &mut T {
193 &mut self.handle
194 }
195
196 /// Consumes `self` and returns the underlying handle.
197 pub fn into_inner(self) -> T {
198 self.handle
199 }
200
201 /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
202 pub fn is_closed(&self) -> bool {
203 let signals = self.receiver().0.lock().unwrap().signals;
204 if signals.contains(OBJECT_PEER_CLOSED) {
205 return true;
206 }
207
208 // The signals bitset might not be updated if we haven't gotten around to processing the
209 // packet telling us that yet. To provide an up-to-date response, we query the current
210 // state of the signal.
211 //
212 // Note: we _could_ update the bitset with what we find here, if we're careful to also
213 // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
214 // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
215 // we just leave the bitset as-is and let the regular notification mechanism get around to
216 // it when it gets around to it.
217 match self.handle.wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST) {
218 Ok(_) => true,
219 Err(zx::Status::TIMED_OUT) => false,
220 Err(status) => {
221 // None of the other documented error statuses should be possible, either the type
222 // system doesn't allow it or the wait from `RWHandle::new()` would have already
223 // failed.
224 unreachable!("status: {status}")
225 }
226 }
227 }
228
229 /// Returns a future that completes when `is_closed()` is true.
230 pub fn on_closed(&self) -> OnSignalsRef<'_> {
231 OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
232 }
233
234 fn receiver(&self) -> &RWPacketReceiver {
235 self.receiver.receiver()
236 }
237
238 fn need_signal(
239 &self,
240 cx: &mut Context<'_>,
241 for_read: bool,
242 signal: zx::Signals,
243 ) -> Poll<Result<(), zx::Status>> {
244 let mut inner = self.receiver.0.lock().unwrap();
245 let old = inner.signals;
246 if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
247 // We don't want to return an error here because even though the peer has closed, the
248 // object could still have queued messages that can be read.
249 Poll::Ready(Ok(()))
250 } else {
251 let waker = cx.waker().clone();
252 if for_read {
253 inner.read_task = Some(waker);
254 } else {
255 inner.write_task = Some(waker);
256 }
257 if old.contains(signal) {
258 inner.signals &= !signal;
259 std::mem::drop(inner);
260 self.handle.wait_async_handle(
261 self.receiver.port(),
262 self.receiver.key(),
263 signal | zx::Signals::OBJECT_PEER_CLOSED,
264 zx::WaitAsyncOpts::empty(),
265 )?;
266 }
267 Poll::Pending
268 }
269 }
270}
271
272impl<T> ReadableHandle for RWHandle<T>
273where
274 T: AsHandleRef,
275{
276 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
277 loop {
278 let signals = self.receiver().0.lock().unwrap().signals;
279 match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
280 (true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
281 (_, true) => return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed)),
282 (false, false) => {
283 ready!(self.need_signal(cx, true, OBJECT_READABLE)?)
284 }
285 }
286 }
287 }
288
289 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
290 self.need_signal(cx, true, OBJECT_READABLE)
291 }
292}
293
294impl<T> WritableHandle for RWHandle<T>
295where
296 T: AsHandleRef,
297{
298 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
299 loop {
300 let signals = self.receiver().0.lock().unwrap().signals;
301 match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
302 (_, true) => return Poll::Ready(Ok(WritableState::Closed)),
303 (true, _) => return Poll::Ready(Ok(WritableState::Writable)),
304 (false, false) => {
305 ready!(self.need_signal(cx, false, OBJECT_WRITABLE)?)
306 }
307 }
308 }
309 }
310
311 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
312 self.need_signal(cx, false, OBJECT_WRITABLE)
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    /// `is_closed` must report a peer closure right away, even though the
    /// cached signal state only catches up once the executor has processed
    /// the port packet.
    #[test]
    fn is_closed_immediately_after_close() {
        let mut executor = TestExecutor::new();
        let (peer, local) = zx::Channel::create();
        let handle = RWHandle::new(local);
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        // Drop the optimistic "readable" assumption made at construction.
        assert!(handle.need_readable(&mut cx).is_pending());
        // Nothing has been closed yet, so the channel reports open.
        assert_eq!(handle.is_closed(), false);
        // Nothing is ever written, so reads stay pending until the close.
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);

        drop(peer);

        // Implementation detail: the cached state has not caught up yet...
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);
        // ...but `is_closed` reports the closure immediately.
        assert_eq!(handle.is_closed(), true);
        // The cache stays stale until the executor processes port packets.
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);
        // Give the executor a chance to do exactly that.
        let _ = executor.run_until_stalled(&mut futures::future::pending::<()>());
        // The cache is now current, so the closure shows up when polling.
        assert_eq!(
            handle.poll_readable(&mut cx),
            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
        );
        // And `is_closed` naturally still agrees.
        assert_eq!(handle.is_closed(), true);
    }
}