fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::{fmt, mem};
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
/// Receiver state for a single signal wait: the signals observed so far and the waker of the
/// task that is polling for them.
struct OnSignalsReceiver {
    /// Raw bits of the observed signals, widened to `usize`. Zero means that no signals have
    /// been recorded yet.
    maybe_signals: AtomicUsize,
    /// Waker of the task waiting on the signals; it is woken whenever signals are stored.
    task: AtomicWaker,
}
21
22impl OnSignalsReceiver {
23    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25        if signals == 0 {
26            // No signals were received-- register to receive a wakeup when they arrive.
27            self.task.register(cx.waker());
28            // Check again for signals after registering for a wakeup in case signals
29            // arrived between registering and the initial load of signals
30            signals = self.maybe_signals.load(Ordering::SeqCst);
31        }
32        if signals == 0 {
33            Poll::Pending
34        } else {
35            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36        }
37    }
38
39    fn set_signals(&self, signals: zx::Signals) {
40        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41        self.task.wake();
42    }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46    fn receive_packet(&self, packet: zx::Packet) {
47        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48            p.observed()
49        } else {
50            return;
51        };
52
53        self.set_signals(observed);
54    }
55}
56
/// A future that completes when some set of signals become available on a Handle.
///
/// The future resolves to the observed signals, or to the status of a failed wait.
#[must_use = "futures do nothing unless polled"]
pub struct OnSignals<'a, H: AsHandleRef> {
    /// The handle whose signals are being waited on.
    handle: H,
    /// The set of signals to wait for.
    signals: zx::Signals,
    /// The port registration for the asynchronous wait. `None` until a poll finds that the
    /// signals are not yet set, and `None` again once the wait has been unregistered.
    registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
    /// Ties this future to the lifetime `'a` without storing a reference.
    phantom: PhantomData<&'a H>,
}
65
/// Alias for the common case where [`OnSignals`] is used with a borrowed `zx::HandleRef`.
pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70    /// Creates a new `OnSignals` object which will receive notifications when
71    /// any signals in `signals` occur on `handle`.
72    pub fn new(handle: H, signals: zx::Signals) -> Self {
73        // We don't register for the signals until first polled.  When we are first polled, we'll
74        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
75        // register for an asynchronous notification via the port.
76        //
77        // We could change the code to register for the asynchronous notification here, but then
78        // when first polled, if the notification hasn't arrived, we'll still check to see if the
79        // signals are set (see below for the reason why).  Given that the time between construction
80        // and when we first poll is typically small, registering here probably won't make much
81        // difference (and on a single-threaded executor, a notification is unlikely to be processed
82        // before the first poll anyway).  The way we have it now means we don't have to register at
83        // all if the signals are already set, which will be a win some of the time.
84        OnSignals { handle, signals, registration: None, phantom: PhantomData }
85    }
86
87    /// Takes the handle.
88    pub fn take_handle(mut self) -> H
89    where
90        H: zx::HandleBased,
91    {
92        self.unregister();
93        std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94    }
95
96    /// This function allows the `OnSignals` object to live for the `'static` lifetime, at the cost
97    /// of disabling automatic cleanup of the port wait.
98    ///
99    /// WARNING: Do not use unless you can guarantee that either:
100    /// - The future is not dropped before it completes, or
101    /// - The handle is dropped without creating additional OnSignals futures for it.
102    ///
103    /// Creating an OnSignals calls zx_object_wait_async, which consumes a small amount of kernel
104    /// resources. Dropping the OnSignals calls zx_port_cancel to clean up. But calling
105    /// extend_lifetime disables this cleanup, since the zx_port_wait call requires a reference to
106    /// the handle. The port registration can also be cleaned up by closing the handle or by
107    /// waiting for the signal to be triggered. But if neither of these happens, the registration
108    /// is leaked. This wastes kernel memory and the kernel will eventually kill your process to
109    /// force a cleanup.
110    ///
111    /// Note that `OnSignals` will not fire if the handle that was used to create it is dropped or
112    /// transferred to another process.
113    // TODO(https://fxbug.dev/42182035): Try to remove this footgun.
114    pub fn extend_lifetime(mut self) -> LeakedOnSignals {
115        match self.registration.take() {
116            Some(r) => LeakedOnSignals { registration: Ok(r) },
117            None => LeakedOnSignals { registration: self.register(None) },
118        }
119    }
120
121    fn register(
122        &self,
123        cx: Option<&mut Context<'_>>,
124    ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
125        let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
126            maybe_signals: AtomicUsize::new(0),
127            task: AtomicWaker::new(),
128        }));
129
130        // If a context has been supplied, we must register it now before calling
131        // `wait_async_handle` below to avoid races.
132        if let Some(cx) = cx {
133            registration.task.register(cx.waker());
134        }
135
136        self.handle.wait_async_handle(
137            registration.port(),
138            registration.key(),
139            self.signals,
140            zx::WaitAsyncOpts::empty(),
141        )?;
142
143        Ok(registration)
144    }
145
146    fn unregister(&mut self) {
147        if let Some(registration) = self.registration.take() {
148            if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
149                // Ignore the error from zx_port_cancel, because it might just be a race condition.
150                // If the packet is handled between the above maybe_signals check and the port
151                // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
152                let _ = registration.port().cancel(&self.handle, registration.key());
153            }
154        }
155    }
156}
157
158impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
159    type Output = Result<zx::Signals, zx::Status>;
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        match &self.registration {
162            None => {
163                match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST) {
164                    Ok(signals) => Poll::Ready(Ok(signals)),
165                    Err(zx::Status::TIMED_OUT) => {
166                        let registration = self.register(Some(cx))?;
167                        self.get_mut().registration = Some(registration);
168                        Poll::Pending
169                    }
170                    Err(e) => Poll::Ready(Err(e)),
171                }
172            }
173            Some(r) => match r.receiver().get_signals(cx) {
174                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
175                Poll::Pending => {
176                    // We haven't received a notification for the signals, but we still want to poll
177                    // the kernel in case the notification hasn't been processed yet by the
178                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
179                    // in some shutdown paths, it wants to drain and process all messages in
180                    // channels before it closes them.  There is no other reliable way to flush a
181                    // pending notification (particularly on a multi-threaded executor).  This will
182                    // incur a small performance penalty in the case that this future has been
183                    // polled when no notification was actually received (such as can be the case
184                    // with some futures combinators).
185                    match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
186                    {
187                        Ok(signals) => Poll::Ready(Ok(signals)),
188                        Err(_) => Poll::Pending,
189                    }
190                }
191            },
192        }
193    }
194}
195
196impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198        write!(f, "OnSignals")
199    }
200}
201
202impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
203    fn drop(&mut self) {
204        self.unregister();
205    }
206}
207
208impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
209    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
210        self.handle.as_handle_ref()
211    }
212}
213
214impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
215    fn as_ref(&self) -> &H {
216        &self.handle
217    }
218}
219
/// The future returned by `OnSignals::extend_lifetime`.
///
/// It resolves to the signals recorded by the port registration, or to the error that occurred
/// while registering. It does not keep the handle, and no cleanup of the port wait is
/// implemented for it here; see the warning on `OnSignals::extend_lifetime`.
pub struct LeakedOnSignals {
    /// The port registration, or the status returned when registering the wait failed.
    registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
}
223
224impl Future for LeakedOnSignals {
225    type Output = Result<zx::Signals, zx::Status>;
226    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227        let reg = self.registration.as_mut().map_err(|e| mem::replace(e, zx::Status::OK))?;
228        reg.receiver().get_signals(cx).map(Ok)
229    }
230}
231
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;

    /// The future stays pending until the event is signaled, then wakes its task exactly once
    /// and resolves with the signal.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Running a future that never completes until the executor stalls gives the executor a
        // chance to deliver any pending notifications.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (count_waker, wake_count) = futures_test::task::new_count_waker();
        let cx = &mut Context::from_waker(&count_waker);

        // Before the event has been signaled, `signals` must remain pending and must not wake.
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(wake_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // Signal the event, then check that `signals` was woken and is now ready.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(wake_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping an `OnSignals` cancels its port wait, whereas dropping the result of
    /// `extend_lifetime` leaves the wait in place.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.registration.as_ref().unwrap().key();

            // The drop already cancelled the wait, so there is nothing left to cancel.
            drop(signals);
            assert!(ehandle.port().cancel(&event, key) == Err(zx::Status::NOT_FOUND));

            // Try again, but with extend_lifetime: the wait outlives the drop.
            let leaked = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
            let key = leaked.registration.as_ref().unwrap().key();
            drop(leaked);
            assert!(ehandle.port().cancel(&event, key) == Ok(()));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future still checks the handle directly when polled, so it resolves even
    /// if the port notification has not been delivered.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing when woken.
        struct NoopWaker;
        impl ArcWake for NoopWaker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll the future directly which guarantees the port notification for the write hasn't
        // arrived.
        let noop = waker(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&noop);
        assert_matches!(
            fut.poll(&mut cx),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// `take_handle` returns the original, still usable handle after the future has resolved.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        let channel = fut.take_handle();
        channel.read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}
335}