Skip to main content

fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::AsHandleRef;
15
16struct OnSignalsReceiver {
17    maybe_signals: AtomicUsize,
18    task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24        if signals == 0 {
25            // No signals were received-- register to receive a wakeup when they arrive.
26            self.task.register(cx.waker());
27            // Check again for signals after registering for a wakeup in case signals
28            // arrived between registering and the initial load of signals
29            signals = self.maybe_signals.load(Ordering::SeqCst);
30        }
31        if signals == 0 {
32            Poll::Pending
33        } else {
34            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35        }
36    }
37
38    fn set_signals(&self, signals: zx::Signals) {
39        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40        self.task.wake();
41    }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45    fn receive_packet(&self, packet: zx::Packet) {
46        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47            p.observed()
48        } else {
49            return;
50        };
51
52        self.set_signals(observed);
53    }
54}
55
pin_project_lite::pin_project! {
    /// A future that completes when some set of signals become available on a Handle.
    #[must_use = "futures do nothing unless polled"]
    pub struct OnSignals<'a, H: AsHandleRef> {
        // The handle whose signals are being waited on.
        handle: H,
        // The set of signals this future waits for.
        signals: zx::Signals,
        // Port registration owning the `OnSignalsReceiver`; projected as pinned.
        #[pin]
        registration: RawReceiverRegistration<OnSignalsReceiver>,
        // Marker tying the future to the lifetime `'a` without storing a reference.
        phantom: PhantomData<&'a H>,
    }

    impl<'a, H: AsHandleRef> PinnedDrop for OnSignals<'a, H> {
        fn drop(mut this: Pin<&mut Self>) {
            // Tear down the registration (cancelling any still-pending port wait) on drop.
            this.unregister();
        }
    }
}
73
74impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
75    /// Creates a new `OnSignals` object which will receive notifications when
76    /// any signals in `signals` occur on `handle`.
77    pub fn new(handle: H, signals: zx::Signals) -> Self {
78        // We don't register for the signals until first polled.  When we are first polled, we'll
79        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
80        // register for an asynchronous notification via the port.
81        //
82        // We could change the code to register for the asynchronous notification here, but then
83        // when first polled, if the notification hasn't arrived, we'll still check to see if the
84        // signals are set (see below for the reason why).  Given that the time between construction
85        // and when we first poll is typically small, registering here probably won't make much
86        // difference (and on a single-threaded executor, a notification is unlikely to be processed
87        // before the first poll anyway).  The way we have it now means we don't have to register at
88        // all if the signals are already set, which will be a win some of the time.
89        OnSignals {
90            handle,
91            signals,
92            registration: RawReceiverRegistration::new(OnSignalsReceiver {
93                maybe_signals: AtomicUsize::new(0),
94                task: AtomicWaker::new(),
95            }),
96            phantom: PhantomData,
97        }
98    }
99
100    /// Takes the handle.
101    pub fn take_handle(mut self: Pin<&mut Self>) -> H
102    where
103        H: zx::HandleBased,
104    {
105        if self.registration.is_registered() {
106            self.as_mut().unregister();
107        }
108        mem::replace(self.project().handle, zx::NullableHandle::invalid().into())
109    }
110
111    fn register(
112        mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113        handle: &H,
114        signals: zx::Signals,
115        cx: Option<&mut Context<'_>>,
116    ) -> Result<(), zx::Status> {
117        registration.as_mut().register(EHandle::local());
118
119        // If a context has been supplied, we must register it now before calling
120        // `wait_async_handle` below to avoid races.
121        if let Some(cx) = cx {
122            registration.receiver().task.register(cx.waker());
123        }
124
125        handle.as_handle_ref().wait_async(
126            registration.port().unwrap(),
127            registration.key().unwrap(),
128            signals,
129            zx::WaitAsyncOpts::empty(),
130        )?;
131
132        Ok(())
133    }
134
135    fn unregister(self: Pin<&mut Self>) {
136        let mut this = self.project();
137        if let Some((ehandle, key)) = this.registration.as_mut().unregister()
138            && this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0
139        {
140            // Ignore the error from zx_port_cancel, because it might just be a race condition.
141            // If the packet is handled between the above maybe_signals check and the port
142            // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
143            let _ = ehandle.port().cancel(key);
144        }
145    }
146}
147
148impl<H: AsHandleRef> Future for OnSignals<'_, H> {
149    type Output = Result<zx::Signals, zx::Status>;
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        if !self.registration.is_registered() {
152            match self
153                .handle
154                .as_handle_ref()
155                .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
156                .to_result()
157            {
158                Ok(signals) => Poll::Ready(Ok(signals)),
159                Err(zx::Status::TIMED_OUT) => {
160                    let mut this = self.project();
161                    Self::register(
162                        this.registration.as_mut(),
163                        this.handle,
164                        *this.signals,
165                        Some(cx),
166                    )?;
167                    Poll::Pending
168                }
169                Err(e) => Poll::Ready(Err(e)),
170            }
171        } else {
172            match self.registration.receiver().get_signals(cx) {
173                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
174                Poll::Pending => {
175                    // We haven't received a notification for the signals, but we still want to poll
176                    // the kernel in case the notification hasn't been processed yet by the
177                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
178                    // in some shutdown paths, it wants to drain and process all messages in
179                    // channels before it closes them.  There is no other reliable way to flush a
180                    // pending notification (particularly on a multi-threaded executor).  This will
181                    // incur a small performance penalty in the case that this future has been
182                    // polled when no notification was actually received (such as can be the case
183                    // with some futures combinators).
184                    match self
185                        .handle
186                        .as_handle_ref()
187                        .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
188                        .to_result()
189                    {
190                        Ok(signals) => Poll::Ready(Ok(signals)),
191                        Err(_) => Poll::Pending,
192                    }
193                }
194            }
195        }
196    }
197}
198
199impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201        write!(f, "OnSignals")
202    }
203}
204
205impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
206    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
207        self.handle.as_handle_ref()
208    }
209}
210
211impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
212    fn as_ref(&self) -> &H {
213        &self.handle
214    }
215}
216
/// Alias for the common case where [`OnSignals`] is used with [`zx::HandleRef`].
pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
219
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{FutureExt, pending};
    use std::pin::pin;
    use std::task::Waker;

    /// The future stays pending until the event is signaled, and the registered waker is woken
    /// exactly once when the notification is delivered.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Runs the executor until it stalls so that any queued port packets get processed.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = pin!(OnSignals::new(&event, zx::Signals::EVENT_SIGNALED));
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Check that `signals` is still pending before the event has been signaled
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // signal the event and check that `signals` has been woken up and is
        // no longer pending
        event.signal(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping a registered future cancels its port wait, so a second cancel with the same
    /// key finds nothing.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let key = {
                let mut signals = pin!(OnSignals::new(&event, zx::Signals::EVENT_SIGNALED));
                assert_eq!(futures::poll!(&mut signals), Poll::Pending);
                signals.registration.key().unwrap()
            };

            // `assert_eq!` reports the actual result if the cancel unexpectedly succeeds or
            // fails with a different status.
            assert_eq!(ehandle.port().cancel(key), Err(zx::Status::NOT_FOUND));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future still observes signals when polled directly, before the executor
    /// has processed the port notification.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // Poll the future directly which guarantees the port notification for the write hasn't
        // arrived.
        assert_matches!(
            fut.poll(&mut Context::from_waker(Waker::noop())),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// The handle can be taken back out of a completed future and is still usable.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        fut.take_handle().read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}