1use std::sync::LazyLock;
6use std::sync::atomic::{AtomicBool, Ordering};
7
/// Futex state: no notification is pending, so waiters block on the futex.
///
/// This is the value `PortEvent::new` installs and the value `PortEvent::wait` restores after it
/// consumes a pending notification.
const FUTEX_WAITING: i32 = 0;
/// Futex state: a `NotifyKind::Regular` notification is pending (stored by `PortEvent::notify`).
const FUTEX_NOTIFIED: i32 = 1;
/// Futex state: a `NotifyKind::Interrupt` notification is pending (stored by
/// `PortEvent::notify`).
const FUTEX_INTERRUPTED: i32 = 2;
/// Futex state: waiting and notifying go through the `zx::Port` instead of the futex.
///
/// Stored by `PortEvent::object_wait_async`; nothing in this file moves the state back out of it.
const FUTEX_USE_PORT: i32 = 3;

/// Memory ordering used for every atomic operation in this file, both on the futex word and on
/// `PortEvent::has_pending_user_packet`.
///
/// NOTE(review): `Relaxed` means these atomics do not publish any other memory between the
/// notifying and the waiting thread; presumably callers synchronize the data they exchange by
/// other means -- confirm before relying on it.
const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
/// An object a thread can wait on until it is notified, interrupted, times out, or observes a
/// signal registered through `object_wait_async`.
///
/// Waiting starts out on `futex`. The first call to `object_wait_async` switches the state to
/// `FUTEX_USE_PORT`, after which waits and notifications go through `port`.
#[derive(Debug)]
pub struct PortEvent {
    /// State word holding one of `FUTEX_WAITING`, `FUTEX_NOTIFIED`, `FUTEX_INTERRUPTED` or
    /// `FUTEX_USE_PORT`.
    futex: zx::Futex,
    /// Port used once the state is `FUTEX_USE_PORT`. Wrapped in a `LazyLock`, so
    /// `zx::Port::create` only runs the first time the port is dereferenced.
    port: LazyLock<zx::Port>,
    /// Set when a `ZX_OK` user packet has been queued on `port` and cleared when `wait` dequeues
    /// it, so that at most one such packet is queued at a time.
    has_pending_user_packet: AtomicBool,
}
63
/// The kind of notification delivered by `PortEvent::notify` and reported back by
/// `PortEvent::wait` in `PortWaitResult::Notification`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyKind {
    /// An ordinary wake-up; stored as `FUTEX_NOTIFIED` or queued as a `ZX_OK` user packet.
    Regular,
    /// An interruption; stored as `FUTEX_INTERRUPTED` or queued as a `ZX_ERR_CANCELED` user
    /// packet.
    Interrupt,
}
70
/// The outcome of a call to `PortEvent::wait`.
#[derive(Debug, Eq, PartialEq)]
pub enum PortWaitResult {
    /// The port delivered a signal packet: `key` is the packet's key and `observed` the signals
    /// carried by the packet.
    Signal { key: u64, observed: zx::Signals },
    /// The waiter was woken by a notification of the given `kind`.
    Notification { kind: NotifyKind },
    /// The deadline passed before any signal or notification was received.
    TimedOut,
}
81
impl PortWaitResult {
    /// Shorthand for `Notification { kind: NotifyKind::Regular }`.
    const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
    /// Shorthand for `Notification { kind: NotifyKind::Interrupt }`.
    const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
}
86
87impl PortEvent {
88 pub fn new() -> Self {
90 Self {
91 futex: zx::Futex::new(FUTEX_WAITING),
92 port: LazyLock::new(zx::Port::create),
93 has_pending_user_packet: Default::default(),
94 }
95 }
96
97 pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99 let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100 loop {
101 match state {
102 FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103 Ok(()) | Err(zx::Status::BAD_STATE) => {
104 state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105 }
106 Err(zx::Status::TIMED_OUT) => {
107 return PortWaitResult::TimedOut;
108 }
109 Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110 },
111 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112 match self.futex.compare_exchange(
113 state,
114 FUTEX_WAITING,
115 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117 ) {
118 Ok(new_state) => {
119 debug_assert_eq!(new_state, state);
120 return if new_state == FUTEX_INTERRUPTED {
121 PortWaitResult::NOTIFY_INTERRUPT
122 } else {
123 PortWaitResult::NOTIFY_REGULAR
124 };
125 }
126 Err(new_state) => {
127 debug_assert_ne!(new_state, state);
128 state = new_state;
129 }
130 }
131 }
132 FUTEX_USE_PORT => {
133 break;
134 }
135 state => unreachable!("unexpected value = {state}"),
136 }
137 }
138
139 match self.port.wait(deadline) {
140 Ok(packet) => match packet.status() {
141 zx::sys::ZX_OK => {
142 match packet.contents() {
143 zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144 key: packet.key(),
145 observed: sigpkt.observed(),
146 },
147 zx::PacketContents::User(_) => {
148 assert!(
163 self.has_pending_user_packet
164 .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
165 );
166
167 PortWaitResult::NOTIFY_REGULAR
168 }
169 _contents => panic!("unexpected packet = {:?}", packet),
170 }
171 }
172 zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
173 status => {
174 panic!("Unexpected status in port wait {}", status);
175 }
176 },
177 Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
178 Err(e) => panic!("Unexpected error from port_wait: {e}"),
179 }
180 }
181
182 pub fn object_wait_async(
184 &self,
185 handle: &dyn zx::AsHandleRef,
186 key: u64,
187 signals: zx::Signals,
188 opts: zx::WaitAsyncOpts,
189 ) -> Result<(), zx::Status> {
190 match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
191 FUTEX_WAITING => {
192 self.futex.wake_all();
193 }
194 state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
195 self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
196 NotifyKind::Interrupt
197 } else {
198 NotifyKind::Regular
199 })
200 }
201 FUTEX_USE_PORT => {}
202 v => unreachable!("unexpected value = {v}"),
203 }
204
205 handle.wait_async_handle(&self.port, key, signals, opts)
206 }
207
208 pub fn cancel(&self, key: u64) {
210 let _: Result<(), zx::Status> = self.port.cancel(key);
211 }
212
213 fn queue_user_packet_data(&self, kind: NotifyKind) {
219 let status = match kind {
220 NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
221 NotifyKind::Regular => {
222 if self
223 .has_pending_user_packet
224 .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
225 {
226 return;
227 }
228
229 zx::sys::ZX_OK
230 }
231 };
232
233 let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
234 self.port.queue(&packet).unwrap()
235 }
236
237 pub fn notify(&self, kind: NotifyKind) {
240 let futex_val = match kind {
241 NotifyKind::Interrupt => FUTEX_INTERRUPTED,
242 NotifyKind::Regular => FUTEX_NOTIFIED,
243 };
244
245 match self.futex.compare_exchange(
246 FUTEX_WAITING,
247 futex_val,
248 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249 ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
250 ) {
251 Ok(observed) => {
252 debug_assert_eq!(observed, FUTEX_WAITING);
253 self.futex.wake_all();
254 }
255 Err(observed) => match observed {
256 FUTEX_WAITING => unreachable!("this should have passed"),
257 FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
258 FUTEX_USE_PORT => {
259 self.queue_user_packet_data(kind);
260 }
261 observed => unreachable!("unexpected value = {observed}"),
262 },
263 }
264 }
265}
266
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use test_case::test_case;

    use super::*;

    /// A signal asserted on a registered object is reported by a `wait` running on another
    /// thread with an infinite deadline.
    #[test]
    fn test_signal_and_wait_block() {
        const KEY: u64 = 1;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;

        let port_event = Arc::new(PortEvent::new());
        let source = zx::Event::create();
        let registered =
            port_event.object_wait_async(&source, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty());
        assert_eq!(registered, Ok(()));

        let waiter = {
            let port_event = Arc::clone(&port_event);
            std::thread::spawn(move || {
                let outcome = port_event.wait(zx::MonotonicInstant::INFINITE);
                assert_eq!(
                    outcome,
                    PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL }
                );
            })
        };

        source.signal(zx::Signals::NONE, ASSERTED_SIGNAL).unwrap();
        waiter.join().expect("join thread");
    }

    /// A signal asserted before waiting is reported even by a wait whose deadline has already
    /// passed.
    #[test]
    fn test_signal_then_wait_nonblock() {
        const KEY: u64 = 2;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;

        let port_event = PortEvent::new();
        let source = zx::Event::create();
        let registered =
            port_event.object_wait_async(&source, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty());
        assert_eq!(registered, Ok(()));
        source.signal(zx::Signals::NONE, ASSERTED_SIGNAL).unwrap();

        let outcome = port_event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(outcome, PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL });
    }

    /// Cancelling the key after the signal was asserted leaves nothing for `wait` to report.
    #[test]
    fn test_signal_then_cancel_then_wait() {
        const KEY: u64 = 3;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;

        let port_event = PortEvent::new();
        let source = zx::Event::create();

        let registered =
            port_event.object_wait_async(&source, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty());
        assert_eq!(registered, Ok(()));
        source.signal(zx::Signals::NONE, ASSERTED_SIGNAL).unwrap();

        port_event.cancel(KEY);
        let outcome = port_event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(outcome, PortWaitResult::TimedOut);
    }

    /// A notification reaches a `wait` running on another thread, both with and without an
    /// object registered on the port.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 4;

        let port_event = Arc::new(PortEvent::new());
        let source = zx::Event::create();
        if with_object {
            let registered = port_event.object_wait_async(
                &source,
                KEY,
                zx::Signals::USER_3,
                zx::WaitAsyncOpts::empty(),
            );
            assert_eq!(registered, Ok(()));
        }

        let waiter = {
            let port_event = Arc::clone(&port_event);
            std::thread::spawn(move || {
                let outcome = port_event.wait(zx::MonotonicInstant::INFINITE);
                assert_eq!(outcome, PortWaitResult::Notification { kind });
            })
        };

        port_event.notify(kind);
        waiter.join().expect("join thread");
    }

    /// A notification sent before waiting is reported by a wait whose deadline has already
    /// passed, both with and without an object registered on the port.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 5;

        let port_event = PortEvent::new();
        let source = zx::Event::create();
        if with_object {
            let registered = port_event.object_wait_async(
                &source,
                KEY,
                zx::Signals::USER_4,
                zx::WaitAsyncOpts::empty(),
            );
            assert_eq!(registered, Ok(()));
        }

        port_event.notify(kind);
        let outcome = port_event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(outcome, PortWaitResult::Notification { kind });
    }

    /// Without any signal or notification, `wait` reports a timeout on both the futex path and
    /// the port path.
    #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
    #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
    #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
    #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
    fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
        const KEY: u64 = 6;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;

        let port_event = PortEvent::new();
        let source = zx::Event::create();

        if with_object {
            let registered = port_event.object_wait_async(
                &source,
                KEY,
                ASSERTED_SIGNAL,
                zx::WaitAsyncOpts::empty(),
            );
            assert_eq!(registered, Ok(()));
        }

        let outcome = port_event.wait(deadline);
        assert_eq!(outcome, PortWaitResult::TimedOut);
    }
}