fuchsia_async/handle/zircon/rwhandle.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::OnSignalsRef;
6use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
7use fuchsia_sync::Mutex;
8use std::marker::PhantomData;
9use std::task::{Context, Poll, Waker, ready};
10use zx::AsHandleRef;
11
12const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
13const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
14const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
15
16/// State of an object when it is ready for reading.
17#[derive(Debug, PartialEq, Eq, Copy, Clone)]
18pub enum ReadableState {
19 /// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
20 Readable,
21 /// Received `OBJECT_PEER_CLOSED`. The object might also be readable.
22 MaybeReadableAndClosed,
23}
24
25/// State of an object when it is ready for writing.
26#[derive(Debug, PartialEq, Eq, Copy, Clone)]
27pub enum WritableState {
28 /// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
29 Writable,
30 /// Received `OBJECT_PEER_CLOSED`.
31 Closed,
32}
33
34/// A `Handle` that receives notifications when it is readable.
35///
36/// # Examples
37///
38/// ```
39/// loop {
40/// ready!(self.poll_readable(cx))?;
41/// match /* make read syscall */ {
42/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
43/// status => return Poll::Ready(status),
44/// }
45/// }
46/// ```
47pub trait ReadableHandle {
48 /// If the object is ready for reading, returns `Ready` with the readable
49 /// state. If the implementor returns Pending, it should first ensure that
50 /// `need_readable` is called.
51 ///
52 /// This should be called in a poll function. If the syscall returns
53 /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
54 /// object is readable.
55 ///
56 /// The returned `ReadableState` does not necessarily reflect an observed
57 /// `OBJECT_READABLE` signal. We optimistically assume the object remains
58 /// readable until `need_readable` is called.
59 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
60
61 /// Arranges for the current task to be woken when the object receives an
62 /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal. This can return
63 /// Poll::Ready if the object has already been signaled in which case the
64 /// waker *will* not be woken and it is the caller's responsibility to not
65 /// lose the signal.
66 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
67}
68
69/// A `Handle` that receives notifications when it is writable.
70///
71/// # Examples
72///
73/// ```
74/// loop {
75/// ready!(self.poll_writable(cx))?;
76/// match /* make write syscall */ {
77/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
78/// status => Poll::Ready(status),
79/// }
80/// }
81/// ```
82pub trait WritableHandle {
83 /// If the object is ready for writing, returns `Ready` with the writable
84 /// state. If the implementor returns Pending, it should first ensure that
85 /// `need_writable` is called.
86 ///
87 /// This should be called in a poll function. If the syscall returns
88 /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
89 /// object is writable.
90 ///
91 /// The returned `WritableState` does not necessarily reflect an observed
92 /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
93 /// writable until `need_writable` is called.
94 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
95
96 /// Arranges for the current task to be woken when the object receives an
97 /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
98 /// Poll::Ready if the object has already been signaled in which case the
99 /// waker *will* not be woken and it is the caller's responsibility to not
100 /// lose the signal.
101 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
102}
103
104struct RWPacketReceiver<S: RWHandleSpec> {
105 inner: Mutex<Inner>,
106 _marker: PhantomData<S>,
107}
108
109struct Inner {
110 signals: zx::Signals,
111 read_task: Option<Waker>,
112 write_task: Option<Waker>,
113}
114
115impl<S: RWHandleSpec> PacketReceiver for RWPacketReceiver<S> {
116 fn receive_packet(&self, packet: zx::Packet) {
117 let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
118 // Only consider the signals that were part of the trigger. This
119 // ensures that only the packets generated by the correct signal
120 // observer (Read or Write) can raise the corresponding inner signal
121 // bit.
122 //
123 // Without this, we can lose track of how many port observers are
124 // installed.
125 p.observed() & p.trigger()
126 } else {
127 return;
128 };
129
130 // We wake the tasks when the lock isn't held in case the wakers need the same lock.
131 let mut read_task = None;
132 let mut write_task = None;
133 {
134 let mut inner = self.inner.lock();
135 let old = inner.signals;
136 inner.signals |= new;
137
138 let became_readable =
139 new.intersection(S::READABLE_SIGNALS) != old.intersection(S::READABLE_SIGNALS);
140 let became_writable =
141 new.intersection(S::WRITABLE_SIGNALS) != old.intersection(S::WRITABLE_SIGNALS);
142 let became_closed =
143 new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
144 if became_readable || became_closed {
145 read_task = inner.read_task.take();
146 }
147 if became_writable || became_closed {
148 write_task = inner.write_task.take();
149 }
150 }
151 // *NOTE*: This is the only safe place to wake wakers. In any other location, there is a
152 // risk that locks are held which might be required when the waker is woken. It is safe to
153 // wake here because this is called from the executor when no locks are held.
154 if let Some(read_task) = read_task {
155 read_task.wake();
156 }
157 if let Some(write_task) = write_task {
158 write_task.wake();
159 }
160 }
161}
162
163/// A `Handle` that receives notifications when it is readable/writable.
164pub struct RWHandle<T, S: RWHandleSpec = DefaultRWHandleSpec> {
165 handle: T,
166 receiver: ReceiverRegistration<RWPacketReceiver<S>>,
167}
168
169impl<T> RWHandle<T, DefaultRWHandleSpec>
170where
171 T: AsHandleRef,
172{
173 /// Creates a new `RWHandle` object which will receive notifications when
174 /// the underlying handle becomes readable, writable, or closes.
175 ///
176 /// # Panics
177 ///
178 /// If called outside the context of an active async executor.
179 pub fn new(handle: T) -> Self {
180 Self::new_with_spec(handle)
181 }
182}
183
184impl<T, S> RWHandle<T, S>
185where
186 T: AsHandleRef,
187 S: RWHandleSpec,
188{
189 /// Creates a new `RWHandle` with a non-default spec and an object which
190 /// will receive notifications when the underlying handle becomes readable,
191 /// writable, or closes.
192 ///
193 /// # Panics
194 ///
195 /// If called outside the context of an active async executor.
196 ///
197 /// If `S::READABLE_SIGNALS` or `S::WRITABLE_SIGNALS` contain
198 /// [`zx::Signals::OBJECT_PEER_CLOSED`] or have a non-empty intersection.
199 pub fn new_with_spec(handle: T) -> Self {
200 let ehandle = EHandle::local();
201 let internal_signals = OBJECT_PEER_CLOSED;
202 assert!(
203 !S::READABLE_SIGNALS.contains(internal_signals),
204 "readable signals may not contain ({internal_signals:?})"
205 );
206 assert!(
207 !S::WRITABLE_SIGNALS.contains(internal_signals),
208 "writable signals may not contain ({internal_signals:?})"
209 );
210 assert!(!S::WRITABLE_SIGNALS.intersects(S::READABLE_SIGNALS), "signals may not intersect");
211 let initial_signals = S::WRITABLE_SIGNALS | S::READABLE_SIGNALS;
212 let receiver = ehandle.register_receiver(RWPacketReceiver {
213 inner: Mutex::new(Inner {
214 // Optimistically assume that the handle is readable and writable.
215 // Reads and writes will be attempted before queueing a packet.
216 // This makes handles slightly faster to read/write the first time
217 // they're accessed after being created, provided they start off as
218 // readable or writable. In return, there will be an extra wasted
219 // syscall per read/write if the handle is not readable or writable.
220 signals: initial_signals,
221 read_task: None,
222 write_task: None,
223 }),
224 _marker: PhantomData,
225 });
226
227 RWHandle { handle, receiver }
228 }
229
230 /// Returns a reference to the underlying handle.
231 pub fn get_ref(&self) -> &T {
232 &self.handle
233 }
234
235 /// Returns a mutable reference to the underlying handle.
236 pub fn get_mut(&mut self) -> &mut T {
237 &mut self.handle
238 }
239
240 /// Consumes `self` and returns the underlying handle.
241 pub fn into_inner(self) -> T {
242 self.handle
243 }
244
245 /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
246 pub fn is_closed(&self) -> bool {
247 let signals = self.receiver().inner.lock().signals;
248 if signals.contains(OBJECT_PEER_CLOSED) {
249 return true;
250 }
251
252 // The signals bitset might not be updated if we haven't gotten around to processing the
253 // packet telling us that yet. To provide an up-to-date response, we query the current
254 // state of the signal.
255 //
256 // Note: we _could_ update the bitset with what we find here, if we're careful to also
257 // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
258 // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
259 // we just leave the bitset as-is and let the regular notification mechanism get around to
260 // it when it gets around to it.
261 match self
262 .handle
263 .as_handle_ref()
264 .wait_one(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
265 .to_result()
266 {
267 Ok(_) => true,
268 Err(zx::Status::TIMED_OUT) => false,
269 Err(status) => {
270 // None of the other documented error statuses should be possible, either the type
271 // system doesn't allow it or the wait from `RWHandle::new()` would have already
272 // failed.
273 unreachable!("status: {status}")
274 }
275 }
276 }
277
278 /// Returns a future that completes when `is_closed()` is true.
279 pub fn on_closed(&self) -> OnSignalsRef<'_> {
280 OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
281 }
282
283 fn receiver(&self) -> &RWPacketReceiver<S> {
284 self.receiver.receiver()
285 }
286
287 fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
288 let mut inner = self.receiver.inner.lock();
289 let old = inner.signals;
290 if old.contains(OBJECT_PEER_CLOSED) {
291 // We don't want to return an error here because even though the peer has closed, the
292 // object could still have queued messages that can be read.
293 Poll::Ready(Ok(()))
294 } else {
295 let waker = cx.waker().clone();
296 let signal = match signal {
297 Signal::Read => {
298 inner.read_task = Some(waker);
299 S::READABLE_SIGNALS
300 }
301 Signal::Write => {
302 inner.write_task = Some(waker);
303 S::WRITABLE_SIGNALS
304 }
305 };
306 if old.intersects(signal) {
307 inner.signals &= !signal;
308 std::mem::drop(inner);
309 self.handle.as_handle_ref().wait_async(
310 self.receiver.port(),
311 self.receiver.key(),
312 signal | OBJECT_PEER_CLOSED,
313 zx::WaitAsyncOpts::empty(),
314 )?;
315 }
316 Poll::Pending
317 }
318 }
319
320 fn poll_signal(
321 &self,
322 cx: &mut Context<'_>,
323 signal: Signal,
324 ) -> Poll<Result<zx::Signals, zx::Status>> {
325 let mask = match signal {
326 Signal::Read => S::READABLE_SIGNALS,
327 Signal::Write => S::WRITABLE_SIGNALS,
328 } | OBJECT_PEER_CLOSED;
329
330 loop {
331 let signals = self.receiver().inner.lock().signals;
332 let asserted = signals.intersection(mask);
333 if !asserted.is_empty() {
334 return Poll::Ready(Ok(asserted));
335 }
336 ready!(self.need_signal(cx, signal)?);
337 }
338 }
339}
340
341#[derive(Copy, Clone)]
342enum Signal {
343 Read,
344 Write,
345}
346
347impl<T, S> ReadableHandle for RWHandle<T, S>
348where
349 T: AsHandleRef,
350 S: RWHandleSpec,
351{
352 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
353 let signals = ready!(self.poll_signal(cx, Signal::Read)?);
354 if signals.contains(OBJECT_PEER_CLOSED) {
355 return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed));
356 }
357 Poll::Ready(Ok(ReadableState::Readable))
358 }
359
360 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
361 self.need_signal(cx, Signal::Read)
362 }
363}
364
365impl<T, S> WritableHandle for RWHandle<T, S>
366where
367 T: AsHandleRef,
368 S: RWHandleSpec,
369{
370 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
371 let signals = ready!(self.poll_signal(cx, Signal::Write)?);
372 if signals.contains(OBJECT_PEER_CLOSED) {
373 return Poll::Ready(Ok(WritableState::Closed));
374 }
375 Poll::Ready(Ok(WritableState::Writable))
376 }
377
378 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
379 self.need_signal(cx, Signal::Write)
380 }
381}
382
383/// A trait specifying the behavior of [`RWHandle`].
384///
385/// The default behavior, listening for [`zx::Signals::OBJECT_READABLE`] and
386/// [`zx::Signals::OBJECT_WRITABLE`] is provided by [`DefaultRWHandleSpec`].
387pub trait RWHandleSpec: Send + Sync + 'static {
388 /// Signals asserted when the handle is readable.
389 ///
390 /// [`RWHandle`] installs wait for these signals and if any of them are
391 /// asserted, the handle is considered readable.
392 const READABLE_SIGNALS: zx::Signals;
393 /// Signals asserted when the handle is writable.
394 ///
395 /// [`RWHandle`] installs wait for these signals and if any of them are
396 /// asserted, the handle is considered writable.
397 const WRITABLE_SIGNALS: zx::Signals;
398}
399
400/// The default behavior for [`RWHandle`].
401///
402/// Considers the handle readable when [`zx::Signals::OBJECT_READABLE`] is set,
403/// and writable when [`zx::Signals::OBJECT_WRITABLE`] is set.
404pub struct DefaultRWHandleSpec;
405
406impl RWHandleSpec for DefaultRWHandleSpec {
407 const READABLE_SIGNALS: zx::Signals = OBJECT_READABLE;
408 const WRITABLE_SIGNALS: zx::Signals = OBJECT_WRITABLE;
409}
410
411#[cfg(test)]
412mod tests {
413 use super::*;
414 use crate::TestExecutor;
415
416 #[test]
417 fn is_closed_immediately_after_close() {
418 let mut exec = TestExecutor::new();
419 let (tx, rx) = zx::Channel::create();
420 let rx_rw_handle = RWHandle::new(rx);
421 let mut noop_ctx = Context::from_waker(Waker::noop());
422 // Clear optimistic readable state
423 assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
424 // Starting state: the channel is not closed (because we haven't closed it yet)
425 assert!(!rx_rw_handle.is_closed());
426 // we will never set readable, so this should be Pending until we close
427 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
428
429 drop(tx);
430
431 // Implementation note: the cached state will not be updated yet
432 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
433 // But is_closed should return true immediately
434 assert!(rx_rw_handle.is_closed());
435 // Still not updated, and won't be until we let the executor process port packets
436 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
437 // So we do
438 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
439 // And now it is updated, so we observe Closed
440 assert_eq!(
441 rx_rw_handle.poll_readable(&mut noop_ctx),
442 Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
443 );
444 // And is_closed should still be true, of course.
445 assert!(rx_rw_handle.is_closed());
446 }
447
448 // Regression test for https://fxbug.dev/417333384.
449 #[test]
450 fn simultaneous_read_and_write() {
451 let mut exec = TestExecutor::new();
452 let (peer, local) = zx::Socket::create_stream();
453 let mut buff = [0u8; 1024];
454 while local.write(&buff[..]).is_ok() {}
455
456 let rw_handle = RWHandle::new(local);
457 let read_fut = futures::future::poll_fn(|cx| {
458 let readable = ready!(rw_handle.poll_readable(cx));
459 assert_eq!(readable, Ok(ReadableState::Readable));
460 let mut buf = [0u8; 2];
461 loop {
462 match rw_handle.get_ref().read(&mut buf[..]) {
463 Ok(r) => assert_eq!(r, 1),
464 Err(e) => {
465 assert_eq!(e, zx::Status::SHOULD_WAIT);
466 break;
467 }
468 }
469 }
470 assert_eq!(rw_handle.need_readable(cx), Poll::Pending);
471 Poll::<()>::Pending
472 });
473
474 let write_fut = futures::future::poll_fn(|cx| {
475 let writable = ready!(rw_handle.poll_writable(cx));
476 assert_eq!(writable, Ok(WritableState::Writable));
477 let buf = [0u8; 1];
478 loop {
479 match rw_handle.get_ref().write(&buf[..]) {
480 Ok(r) => assert_eq!(r, 1),
481 Err(e) => {
482 assert_eq!(e, zx::Status::SHOULD_WAIT);
483 break;
484 }
485 }
486 }
487 assert_eq!(rw_handle.need_writable(cx), Poll::Pending);
488 Poll::<()>::Pending
489 });
490
491 let mut fut = std::pin::pin!(futures::future::join(write_fut, read_fut));
492 for _ in 0..5 {
493 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
494 assert_eq!(peer.read(&mut buff[0..1]), Ok(1));
495 assert_eq!(peer.write(&buff[0..1]), Ok(1));
496 }
497
498 let mut read_waits = 0;
499 let mut write_waits = 0;
500 while let Ok(p) = exec.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
501 if p.key() != rw_handle.receiver.key() {
502 continue;
503 }
504 let p = match p.contents() {
505 zx::PacketContents::SignalOne(p) => p,
506 e => panic!("unexpected packet {e:?}"),
507 };
508 if p.trigger().contains(zx::Signals::OBJECT_READABLE) {
509 read_waits += 1;
510 }
511 if p.trigger().contains(zx::Signals::OBJECT_WRITABLE) {
512 write_waits += 1;
513 }
514 }
515 // We should not have installed more than 1 waiter for each side of the
516 // operation.
517 assert_eq!((read_waits, write_waits), (1, 1));
518 }
519}