fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::AsHandleRef;
15
16struct OnSignalsReceiver {
17    maybe_signals: AtomicUsize,
18    task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24        if signals == 0 {
25            // No signals were received-- register to receive a wakeup when they arrive.
26            self.task.register(cx.waker());
27            // Check again for signals after registering for a wakeup in case signals
28            // arrived between registering and the initial load of signals
29            signals = self.maybe_signals.load(Ordering::SeqCst);
30        }
31        if signals == 0 {
32            Poll::Pending
33        } else {
34            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35        }
36    }
37
38    fn set_signals(&self, signals: zx::Signals) {
39        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40        self.task.wake();
41    }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45    fn receive_packet(&self, packet: zx::Packet) {
46        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47            p.observed()
48        } else {
49            return;
50        };
51
52        self.set_signals(observed);
53    }
54}
55
56pin_project_lite::pin_project! {
57    /// A future that completes when some set of signals become available on a Handle.
58    #[must_use = "futures do nothing unless polled"]
59    pub struct OnSignalsFuture<'a, H: AsHandleRef> {
60        handle: H,
61        signals: zx::Signals,
62        #[pin]
63        registration: RawReceiverRegistration<OnSignalsReceiver>,
64        phantom: PhantomData<&'a H>,
65    }
66
67    impl<'a, H: AsHandleRef> PinnedDrop for OnSignalsFuture<'a, H> {
68        fn drop(mut this: Pin<&mut Self>) {
69            this.unregister();
70        }
71    }
72}
73
74impl<'a, H: AsHandleRef + 'a> OnSignalsFuture<'a, H> {
75    /// Creates a new `OnSignals` object which will receive notifications when
76    /// any signals in `signals` occur on `handle`.
77    pub fn new(handle: H, signals: zx::Signals) -> Self {
78        // We don't register for the signals until first polled.  When we are first polled, we'll
79        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
80        // register for an asynchronous notification via the port.
81        //
82        // We could change the code to register for the asynchronous notification here, but then
83        // when first polled, if the notification hasn't arrived, we'll still check to see if the
84        // signals are set (see below for the reason why).  Given that the time between construction
85        // and when we first poll is typically small, registering here probably won't make much
86        // difference (and on a single-threaded executor, a notification is unlikely to be processed
87        // before the first poll anyway).  The way we have it now means we don't have to register at
88        // all if the signals are already set, which will be a win some of the time.
89        OnSignalsFuture {
90            handle,
91            signals,
92            registration: RawReceiverRegistration::new(OnSignalsReceiver {
93                maybe_signals: AtomicUsize::new(0),
94                task: AtomicWaker::new(),
95            }),
96            phantom: PhantomData,
97        }
98    }
99
100    /// Takes the handle.
101    pub fn take_handle(mut self: Pin<&mut Self>) -> H
102    where
103        H: zx::HandleBased,
104    {
105        if self.registration.is_registered() {
106            self.as_mut().unregister();
107        }
108        mem::replace(self.project().handle, zx::NullableHandle::invalid().into())
109    }
110
111    fn register(
112        mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113        handle: &H,
114        signals: zx::Signals,
115        cx: Option<&mut Context<'_>>,
116    ) -> Result<(), zx::Status> {
117        registration.as_mut().register(EHandle::local());
118
119        // If a context has been supplied, we must register it now before calling
120        // `wait_async_handle` below to avoid races.
121        if let Some(cx) = cx {
122            registration.receiver().task.register(cx.waker());
123        }
124
125        handle.wait_async_handle(
126            registration.port().unwrap(),
127            registration.key().unwrap(),
128            signals,
129            zx::WaitAsyncOpts::empty(),
130        )?;
131
132        Ok(())
133    }
134
135    fn unregister(self: Pin<&mut Self>) {
136        let mut this = self.project();
137        if let Some((ehandle, key)) = this.registration.as_mut().unregister()
138            && this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0
139        {
140            // Ignore the error from zx_port_cancel, because it might just be a race condition.
141            // If the packet is handled between the above maybe_signals check and the port
142            // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
143            let _ = ehandle.port().cancel(key);
144        }
145    }
146}
147
148impl<H: AsHandleRef> Future for OnSignalsFuture<'_, H> {
149    type Output = Result<zx::Signals, zx::Status>;
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        if !self.registration.is_registered() {
152            match self
153                .handle
154                .as_handle_ref()
155                .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
156                .to_result()
157            {
158                Ok(signals) => Poll::Ready(Ok(signals)),
159                Err(zx::Status::TIMED_OUT) => {
160                    let mut this = self.project();
161                    Self::register(
162                        this.registration.as_mut(),
163                        this.handle,
164                        *this.signals,
165                        Some(cx),
166                    )?;
167                    Poll::Pending
168                }
169                Err(e) => Poll::Ready(Err(e)),
170            }
171        } else {
172            match self.registration.receiver().get_signals(cx) {
173                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
174                Poll::Pending => {
175                    // We haven't received a notification for the signals, but we still want to poll
176                    // the kernel in case the notification hasn't been processed yet by the
177                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
178                    // in some shutdown paths, it wants to drain and process all messages in
179                    // channels before it closes them.  There is no other reliable way to flush a
180                    // pending notification (particularly on a multi-threaded executor).  This will
181                    // incur a small performance penalty in the case that this future has been
182                    // polled when no notification was actually received (such as can be the case
183                    // with some futures combinators).
184                    match self
185                        .handle
186                        .as_handle_ref()
187                        .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
188                        .to_result()
189                    {
190                        Ok(signals) => Poll::Ready(Ok(signals)),
191                        Err(_) => Poll::Pending,
192                    }
193                }
194            }
195        }
196    }
197}
198
199impl<H: AsHandleRef> fmt::Debug for OnSignalsFuture<'_, H> {
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201        write!(f, "OnSignals")
202    }
203}
204
205impl<H: AsHandleRef> AsHandleRef for OnSignalsFuture<'_, H> {
206    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
207        self.handle.as_handle_ref()
208    }
209}
210
211impl<H: AsHandleRef> AsRef<H> for OnSignalsFuture<'_, H> {
212    fn as_ref(&self) -> &H {
213        &self.handle
214    }
215}
216
217/// A future that completes when some set of signals become available on a Handle.
218#[must_use = "futures do nothing unless polled"]
219pub struct OnSignals<'a, H: AsHandleRef> {
220    future: Pin<Box<OnSignalsFuture<'a, H>>>,
221}
222
223impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
224    /// Creates a new `OnSignals` object which will receive notifications when
225    /// any signals in `signals` occur on `handle`.
226    pub fn new(handle: H, signals: zx::Signals) -> Self {
227        Self { future: Box::pin(OnSignalsFuture::new(handle, signals)) }
228    }
229
230    /// Takes the handle.
231    pub fn take_handle(mut self) -> H
232    where
233        H: zx::HandleBased,
234    {
235        self.future.as_mut().take_handle()
236    }
237}
238
239impl<H: AsHandleRef> Future for OnSignals<'_, H> {
240    type Output = Result<zx::Signals, zx::Status>;
241    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242        Pin::into_inner(self).future.as_mut().poll(cx)
243    }
244}
245
246impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        write!(f, "OnSignals")
249    }
250}
251
252impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
253    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
254        self.future.as_handle_ref()
255    }
256}
257
258impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
259    fn as_ref(&self) -> &H {
260        &self.future.handle
261    }
262}
263
264/// Alias for the common case where OnSignals is used with zx::HandleRef.
265pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
266
267#[cfg(test)]
268mod test {
269    use super::*;
270    use crate::TestExecutor;
271    use assert_matches::assert_matches;
272    use futures::future::{FutureExt, pending};
273    use futures::task::{ArcWake, waker};
274    use std::pin::pin;
275    use std::sync::Arc;
276
277    #[test]
278    fn wait_for_event() -> Result<(), zx::Status> {
279        let mut exec = crate::TestExecutor::new();
280        let mut deliver_events =
281            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
282
283        let event = zx::Event::create();
284        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
285        let (waker, waker_count) = futures_test::task::new_count_waker();
286        let cx = &mut std::task::Context::from_waker(&waker);
287
288        // Check that `signals` is still pending before the event has been signaled
289        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
290        deliver_events();
291        assert_eq!(waker_count, 0);
292        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
293
294        // signal the event and check that `signals` has been woken up and is
295        // no longer pending
296        event.signal(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
297        deliver_events();
298        assert_eq!(waker_count, 1);
299        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));
300
301        Ok(())
302    }
303
304    #[test]
305    fn drop_before_event() {
306        let mut fut = std::pin::pin!(async {
307            let ehandle = EHandle::local();
308
309            let event = zx::Event::create();
310            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
311            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
312            let key = signals.future.registration.key().unwrap();
313
314            std::mem::drop(signals);
315            assert!(ehandle.port().cancel(key) == Err(zx::Status::NOT_FOUND));
316        });
317
318        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
319    }
320
321    #[test]
322    fn test_always_polls() {
323        let mut exec = TestExecutor::new();
324
325        let (rx, tx) = zx::Channel::create();
326
327        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));
328
329        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
330
331        tx.write(b"hello", &mut []).expect("write failed");
332
333        struct Waker;
334        impl ArcWake for Waker {
335            fn wake_by_ref(_arc_self: &Arc<Self>) {}
336        }
337
338        // Poll the future directly which guarantees the port notification for the write hasn't
339        // arrived.
340        assert_matches!(
341            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
342            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
343        );
344    }
345
346    #[test]
347    fn test_take_handle() {
348        let mut exec = TestExecutor::new();
349
350        let (rx, tx) = zx::Channel::create();
351
352        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);
353
354        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
355
356        tx.write(b"hello", &mut []).expect("write failed");
357
358        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));
359
360        let mut message = zx::MessageBuf::new();
361        fut.take_handle().read(&mut message).unwrap();
362
363        assert_eq!(message.bytes(), b"hello");
364    }
365}