fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::AsHandleRef;
15
16struct OnSignalsReceiver {
17 maybe_signals: AtomicUsize,
18 task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24 if signals == 0 {
25 self.task.register(cx.waker());
27 signals = self.maybe_signals.load(Ordering::SeqCst);
30 }
31 if signals == 0 {
32 Poll::Pending
33 } else {
34 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35 }
36 }
37
38 fn set_signals(&self, signals: zx::Signals) {
39 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40 self.task.wake();
41 }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45 fn receive_packet(&self, packet: zx::Packet) {
46 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47 p.observed()
48 } else {
49 return;
50 };
51
52 self.set_signals(observed);
53 }
54}
55
56pin_project_lite::pin_project! {
57 #[must_use = "futures do nothing unless polled"]
59 pub struct OnSignals<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 #[pin]
63 registration: RawReceiverRegistration<OnSignalsReceiver>,
64 phantom: PhantomData<&'a H>,
65 }
66
67 impl<'a, H: AsHandleRef> PinnedDrop for OnSignals<'a, H> {
68 fn drop(mut this: Pin<&mut Self>) {
69 this.unregister();
70 }
71 }
72}
73
74impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
75 pub fn new(handle: H, signals: zx::Signals) -> Self {
78 OnSignals {
90 handle,
91 signals,
92 registration: RawReceiverRegistration::new(OnSignalsReceiver {
93 maybe_signals: AtomicUsize::new(0),
94 task: AtomicWaker::new(),
95 }),
96 phantom: PhantomData,
97 }
98 }
99
100 pub fn take_handle(mut self: Pin<&mut Self>) -> H
102 where
103 H: zx::HandleBased,
104 {
105 if self.registration.is_registered() {
106 self.as_mut().unregister();
107 }
108 mem::replace(self.project().handle, zx::NullableHandle::invalid().into())
109 }
110
111 fn register(
112 mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113 handle: &H,
114 signals: zx::Signals,
115 cx: Option<&mut Context<'_>>,
116 ) -> Result<(), zx::Status> {
117 registration.as_mut().register(EHandle::local());
118
119 if let Some(cx) = cx {
122 registration.receiver().task.register(cx.waker());
123 }
124
125 handle.as_handle_ref().wait_async(
126 registration.port().unwrap(),
127 registration.key().unwrap(),
128 signals,
129 zx::WaitAsyncOpts::empty(),
130 )?;
131
132 Ok(())
133 }
134
135 fn unregister(self: Pin<&mut Self>) {
136 let mut this = self.project();
137 if let Some((ehandle, key)) = this.registration.as_mut().unregister()
138 && this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0
139 {
140 let _ = ehandle.port().cancel(key);
144 }
145 }
146}
147
148impl<H: AsHandleRef> Future for OnSignals<'_, H> {
149 type Output = Result<zx::Signals, zx::Status>;
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 if !self.registration.is_registered() {
152 match self
153 .handle
154 .as_handle_ref()
155 .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
156 .to_result()
157 {
158 Ok(signals) => Poll::Ready(Ok(signals)),
159 Err(zx::Status::TIMED_OUT) => {
160 let mut this = self.project();
161 Self::register(
162 this.registration.as_mut(),
163 this.handle,
164 *this.signals,
165 Some(cx),
166 )?;
167 Poll::Pending
168 }
169 Err(e) => Poll::Ready(Err(e)),
170 }
171 } else {
172 match self.registration.receiver().get_signals(cx) {
173 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
174 Poll::Pending => {
175 match self
185 .handle
186 .as_handle_ref()
187 .wait_one(self.signals, zx::MonotonicInstant::INFINITE_PAST)
188 .to_result()
189 {
190 Ok(signals) => Poll::Ready(Ok(signals)),
191 Err(_) => Poll::Pending,
192 }
193 }
194 }
195 }
196 }
197}
198
199impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201 write!(f, "OnSignals")
202 }
203}
204
205impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
206 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
207 self.handle.as_handle_ref()
208 }
209}
210
211impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
212 fn as_ref(&self) -> &H {
213 &self.handle
214 }
215}
216
217pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
219
220#[cfg(test)]
221mod test {
222 use super::*;
223 use crate::TestExecutor;
224 use assert_matches::assert_matches;
225 use futures::future::{FutureExt, pending};
226 use std::pin::pin;
227 use std::task::Waker;
228
229 #[test]
230 fn wait_for_event() -> Result<(), zx::Status> {
231 let mut exec = crate::TestExecutor::new();
232 let mut deliver_events =
233 || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
234
235 let event = zx::Event::create();
236 let mut signals = pin!(OnSignals::new(&event, zx::Signals::EVENT_SIGNALED));
237 let (waker, waker_count) = futures_test::task::new_count_waker();
238 let cx = &mut std::task::Context::from_waker(&waker);
239
240 assert_eq!(signals.poll_unpin(cx), Poll::Pending);
242 deliver_events();
243 assert_eq!(waker_count, 0);
244 assert_eq!(signals.poll_unpin(cx), Poll::Pending);
245
246 event.signal(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
249 deliver_events();
250 assert_eq!(waker_count, 1);
251 assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));
252
253 Ok(())
254 }
255
256 #[test]
257 fn drop_before_event() {
258 let mut fut = std::pin::pin!(async {
259 let ehandle = EHandle::local();
260
261 let event = zx::Event::create();
262 let key = {
263 let mut signals = pin!(OnSignals::new(&event, zx::Signals::EVENT_SIGNALED));
264 assert_eq!(futures::poll!(&mut signals), Poll::Pending);
265 signals.registration.key().unwrap()
266 };
267
268 assert!(ehandle.port().cancel(key) == Err(zx::Status::NOT_FOUND));
269 });
270
271 assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
272 }
273
274 #[test]
275 fn test_always_polls() {
276 let mut exec = TestExecutor::new();
277
278 let (rx, tx) = zx::Channel::create();
279
280 let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));
281
282 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
283
284 tx.write(b"hello", &mut []).expect("write failed");
285
286 assert_matches!(
289 fut.poll(&mut Context::from_waker(Waker::noop())),
290 Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
291 );
292 }
293
294 #[test]
295 fn test_take_handle() {
296 let mut exec = TestExecutor::new();
297
298 let (rx, tx) = zx::Channel::create();
299
300 let mut fut = pin!(OnSignals::new(rx, zx::Signals::CHANNEL_READABLE));
301
302 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
303
304 tx.write(b"hello", &mut []).expect("write failed");
305
306 assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));
307
308 let mut message = zx::MessageBuf::new();
309 fut.take_handle().read(&mut message).unwrap();
310
311 assert_eq!(message.bytes(), b"hello");
312 }
313}