fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::AsHandleRef;
15
16struct OnSignalsReceiver {
17 maybe_signals: AtomicUsize,
18 task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24 if signals == 0 {
25 self.task.register(cx.waker());
27 signals = self.maybe_signals.load(Ordering::SeqCst);
30 }
31 if signals == 0 {
32 Poll::Pending
33 } else {
34 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35 }
36 }
37
38 fn set_signals(&self, signals: zx::Signals) {
39 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40 self.task.wake();
41 }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45 fn receive_packet(&self, packet: zx::Packet) {
46 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47 p.observed()
48 } else {
49 return;
50 };
51
52 self.set_signals(observed);
53 }
54}
55
56pin_project_lite::pin_project! {
57 #[must_use = "futures do nothing unless polled"]
59 pub struct OnSignalsFuture<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 #[pin]
63 registration: RawReceiverRegistration<OnSignalsReceiver>,
64 phantom: PhantomData<&'a H>,
65 }
66
67 impl<'a, H: AsHandleRef> PinnedDrop for OnSignalsFuture<'a, H> {
68 fn drop(mut this: Pin<&mut Self>) {
69 this.unregister();
70 }
71 }
72}
73
74impl<'a, H: AsHandleRef + 'a> OnSignalsFuture<'a, H> {
75 pub fn new(handle: H, signals: zx::Signals) -> Self {
78 OnSignalsFuture {
90 handle,
91 signals,
92 registration: RawReceiverRegistration::new(OnSignalsReceiver {
93 maybe_signals: AtomicUsize::new(0),
94 task: AtomicWaker::new(),
95 }),
96 phantom: PhantomData,
97 }
98 }
99
100 pub fn take_handle(mut self: Pin<&mut Self>) -> H
102 where
103 H: zx::HandleBased,
104 {
105 if self.registration.is_registered() {
106 self.as_mut().unregister();
107 }
108 mem::replace(self.project().handle, zx::NullableHandle::invalid().into())
109 }
110
111 fn register(
112 mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113 handle: &H,
114 signals: zx::Signals,
115 cx: Option<&mut Context<'_>>,
116 ) -> Result<(), zx::Status> {
117 registration.as_mut().register(EHandle::local());
118
119 if let Some(cx) = cx {
122 registration.receiver().task.register(cx.waker());
123 }
124
125 handle.wait_async_handle(
126 registration.port().unwrap(),
127 registration.key().unwrap(),
128 signals,
129 zx::WaitAsyncOpts::empty(),
130 )?;
131
132 Ok(())
133 }
134
135 fn unregister(self: Pin<&mut Self>) {
136 let mut this = self.project();
137 if let Some((ehandle, key)) = this.registration.as_mut().unregister()
138 && this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0
139 {
140 let _ = ehandle.port().cancel(key);
144 }
145 }
146}
147
148impl<H: AsHandleRef> Future for OnSignalsFuture<'_, H> {
149 type Output = Result<zx::Signals, zx::Status>;
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 if !self.registration.is_registered() {
152 match self
153 .handle
154 .as_handle_ref()
155 .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
156 .to_result()
157 {
158 Ok(signals) => Poll::Ready(Ok(signals)),
159 Err(zx::Status::TIMED_OUT) => {
160 let mut this = self.project();
161 Self::register(
162 this.registration.as_mut(),
163 this.handle,
164 *this.signals,
165 Some(cx),
166 )?;
167 Poll::Pending
168 }
169 Err(e) => Poll::Ready(Err(e)),
170 }
171 } else {
172 match self.registration.receiver().get_signals(cx) {
173 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
174 Poll::Pending => {
175 match self
185 .handle
186 .as_handle_ref()
187 .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
188 .to_result()
189 {
190 Ok(signals) => Poll::Ready(Ok(signals)),
191 Err(_) => Poll::Pending,
192 }
193 }
194 }
195 }
196 }
197}
198
199impl<H: AsHandleRef> fmt::Debug for OnSignalsFuture<'_, H> {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201 write!(f, "OnSignals")
202 }
203}
204
205impl<H: AsHandleRef> AsHandleRef for OnSignalsFuture<'_, H> {
206 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
207 self.handle.as_handle_ref()
208 }
209}
210
211impl<H: AsHandleRef> AsRef<H> for OnSignalsFuture<'_, H> {
212 fn as_ref(&self) -> &H {
213 &self.handle
214 }
215}
216
217#[must_use = "futures do nothing unless polled"]
219pub struct OnSignals<'a, H: AsHandleRef> {
220 future: Pin<Box<OnSignalsFuture<'a, H>>>,
221}
222
223impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
224 pub fn new(handle: H, signals: zx::Signals) -> Self {
227 Self { future: Box::pin(OnSignalsFuture::new(handle, signals)) }
228 }
229
230 pub fn take_handle(mut self) -> H
232 where
233 H: zx::HandleBased,
234 {
235 self.future.as_mut().take_handle()
236 }
237}
238
239impl<H: AsHandleRef> Future for OnSignals<'_, H> {
240 type Output = Result<zx::Signals, zx::Status>;
241 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242 Pin::into_inner(self).future.as_mut().poll(cx)
243 }
244}
245
246impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 write!(f, "OnSignals")
249 }
250}
251
252impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
253 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
254 self.future.as_handle_ref()
255 }
256}
257
258impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
259 fn as_ref(&self) -> &H {
260 &self.future.handle
261 }
262}
263
264pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
266
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{FutureExt, pending};
    use futures::task::{ArcWake, waker};
    use std::pin::pin;
    use std::sync::Arc;

    /// The future stays pending, and the waker untouched, until the event is signaled; it then
    /// wakes exactly once and resolves with the signal.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Runs the executor until it stalls so that any queued packets are delivered.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Nothing has been signaled yet, so the future is pending and must not wake the task.
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // Signaling the event wakes the task once and makes the future ready.
        event.signal(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping a registered future cancels its wait: cancelling the same key again afterwards
    /// reports `NOT_FOUND`.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.future.registration.key().unwrap();

            drop(signals);
            // `assert_eq!` prints the actual result if the expectation does not hold.
            assert_eq!(ehandle.port().cancel(key), Err(zx::Status::NOT_FOUND));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future that is polled directly, without running the executor, still
    /// reports readiness because `poll` queries the handle itself.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing; the poll below does not rely on being woken.
        struct Waker;
        impl ArcWake for Waker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll by hand instead of through the executor.
        assert_matches!(
            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// After the future has resolved, `take_handle` returns a handle that is still usable.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        fut.take_handle().read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}