1use std::sync::LazyLock;
6use std::sync::atomic::{AtomicBool, Ordering};
7
// Values stored in `PortEvent::futex`. Together they form a small state machine that is driven
// by `PortEvent::wait`, `PortEvent::notify` and `PortEvent::object_wait_async`.

/// No notification is pending and the port is not in use.
///
/// This is the value the futex is created with, the value `wait` blocks on, and the value `wait`
/// restores after it has consumed a notification.
const FUTEX_WAITING: i32 = 0;

/// A `NotifyKind::Regular` notification was stored by `notify` and has not been consumed yet.
const FUTEX_NOTIFIED: i32 = 1;

/// A `NotifyKind::Interrupt` notification was stored by `notify` and has not been consumed yet.
const FUTEX_INTERRUPTED: i32 = 2;

/// The futex is no longer used for waiting; waiters block on the `zx::Port` instead.
///
/// Stored by `object_wait_async`. Nothing in this file moves the futex out of this state again.
const FUTEX_USE_PORT: i32 = 3;

/// Memory ordering used for every atomic access shared between the notifying side and the
/// waiting side (the futex word and `has_pending_user_packet`).
///
/// NOTE(review): `Relaxed` implies that `PortEvent` itself does not order other memory accesses
/// between notifier and waiter; presumably callers synchronize their own data -- confirm.
const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
35#[derive(Debug)]
46pub struct PortEvent {
47 futex: zx::Futex,
53 port: LazyLock<zx::Port>,
59 has_pending_user_packet: AtomicBool,
62}
63
/// The flavor of a notification delivered through `PortEvent::notify`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyKind {
    /// An ordinary wake-up.
    Regular,
    /// A wake-up that is reported to the waiter as an interruption.
    Interrupt,
}
70
71#[derive(Debug, Eq, PartialEq)]
73pub enum PortWaitResult {
74 Signal { key: u64, observed: zx::Signals },
76 Notification { kind: NotifyKind },
78 TimedOut,
80}
81
82impl PortWaitResult {
83 const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
84 const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
85}
86
87impl PortEvent {
88 pub fn new() -> Self {
90 Self {
91 futex: zx::Futex::new(FUTEX_WAITING),
92 port: LazyLock::new(zx::Port::create),
93 has_pending_user_packet: Default::default(),
94 }
95 }
96
97 pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99 let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100 loop {
101 match state {
102 FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103 Ok(()) | Err(zx::Status::BAD_STATE) => {
104 state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105 }
106 Err(zx::Status::TIMED_OUT) => {
107 return PortWaitResult::TimedOut;
108 }
109 Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110 },
111 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112 match self.futex.compare_exchange(
113 state,
114 FUTEX_WAITING,
115 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117 ) {
118 Ok(new_state) => {
119 debug_assert_eq!(new_state, state);
120 return if new_state == FUTEX_INTERRUPTED {
121 PortWaitResult::NOTIFY_INTERRUPT
122 } else {
123 PortWaitResult::NOTIFY_REGULAR
124 };
125 }
126 Err(new_state) => {
127 debug_assert_ne!(new_state, state);
128 state = new_state;
129 }
130 }
131 }
132 FUTEX_USE_PORT => {
133 break;
134 }
135 state => unreachable!("unexpected value = {state}"),
136 }
137 }
138
139 match self.port.wait(deadline) {
140 Ok(packet) => match packet.status() {
141 zx::sys::ZX_OK => {
142 match packet.contents() {
143 zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144 key: packet.key(),
145 observed: sigpkt.observed(),
146 },
147 zx::PacketContents::User(_) => {
148 assert!(
163 self.has_pending_user_packet
164 .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
165 );
166
167 PortWaitResult::NOTIFY_REGULAR
168 }
169 _contents => panic!("unexpected packet = {:?}", packet),
170 }
171 }
172 zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
173 status => {
174 panic!("Unexpected status in port wait {}", status);
175 }
176 },
177 Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
178 Err(e) => panic!("Unexpected error from port_wait: {e}"),
179 }
180 }
181
182 pub fn object_wait_async(
184 &self,
185 handle: &dyn zx::AsHandleRef,
186 key: u64,
187 signals: zx::Signals,
188 opts: zx::WaitAsyncOpts,
189 ) -> Result<(), zx::Status> {
190 match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
191 FUTEX_WAITING => {
192 self.futex.wake_all();
193 }
194 state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
195 self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
196 NotifyKind::Interrupt
197 } else {
198 NotifyKind::Regular
199 })
200 }
201 FUTEX_USE_PORT => {}
202 v => unreachable!("unexpected value = {v}"),
203 }
204
205 handle.wait_async_handle(&self.port, key, signals, opts)
206 }
207
208 pub fn cancel(&self, key: u64) {
210 let _: Result<(), zx::Status> = self.port.cancel(key);
211 }
212
213 fn queue_user_packet_data(&self, kind: NotifyKind) {
219 let status = match kind {
220 NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
221 NotifyKind::Regular => {
222 if self
223 .has_pending_user_packet
224 .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
225 {
226 return;
227 }
228
229 zx::sys::ZX_OK
230 }
231 };
232
233 let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
234 self.port.queue(&packet).unwrap()
235 }
236
237 pub fn notify(&self, kind: NotifyKind) {
240 let futex_val = match kind {
241 NotifyKind::Interrupt => FUTEX_INTERRUPTED,
242 NotifyKind::Regular => FUTEX_NOTIFIED,
243 };
244
245 match self.futex.compare_exchange(
246 FUTEX_WAITING,
247 futex_val,
248 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
250 ) {
251 Ok(observed) => {
252 debug_assert_eq!(observed, FUTEX_WAITING);
253 self.futex.wake_all();
254 }
255 Err(observed) => match observed {
256 FUTEX_WAITING => unreachable!("this should have passed"),
257 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
258 FUTEX_USE_PORT => {
259 self.queue_user_packet_data(kind);
260 }
261 observed => unreachable!("unexpected value = {observed}"),
262 },
263 }
264 }
265}
266
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use test_case::test_case;
    use zx::AsHandleRef as _;

    use super::*;

    /// Registers `object` with `event` under `key` for `signals` and asserts that this succeeds.
    fn watch(event: &PortEvent, object: &zx::Event, key: u64, signals: zx::Signals) {
        assert_eq!(
            event.object_wait_async(object, key, signals, zx::WaitAsyncOpts::empty()),
            Ok(())
        );
    }

    /// Passes `signals` as the second argument of `signal_handle`, with `NONE` as the first.
    fn raise(object: &zx::Event, signals: zx::Signals) {
        object.signal_handle(zx::Signals::NONE, signals).unwrap();
    }

    /// A waiter that is already blocked observes a signal asserted afterwards.
    #[test]
    fn test_signal_and_wait_block() {
        const KEY: u64 = 1;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        watch(&event, &object, KEY, ASSERTED_SIGNAL);

        let waiter_event = Arc::clone(&event);
        let waiter = std::thread::spawn(move || {
            let result = waiter_event.wait(zx::MonotonicInstant::INFINITE);
            assert_eq!(result, PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL });
        });

        raise(&object, ASSERTED_SIGNAL);
        waiter.join().expect("join thread");
    }

    /// A signal asserted before the wait is returned by a non-blocking wait.
    #[test]
    fn test_signal_then_wait_nonblock() {
        const KEY: u64 = 2;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;

        let event = PortEvent::new();
        let object = zx::Event::create();
        watch(&event, &object, KEY, ASSERTED_SIGNAL);
        raise(&object, ASSERTED_SIGNAL);

        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL });
    }

    /// Cancelling the key after the signal fired leaves nothing for the waiter to receive.
    #[test]
    fn test_signal_then_cancel_then_wait() {
        const KEY: u64 = 3;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;

        let event = PortEvent::new();
        let object = zx::Event::create();

        watch(&event, &object, KEY, ASSERTED_SIGNAL);
        raise(&object, ASSERTED_SIGNAL);

        event.cancel(KEY);
        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::TimedOut);
    }

    /// A blocked waiter is woken by `notify`, with and without a registered object.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 4;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        if with_object {
            watch(&event, &object, KEY, zx::Signals::USER_3);
        }

        let waiter_event = Arc::clone(&event);
        let waiter = std::thread::spawn(move || {
            let result = waiter_event.wait(zx::MonotonicInstant::INFINITE);
            assert_eq!(result, PortWaitResult::Notification { kind });
        });

        event.notify(kind);
        waiter.join().expect("join thread");
    }

    /// A notification delivered before the wait is returned by a non-blocking wait.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 5;

        let event = PortEvent::new();
        let object = zx::Event::create();
        if with_object {
            watch(&event, &object, KEY, zx::Signals::USER_4);
        }

        event.notify(kind);
        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::Notification { kind });
    }

    /// With nothing signaled or notified, the wait times out in futex mode and in port mode.
    #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
    #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
    #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
    #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
    fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
        const KEY: u64 = 6;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;

        let event = PortEvent::new();
        let object = zx::Event::create();

        if with_object {
            watch(&event, &object, KEY, ASSERTED_SIGNAL);
        }

        let result = event.wait(deadline);
        assert_eq!(result, PortWaitResult::TimedOut);
    }
}