1use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::LazyLock;
7
8const FUTEX_WAITING: i32 = 0;
12const FUTEX_NOTIFIED: i32 = 1;
16const FUTEX_INTERRUPTED: i32 = 2;
20const FUTEX_USE_PORT: i32 = 3;
24
25const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
35#[derive(Debug)]
46pub struct PortEvent {
47 futex: zx::Futex,
53 port: LazyLock<zx::Port>,
59 has_pending_user_packet: AtomicBool,
62}
63
64#[derive(Clone, Copy, Debug, Eq, PartialEq)]
66pub enum NotifyKind {
67 Regular,
68 Interrupt,
69}
70
71#[derive(Debug, Eq, PartialEq)]
73pub enum PortWaitResult {
74 Signal { key: u64, observed: zx::Signals },
76 Notification { kind: NotifyKind },
78 TimedOut,
80}
81
82impl PortWaitResult {
83 const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
84 const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
85}
86
87impl PortEvent {
88 pub fn new() -> Self {
90 Self {
91 futex: zx::Futex::new(FUTEX_WAITING),
92 port: LazyLock::new(zx::Port::create),
93 has_pending_user_packet: Default::default(),
94 }
95 }
96
97 pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99 let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100 loop {
101 match state {
102 FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103 Ok(()) | Err(zx::Status::BAD_STATE) => {
104 state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105 }
106 Err(zx::Status::TIMED_OUT) => {
107 return PortWaitResult::TimedOut;
108 }
109 Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110 },
111 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112 match self.futex.compare_exchange(
113 state,
114 FUTEX_WAITING,
115 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117 ) {
118 Ok(new_state) => {
119 debug_assert_eq!(new_state, state);
120 return if new_state == FUTEX_INTERRUPTED {
121 PortWaitResult::NOTIFY_INTERRUPT
122 } else {
123 PortWaitResult::NOTIFY_REGULAR
124 };
125 }
126 Err(new_state) => {
127 debug_assert_ne!(new_state, state);
128 state = new_state;
129 }
130 }
131 }
132 FUTEX_USE_PORT => {
133 break;
134 }
135 state => unreachable!("unexpected value = {state}"),
136 }
137 }
138
139 match self.port.wait(deadline) {
140 Ok(packet) => match packet.status() {
141 zx::sys::ZX_OK => {
142 match packet.contents() {
143 zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144 key: packet.key(),
145 observed: sigpkt.observed(),
146 },
147 zx::PacketContents::User(_) => {
148 assert!(self
163 .has_pending_user_packet
164 .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE));
165
166 PortWaitResult::NOTIFY_REGULAR
167 }
168 _contents => panic!("unexpected packet = {:?}", packet),
169 }
170 }
171 zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
172 status => {
173 panic!("Unexpected status in port wait {}", status);
174 }
175 },
176 Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
177 Err(e) => panic!("Unexpected error from port_wait: {e}"),
178 }
179 }
180
181 pub fn object_wait_async(
183 &self,
184 handle: &dyn zx::AsHandleRef,
185 key: u64,
186 signals: zx::Signals,
187 opts: zx::WaitAsyncOpts,
188 ) -> Result<(), zx::Status> {
189 match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
190 FUTEX_WAITING => {
191 self.futex.wake_all();
192 }
193 state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
194 self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
195 NotifyKind::Interrupt
196 } else {
197 NotifyKind::Regular
198 })
199 }
200 FUTEX_USE_PORT => {}
201 v => unreachable!("unexpected value = {v}"),
202 }
203
204 handle.wait_async_handle(&self.port, key, signals, opts)
205 }
206
207 pub fn cancel(&self, handle: &zx::HandleRef<'_>, key: u64) {
209 let _: Result<(), zx::Status> = self.port.cancel(handle, key);
210 }
211
212 fn queue_user_packet_data(&self, kind: NotifyKind) {
218 let status = match kind {
219 NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
220 NotifyKind::Regular => {
221 if self
222 .has_pending_user_packet
223 .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
224 {
225 return;
226 }
227
228 zx::sys::ZX_OK
229 }
230 };
231
232 let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
233 self.port.queue(&packet).unwrap()
234 }
235
236 pub fn notify(&self, kind: NotifyKind) {
239 let futex_val = match kind {
240 NotifyKind::Interrupt => FUTEX_INTERRUPTED,
241 NotifyKind::Regular => FUTEX_NOTIFIED,
242 };
243
244 match self.futex.compare_exchange(
245 FUTEX_WAITING,
246 futex_val,
247 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
248 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249 ) {
250 Ok(observed) => {
251 debug_assert_eq!(observed, FUTEX_WAITING);
252 self.futex.wake_all();
253 }
254 Err(observed) => match observed {
255 FUTEX_WAITING => unreachable!("this should have passed"),
256 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
257 FUTEX_USE_PORT => {
258 self.queue_user_packet_data(kind);
259 }
260 observed => unreachable!("unexpected value = {observed}"),
261 },
262 }
263 }
264}
265
266#[cfg(test)]
267mod test {
268 use std::sync::Arc;
269
270 use test_case::test_case;
271 use zx::AsHandleRef as _;
272
273 use super::*;
274
275 #[test]
276 fn test_signal_and_wait_block() {
277 const KEY: u64 = 1;
278 const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;
279
280 let event = Arc::new(PortEvent::new());
281 let object = zx::Event::create();
282 assert_eq!(
283 event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
284 Ok(())
285 );
286
287 let event_clone = event.clone();
288 let thread = std::thread::spawn(move || {
289 assert_eq!(
290 event_clone.wait(zx::MonotonicInstant::INFINITE),
291 PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL }
292 );
293 });
294
295 object
296 .signal_handle(
297 zx::Signals::NONE,
298 ASSERTED_SIGNAL,
299 )
300 .unwrap();
301 thread.join().expect("join thread");
302 }
303
304 #[test]
305 fn test_signal_then_wait_nonblock() {
306 const KEY: u64 = 2;
307 const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;
308
309 let event = PortEvent::new();
310 let object = zx::Event::create();
311 assert_eq!(
312 event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
313 Ok(())
314 );
315 object
316 .signal_handle(
317 zx::Signals::NONE,
318 ASSERTED_SIGNAL,
319 )
320 .unwrap();
321
322 assert_eq!(
323 event.wait(zx::MonotonicInstant::INFINITE_PAST),
324 PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL }
325 );
326 }
327
328 #[test]
329 fn test_signal_then_cancel_then_wait() {
330 const KEY: u64 = 3;
331 const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;
332
333 let event = PortEvent::new();
334 let object = zx::Event::create();
335
336 assert_eq!(
337 event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
338 Ok(())
339 );
340 object
341 .signal_handle(
342 zx::Signals::NONE,
343 ASSERTED_SIGNAL,
344 )
345 .unwrap();
346
347 event.cancel(&object.as_handle_ref(), KEY);
348 assert_eq!(event.wait(zx::MonotonicInstant::INFINITE_PAST), PortWaitResult::TimedOut);
349 }
350
351 #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
352 #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
353 #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
354 #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
355 fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
356 const KEY: u64 = 4;
357
358 let event = Arc::new(PortEvent::new());
359 let object = zx::Event::create();
360 if with_object {
361 assert_eq!(
362 event.object_wait_async(
363 &object,
364 KEY,
365 zx::Signals::USER_3,
366 zx::WaitAsyncOpts::empty()
367 ),
368 Ok(())
369 );
370 }
371
372 let event_clone = event.clone();
373 let thread = std::thread::spawn(move || {
374 assert_eq!(
375 event_clone.wait(zx::MonotonicInstant::INFINITE),
376 PortWaitResult::Notification { kind }
377 );
378 });
379
380 event.notify(kind);
381 thread.join().expect("join thread");
382 }
383
384 #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
385 #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
386 #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
387 #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
388 fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
389 const KEY: u64 = 5;
390
391 let event = PortEvent::new();
392 let object = zx::Event::create();
393 if with_object {
394 assert_eq!(
395 event.object_wait_async(
396 &object,
397 KEY,
398 zx::Signals::USER_4,
399 zx::WaitAsyncOpts::empty()
400 ),
401 Ok(())
402 );
403 }
404
405 event.notify(kind);
406 assert_eq!(
407 event.wait(zx::MonotonicInstant::INFINITE_PAST),
408 PortWaitResult::Notification { kind }
409 );
410 }
411
412 #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
413 #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
414 #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
415 #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
416 fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
417 const KEY: u64 = 6;
418 const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;
419
420 let event = PortEvent::new();
421 let object = zx::Event::create();
422
423 if with_object {
424 assert_eq!(
425 event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
426 Ok(())
427 );
428 }
429 assert_eq!(event.wait(deadline), PortWaitResult::TimedOut);
430 }
431}