fuchsia_async/handle/zircon/
rwhandle.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::OnSignalsRef;
6use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
7use fuchsia_sync::Mutex;
8use std::marker::PhantomData;
9use std::task::{Context, Poll, Waker, ready};
10use zx::AsHandleRef;
11
// Module-local shorthands for the `zx::Signals` bits used throughout this file.

/// Shorthand for `zx::Signals::OBJECT_PEER_CLOSED`. This bit is waited on
/// together with whichever readable/writable signals are requested.
const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
/// Shorthand for `zx::Signals::OBJECT_READABLE`, the readable signal of
/// `DefaultRWHandleSpec`.
const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
/// Shorthand for `zx::Signals::OBJECT_WRITABLE`, the writable signal of
/// `DefaultRWHandleSpec`.
const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
15
/// What a caller learns about an object once it is considered ready for
/// reading.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadableState {
    /// `OBJECT_READABLE` was received, or readability is being assumed
    /// optimistically without having observed the signal.
    Readable,
    /// `OBJECT_PEER_CLOSED` was received. Data may still be available to read.
    MaybeReadableAndClosed,
}
24
/// What a caller learns about an object once it is considered ready for
/// writing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WritableState {
    /// `OBJECT_WRITABLE` was received, or writability is being assumed
    /// optimistically without having observed the signal.
    Writable,
    /// `OBJECT_PEER_CLOSED` was received.
    Closed,
}
33
/// A `Handle` that receives notifications when it is readable.
///
/// # Examples
///
/// ```
/// loop {
///     ready!(self.poll_readable(cx))?;
///     match /* make read syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
///         status => return Poll::Ready(status),
///     }
/// }
/// ```
pub trait ReadableHandle {
    /// If the object is ready for reading, returns `Ready` with the readable
    /// state. If the implementor returns `Pending`, it should first ensure that
    /// `need_readable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
    /// object is readable.
    ///
    /// The returned `ReadableState` does not necessarily reflect an observed
    /// `OBJECT_READABLE` signal. We optimistically assume the object remains
    /// readable until `need_readable` is called.
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal.
    ///
    /// This can return `Poll::Ready` if the object has already been signaled,
    /// in which case the waker will *not* be woken and it is the caller's
    /// responsibility to not lose the signal.
    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
68
/// A `Handle` that receives notifications when it is writable.
///
/// # Examples
///
/// ```
/// loop {
///     ready!(self.poll_writable(cx))?;
///     match /* make write syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
///         status => return Poll::Ready(status),
///     }
/// }
/// ```
pub trait WritableHandle {
    /// If the object is ready for writing, returns `Ready` with the writable
    /// state. If the implementor returns `Pending`, it should first ensure that
    /// `need_writable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
    /// object is writable.
    ///
    /// The returned `WritableState` does not necessarily reflect an observed
    /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
    /// writable until `need_writable` is called.
    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal.
    ///
    /// This can return `Poll::Ready` if the object has already been signaled,
    /// in which case the waker will *not* be woken and it is the caller's
    /// responsibility to not lose the signal.
    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
103
/// Packet receiver backing an `RWHandle`: it caches the signals delivered by
/// port packets and holds the wakers of tasks waiting on them.
///
/// `S` selects which signals count as readable and which as writable.
struct RWPacketReceiver<S: RWHandleSpec> {
    /// Cached signal state and pending wakers, shared between the packet
    /// handler and the poll/need methods of the owning handle.
    inner: Mutex<Inner>,
    /// Ties the receiver to its spec without storing a value of type `S`.
    _marker: PhantomData<S>,
}
108
/// Mutable state of an `RWPacketReceiver`, always accessed under its mutex.
struct Inner {
    /// Signal bits currently believed to be asserted. Bits are set when a
    /// packet is received and cleared when a new async wait is installed.
    signals: zx::Signals,
    /// Waker of the task waiting for the handle to become readable, if any.
    read_task: Option<Waker>,
    /// Waker of the task waiting for the handle to become writable, if any.
    write_task: Option<Waker>,
}
114
115impl<S: RWHandleSpec> PacketReceiver for RWPacketReceiver<S> {
116    fn receive_packet(&self, packet: zx::Packet) {
117        let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
118            // Only consider the signals that were part of the trigger. This
119            // ensures that only the packets generated by the correct signal
120            // observer (Read or Write) can raise the corresponding inner signal
121            // bit.
122            //
123            // Without this, we can lose track of how many port observers are
124            // installed.
125            p.observed() & p.trigger()
126        } else {
127            return;
128        };
129
130        // We wake the tasks when the lock isn't held in case the wakers need the same lock.
131        let mut read_task = None;
132        let mut write_task = None;
133        {
134            let mut inner = self.inner.lock();
135            let old = inner.signals;
136            inner.signals |= new;
137
138            let became_readable =
139                new.intersection(S::READABLE_SIGNALS) != old.intersection(S::READABLE_SIGNALS);
140            let became_writable =
141                new.intersection(S::WRITABLE_SIGNALS) != old.intersection(S::WRITABLE_SIGNALS);
142            let became_closed =
143                new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
144            if became_readable || became_closed {
145                read_task = inner.read_task.take();
146            }
147            if became_writable || became_closed {
148                write_task = inner.write_task.take();
149            }
150        }
151        // *NOTE*: This is the only safe place to wake wakers.  In any other location, there is a
152        // risk that locks are held which might be required when the waker is woken.  It is safe to
153        // wake here because this is called from the executor when no locks are held.
154        if let Some(read_task) = read_task {
155            read_task.wake();
156        }
157        if let Some(write_task) = write_task {
158            write_task.wake();
159        }
160    }
161}
162
/// A `Handle` that receives notifications when it is readable/writable.
///
/// `T` is the wrapped handle type. `S` selects which signals are treated as
/// readable and writable and defaults to [`DefaultRWHandleSpec`].
pub struct RWHandle<T, S: RWHandleSpec = DefaultRWHandleSpec> {
    /// The wrapped handle on which async waits are installed.
    handle: T,
    /// Registration of the packet receiver; supplies the port and key used
    /// when installing async waits.
    receiver: ReceiverRegistration<RWPacketReceiver<S>>,
}
168
169impl<T> RWHandle<T, DefaultRWHandleSpec>
170where
171    T: AsHandleRef,
172{
173    /// Creates a new `RWHandle` object which will receive notifications when
174    /// the underlying handle becomes readable, writable, or closes.
175    ///
176    /// # Panics
177    ///
178    /// If called outside the context of an active async executor.
179    pub fn new(handle: T) -> Self {
180        Self::new_with_spec(handle)
181    }
182}
183
184impl<T, S> RWHandle<T, S>
185where
186    T: AsHandleRef,
187    S: RWHandleSpec,
188{
189    /// Creates a new `RWHandle` with a non-default spec and an object which
190    /// will receive notifications when the underlying handle becomes readable,
191    /// writable, or closes.
192    ///
193    /// # Panics
194    ///
195    /// If called outside the context of an active async executor.
196    ///
197    /// If `S::READABLE_SIGNALS` or `S::WRITABLE_SIGNALS` contain
198    /// [`zx::Signals::OBJECT_PEER_CLOSED`] or have a non-empty intersection.
199    pub fn new_with_spec(handle: T) -> Self {
200        let ehandle = EHandle::local();
201        let internal_signals = OBJECT_PEER_CLOSED;
202        assert!(
203            !S::READABLE_SIGNALS.contains(internal_signals),
204            "readable signals may not contain ({internal_signals:?})"
205        );
206        assert!(
207            !S::WRITABLE_SIGNALS.contains(internal_signals),
208            "writable signals may not contain ({internal_signals:?})"
209        );
210        assert!(!S::WRITABLE_SIGNALS.intersects(S::READABLE_SIGNALS), "signals may not intersect");
211        let initial_signals = S::WRITABLE_SIGNALS | S::READABLE_SIGNALS;
212        let receiver = ehandle.register_receiver(RWPacketReceiver {
213            inner: Mutex::new(Inner {
214                // Optimistically assume that the handle is readable and writable.
215                // Reads and writes will be attempted before queueing a packet.
216                // This makes handles slightly faster to read/write the first time
217                // they're accessed after being created, provided they start off as
218                // readable or writable. In return, there will be an extra wasted
219                // syscall per read/write if the handle is not readable or writable.
220                signals: initial_signals,
221                read_task: None,
222                write_task: None,
223            }),
224            _marker: PhantomData,
225        });
226
227        RWHandle { handle, receiver }
228    }
229
230    /// Returns a reference to the underlying handle.
231    pub fn get_ref(&self) -> &T {
232        &self.handle
233    }
234
235    /// Returns a mutable reference to the underlying handle.
236    pub fn get_mut(&mut self) -> &mut T {
237        &mut self.handle
238    }
239
240    /// Consumes `self` and returns the underlying handle.
241    pub fn into_inner(self) -> T {
242        self.handle
243    }
244
245    /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
246    pub fn is_closed(&self) -> bool {
247        let signals = self.receiver().inner.lock().signals;
248        if signals.contains(OBJECT_PEER_CLOSED) {
249            return true;
250        }
251
252        // The signals bitset might not be updated if we haven't gotten around to processing the
253        // packet telling us that yet. To provide an up-to-date response, we query the current
254        // state of the signal.
255        //
256        // Note: we _could_ update the bitset with what we find here, if we're careful to also
257        // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
258        // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
259        // we just leave the bitset as-is and let the regular notification mechanism get around to
260        // it when it gets around to it.
261        match self
262            .handle
263            .as_handle_ref()
264            .wait_one(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
265            .to_result()
266        {
267            Ok(_) => true,
268            Err(zx::Status::TIMED_OUT) => false,
269            Err(status) => {
270                // None of the other documented error statuses should be possible, either the type
271                // system doesn't allow it or the wait from `RWHandle::new()` would have already
272                // failed.
273                unreachable!("status: {status}")
274            }
275        }
276    }
277
278    /// Returns a future that completes when `is_closed()` is true.
279    pub fn on_closed(&self) -> OnSignalsRef<'_> {
280        OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
281    }
282
283    fn receiver(&self) -> &RWPacketReceiver<S> {
284        self.receiver.receiver()
285    }
286
287    fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
288        let mut inner = self.receiver.inner.lock();
289        let old = inner.signals;
290        if old.contains(OBJECT_PEER_CLOSED) {
291            // We don't want to return an error here because even though the peer has closed, the
292            // object could still have queued messages that can be read.
293            Poll::Ready(Ok(()))
294        } else {
295            let waker = cx.waker().clone();
296            let signal = match signal {
297                Signal::Read => {
298                    inner.read_task = Some(waker);
299                    S::READABLE_SIGNALS
300                }
301                Signal::Write => {
302                    inner.write_task = Some(waker);
303                    S::WRITABLE_SIGNALS
304                }
305            };
306            if old.intersects(signal) {
307                inner.signals &= !signal;
308                std::mem::drop(inner);
309                self.handle.as_handle_ref().wait_async(
310                    self.receiver.port(),
311                    self.receiver.key(),
312                    signal | OBJECT_PEER_CLOSED,
313                    zx::WaitAsyncOpts::empty(),
314                )?;
315            }
316            Poll::Pending
317        }
318    }
319
320    fn poll_signal(
321        &self,
322        cx: &mut Context<'_>,
323        signal: Signal,
324    ) -> Poll<Result<zx::Signals, zx::Status>> {
325        let mask = match signal {
326            Signal::Read => S::READABLE_SIGNALS,
327            Signal::Write => S::WRITABLE_SIGNALS,
328        } | OBJECT_PEER_CLOSED;
329
330        loop {
331            let signals = self.receiver().inner.lock().signals;
332            let asserted = signals.intersection(mask);
333            if !asserted.is_empty() {
334                return Poll::Ready(Ok(asserted));
335            }
336            ready!(self.need_signal(cx, signal)?);
337        }
338    }
339}
340
/// Direction of interest when polling or waiting on an `RWHandle`.
#[derive(Clone, Copy)]
enum Signal {
    /// Interested in the readable signals.
    Read,
    /// Interested in the writable signals.
    Write,
}
346
347impl<T, S> ReadableHandle for RWHandle<T, S>
348where
349    T: AsHandleRef,
350    S: RWHandleSpec,
351{
352    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
353        let signals = ready!(self.poll_signal(cx, Signal::Read)?);
354        if signals.contains(OBJECT_PEER_CLOSED) {
355            return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed));
356        }
357        Poll::Ready(Ok(ReadableState::Readable))
358    }
359
360    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
361        self.need_signal(cx, Signal::Read)
362    }
363}
364
365impl<T, S> WritableHandle for RWHandle<T, S>
366where
367    T: AsHandleRef,
368    S: RWHandleSpec,
369{
370    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
371        let signals = ready!(self.poll_signal(cx, Signal::Write)?);
372        if signals.contains(OBJECT_PEER_CLOSED) {
373            return Poll::Ready(Ok(WritableState::Closed));
374        }
375        Poll::Ready(Ok(WritableState::Writable))
376    }
377
378    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
379        self.need_signal(cx, Signal::Write)
380    }
381}
382
/// A trait specifying the behavior of [`RWHandle`].
///
/// The default behavior, listening for [`zx::Signals::OBJECT_READABLE`] and
/// [`zx::Signals::OBJECT_WRITABLE`], is provided by [`DefaultRWHandleSpec`].
///
/// [`RWHandle::new_with_spec`] panics if either constant contains
/// [`zx::Signals::OBJECT_PEER_CLOSED`] or if the two constants overlap.
pub trait RWHandleSpec: Send + Sync + 'static {
    /// Signals asserted when the handle is readable.
    ///
    /// [`RWHandle`] installs a wait for these signals and if any of them are
    /// asserted, the handle is considered readable.
    const READABLE_SIGNALS: zx::Signals;
    /// Signals asserted when the handle is writable.
    ///
    /// [`RWHandle`] installs a wait for these signals and if any of them are
    /// asserted, the handle is considered writable.
    const WRITABLE_SIGNALS: zx::Signals;
}
399
/// The default behavior for [`RWHandle`].
///
/// Considers the handle readable when [`zx::Signals::OBJECT_READABLE`] is set,
/// and writable when [`zx::Signals::OBJECT_WRITABLE`] is set.
///
/// This is a unit type used only as a type parameter.
pub struct DefaultRWHandleSpec;
405
406impl RWHandleSpec for DefaultRWHandleSpec {
407    const READABLE_SIGNALS: zx::Signals = OBJECT_READABLE;
408    const WRITABLE_SIGNALS: zx::Signals = OBJECT_WRITABLE;
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    /// `is_closed` reports peer closure right after the peer is dropped, even
    /// though the cached readable state only catches up once the executor has
    /// processed port packets.
    #[test]
    fn is_closed_immediately_after_close() {
        let mut executor = TestExecutor::new();
        let (peer, channel) = zx::Channel::create();
        let handle = RWHandle::new(channel);
        let mut cx = Context::from_waker(Waker::noop());

        // Get rid of the optimistic readable bit.
        assert!(handle.need_readable(&mut cx).is_pending());
        // Nothing has been closed yet.
        assert!(!handle.is_closed());
        // Nothing is ever written, so this stays Pending until the peer closes.
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);

        drop(peer);

        // Implementation note: the cached state has not been refreshed yet...
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);
        // ...while is_closed already reports the closure.
        assert!(handle.is_closed());
        // The cache remains stale until the executor handles port packets.
        assert_eq!(handle.poll_readable(&mut cx), Poll::Pending);
        // Give the executor that chance.
        let _ = executor.run_until_stalled(&mut futures::future::pending::<()>());
        // Now the closure is visible through the cached state as well.
        assert_eq!(
            handle.poll_readable(&mut cx),
            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
        );
        // And is_closed keeps saying so.
        assert!(handle.is_closed());
    }

    // Regression test for https://fxbug.dev/417333384.
    #[test]
    fn simultaneous_read_and_write() {
        let mut executor = TestExecutor::new();
        let (peer, local) = zx::Socket::create_stream();
        let mut scratch = [0u8; 1024];
        // Write until the socket stops accepting data.
        while local.write(&scratch[..]).is_ok() {}

        let handle = RWHandle::new(local);

        // Reads until the socket would block, then asks to be woken again.
        let reader = futures::future::poll_fn(|cx| {
            let state = ready!(handle.poll_readable(cx));
            assert_eq!(state, Ok(ReadableState::Readable));
            let mut chunk = [0u8; 2];
            let status = loop {
                match handle.get_ref().read(&mut chunk[..]) {
                    Ok(n) => assert_eq!(n, 1),
                    Err(status) => break status,
                }
            };
            assert_eq!(status, zx::Status::SHOULD_WAIT);
            assert_eq!(handle.need_readable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        // Writes until the socket would block, then asks to be woken again.
        let writer = futures::future::poll_fn(|cx| {
            let state = ready!(handle.poll_writable(cx));
            assert_eq!(state, Ok(WritableState::Writable));
            let byte = [0u8; 1];
            let status = loop {
                match handle.get_ref().write(&byte[..]) {
                    Ok(n) => assert_eq!(n, 1),
                    Err(status) => break status,
                }
            };
            assert_eq!(status, zx::Status::SHOULD_WAIT);
            assert_eq!(handle.need_writable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        let mut both = std::pin::pin!(futures::future::join(writer, reader));
        for _ in 0..5 {
            assert_eq!(executor.run_until_stalled(&mut both), Poll::Pending);
            assert_eq!(peer.read(&mut scratch[0..1]), Ok(1));
            assert_eq!(peer.write(&scratch[0..1]), Ok(1));
        }

        // Drain the port and count the packets addressed to this handle, per
        // direction.
        let mut read_waits = 0;
        let mut write_waits = 0;
        while let Ok(packet) = executor.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
            if packet.key() != handle.receiver.key() {
                continue;
            }
            let signal_packet = match packet.contents() {
                zx::PacketContents::SignalOne(p) => p,
                e => panic!("unexpected packet {e:?}"),
            };
            let trigger = signal_packet.trigger();
            if trigger.contains(zx::Signals::OBJECT_READABLE) {
                read_waits += 1;
            }
            if trigger.contains(zx::Signals::OBJECT_WRITABLE) {
                write_waits += 1;
            }
        }
        // We should not have installed more than 1 waiter for each side of the
        // operation.
        assert_eq!((read_waits, write_waits), (1, 1));
    }
}
519}