1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::{fmt, mem};
1213use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
1617struct OnSignalsReceiver {
18 maybe_signals: AtomicUsize,
19 task: AtomicWaker,
20}
2122impl OnSignalsReceiver {
23fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25if signals == 0 {
26// No signals were received-- register to receive a wakeup when they arrive.
27self.task.register(cx.waker());
28// Check again for signals after registering for a wakeup in case signals
29 // arrived between registering and the initial load of signals
30signals = self.maybe_signals.load(Ordering::SeqCst);
31 }
32if signals == 0 {
33 Poll::Pending
34 } else {
35 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36 }
37 }
3839fn set_signals(&self, signals: zx::Signals) {
40self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41self.task.wake();
42 }
43}
4445impl PacketReceiver for OnSignalsReceiver {
46fn receive_packet(&self, packet: zx::Packet) {
47let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48 p.observed()
49 } else {
50return;
51 };
5253self.set_signals(observed);
54 }
55}
5657/// A future that completes when some set of signals become available on a Handle.
58#[must_use = "futures do nothing unless polled"]
59pub struct OnSignals<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
63 phantom: PhantomData<&'a H>,
64}
6566/// Alias for the common case where OnSignals is used with zx::HandleRef.
67pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
6869impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70/// Creates a new `OnSignals` object which will receive notifications when
71 /// any signals in `signals` occur on `handle`.
72pub fn new(handle: H, signals: zx::Signals) -> Self {
73// We don't register for the signals until first polled. When we are first polled, we'll
74 // check to see if the signals are set and if they are, we're done. If they aren't, we then
75 // register for an asynchronous notification via the port.
76 //
77 // We could change the code to register for the asynchronous notification here, but then
78 // when first polled, if the notification hasn't arrived, we'll still check to see if the
79 // signals are set (see below for the reason why). Given that the time between construction
80 // and when we first poll is typically small, registering here probably won't make much
81 // difference (and on a single-threaded executor, a notification is unlikely to be processed
82 // before the first poll anyway). The way we have it now means we don't have to register at
83 // all if the signals are already set, which will be a win some of the time.
84OnSignals { handle, signals, registration: None, phantom: PhantomData }
85 }
8687/// Takes the handle.
88pub fn take_handle(mut self) -> H
89where
90H: zx::HandleBased,
91 {
92self.unregister();
93 std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94 }
9596/// This function allows the `OnSignals` object to live for the `'static` lifetime, at the cost
97 /// of disabling automatic cleanup of the port wait.
98 ///
99 /// WARNING: Do not use unless you can guarantee that either:
100 /// - The future is not dropped before it completes, or
101 /// - The handle is dropped without creating additional OnSignals futures for it.
102 ///
103 /// Creating an OnSignals calls zx_object_wait_async, which consumes a small amount of kernel
104 /// resources. Dropping the OnSignals calls zx_port_cancel to clean up. But calling
105 /// extend_lifetime disables this cleanup, since the zx_port_wait call requires a reference to
106 /// the handle. The port registration can also be cleaned up by closing the handle or by
107 /// waiting for the signal to be triggered. But if neither of these happens, the registration
108 /// is leaked. This wastes kernel memory and the kernel will eventually kill your process to
109 /// force a cleanup.
110 ///
111 /// Note that `OnSignals` will not fire if the handle that was used to create it is dropped or
112 /// transferred to another process.
113// TODO(https://fxbug.dev/42182035): Try to remove this footgun.
114pub fn extend_lifetime(mut self) -> LeakedOnSignals {
115match self.registration.take() {
116Some(r) => LeakedOnSignals { registration: Ok(r) },
117None => LeakedOnSignals { registration: self.register(None) },
118 }
119 }
120121fn register(
122&self,
123 cx: Option<&mut Context<'_>>,
124 ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
125let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
126 maybe_signals: AtomicUsize::new(0),
127 task: AtomicWaker::new(),
128 }));
129130// If a context has been supplied, we must register it now before calling
131 // `wait_async_handle` below to avoid races.
132if let Some(cx) = cx {
133 registration.task.register(cx.waker());
134 }
135136self.handle.wait_async_handle(
137 registration.port(),
138 registration.key(),
139self.signals,
140 zx::WaitAsyncOpts::empty(),
141 )?;
142143Ok(registration)
144 }
145146fn unregister(&mut self) {
147if let Some(registration) = self.registration.take() {
148if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
149// Ignore the error from zx_port_cancel, because it might just be a race condition.
150 // If the packet is handled between the above maybe_signals check and the port
151 // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
152let _ = registration.port().cancel(&self.handle, registration.key());
153 }
154 }
155 }
156}
157158impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
159type Output = Result<zx::Signals, zx::Status>;
160fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161match &self.registration {
162None => {
163match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST) {
164Ok(signals) => Poll::Ready(Ok(signals)),
165Err(zx::Status::TIMED_OUT) => {
166let registration = self.register(Some(cx))?;
167self.get_mut().registration = Some(registration);
168 Poll::Pending
169 }
170Err(e) => Poll::Ready(Err(e)),
171 }
172 }
173Some(r) => match r.receiver().get_signals(cx) {
174 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
175 Poll::Pending => {
176// We haven't received a notification for the signals, but we still want to poll
177 // the kernel in case the notification hasn't been processed yet by the
178 // executor. This behaviour is relied upon in some cases: in Component Manager,
179 // in some shutdown paths, it wants to drain and process all messages in
180 // channels before it closes them. There is no other reliable way to flush a
181 // pending notification (particularly on a multi-threaded executor). This will
182 // incur a small performance penalty in the case that this future has been
183 // polled when no notification was actually received (such as can be the case
184 // with some futures combinators).
185match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
186 {
187Ok(signals) => Poll::Ready(Ok(signals)),
188Err(_) => Poll::Pending,
189 }
190 }
191 },
192 }
193 }
194}
195196impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
197fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198write!(f, "OnSignals")
199 }
200}
201202impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
203fn drop(&mut self) {
204self.unregister();
205 }
206}
207208impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
209fn as_handle_ref(&self) -> zx::HandleRef<'_> {
210self.handle.as_handle_ref()
211 }
212}
213214impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
215fn as_ref(&self) -> &H {
216&self.handle
217 }
218}
219220pub struct LeakedOnSignals {
221 registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
222}
223224impl Future for LeakedOnSignals {
225type Output = Result<zx::Signals, zx::Status>;
226fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227let reg = self.registration.as_mut().map_err(|e| mem::replace(e, zx::Status::OK))?;
228 reg.receiver().get_signals(cx).map(Ok)
229 }
230}
231232#[cfg(test)]
233mod test {
234use super::*;
235use crate::TestExecutor;
236use assert_matches::assert_matches;
237use futures::future::{pending, FutureExt};
238use futures::task::{waker, ArcWake};
239use std::pin::pin;
240241#[test]
242fn wait_for_event() -> Result<(), zx::Status> {
243let mut exec = crate::TestExecutor::new();
244let mut deliver_events =
245 || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
246247let event = zx::Event::create();
248let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
249let (waker, waker_count) = futures_test::task::new_count_waker();
250let cx = &mut std::task::Context::from_waker(&waker);
251252// Check that `signals` is still pending before the event has been signaled
253assert_eq!(signals.poll_unpin(cx), Poll::Pending);
254 deliver_events();
255assert_eq!(waker_count, 0);
256assert_eq!(signals.poll_unpin(cx), Poll::Pending);
257258// signal the event and check that `signals` has been woken up and is
259 // no longer pending
260event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
261 deliver_events();
262assert_eq!(waker_count, 1);
263assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));
264265Ok(())
266 }
267268#[test]
269fn drop_before_event() {
270let mut fut = std::pin::pin!(async {
271let ehandle = EHandle::local();
272273let event = zx::Event::create();
274let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
275assert_eq!(futures::poll!(&mut signals), Poll::Pending);
276let key = signals.registration.as_ref().unwrap().key();
277278 std::mem::drop(signals);
279assert!(ehandle.port().cancel(&event, key) == Err(zx::Status::NOT_FOUND));
280281// try again but with extend_lifetime
282let signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
283let key = signals.registration.as_ref().unwrap().key();
284 std::mem::drop(signals);
285assert!(ehandle.port().cancel(&event, key) == Ok(()));
286 });
287288assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
289 }
290291#[test]
292fn test_always_polls() {
293let mut exec = TestExecutor::new();
294295let (rx, tx) = zx::Channel::create();
296297let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));
298299assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
300301 tx.write(b"hello", &mut []).expect("write failed");
302303struct Waker;
304impl ArcWake for Waker {
305fn wake_by_ref(_arc_self: &Arc<Self>) {}
306 }
307308// Poll the future directly which guarantees the port notification for the write hasn't
309 // arrived.
310assert_matches!(
311 fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
312 Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
313 );
314 }
315316#[test]
317fn test_take_handle() {
318let mut exec = TestExecutor::new();
319320let (rx, tx) = zx::Channel::create();
321322let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);
323324assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
325326 tx.write(b"hello", &mut []).expect("write failed");
327328assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));
329330let mut message = zx::MessageBuf::new();
331 fut.take_handle().read(&mut message).unwrap();
332333assert_eq!(message.bytes(), b"hello");
334 }
335}