starnix_sync/
port_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::LazyLock;
6use std::sync::atomic::{AtomicBool, Ordering};
7
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details) and no
/// notification is pending, so a waiter may block until it is notified.
const FUTEX_WAITING: i32 = 0;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details) and a
/// regular notification is pending for the waiter.
const FUTEX_NOTIFIED: i32 = 1;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details) and an
/// interrupt is pending for the waiter.
const FUTEX_INTERRUPTED: i32 = 2;
/// Futex state: the [`PortEvent`] is interested in events originating from
/// outside of the process (see [`PortEvent::futex`] for more details). The
/// waiter's `zx::Port` should be used instead of the futex.
const FUTEX_USE_PORT: i32 = 3;
24
/// Specifies the ordering for atomics accessed by both the "notifier" and the
/// "notifee" (the waiter).
///
/// Relaxed ordering is used because the [`PortEvent`] does not provide
/// synchronization between the notifier and the notifee. If a notifee needs
/// synchronization, it has to perform that synchronization itself.
///
/// See [`PortEvent::wait`] for more details.
const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
/// A wrapper around a [`zx::Port`] that optimizes for the case where events are
/// signaled within a process.
///
/// This object prefers to use a futex for notifications/interrupts but falls
/// back to a `zx::Port` once the port is subscribed for events on an object
/// with [`PortEvent::object_wait_async`].
///
/// Note that the `PortEvent` does not provide any synchronization between a
/// notifier (caller of [`PortEvent::notify`]) and a notifee/waiter (caller of
/// [`PortEvent::wait`]).
#[derive(Debug)]
pub struct PortEvent {
    /// The futex used to wake up a thread when this waiter is waiting for
    /// events that don't depend on a `zx::Port`.
    ///
    /// Its value is always one of `FUTEX_WAITING`, `FUTEX_NOTIFIED`,
    /// `FUTEX_INTERRUPTED` or `FUTEX_USE_PORT`.
    ///
    /// This mode is subject to spurious wakeups.
    /// See more in https://fuchsia.dev/reference/syscalls/futex_wait?hl=en#spurious_wakeups
    futex: zx::Futex,
    /// The underlying Zircon port that the waiter waits on when it is
    /// interested in events that cross process boundaries.
    ///
    /// Lazily allocated to optimize for the case where waiters are interested
    /// only in events triggered within a process.
    port: LazyLock<zx::Port>,
    /// Indicates whether a user packet with an OK status is sitting in the
    /// `zx::Port` to wake up the waiter so it can handle user events.
    ///
    /// Set when such a packet is queued and cleared when the waiter dequeues
    /// it, so that at most one regular wake-up packet is queued at a time.
    has_pending_user_packet: AtomicBool,
}
63
/// The kind of notification delivered through [`PortEvent::notify`] and
/// reported back by [`PortEvent::wait`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyKind {
    /// An ordinary wake-up of the waiter.
    Regular,
    /// A wake-up that interrupts the waiter.
    Interrupt,
}
70
/// The result of a call to [`PortEvent::wait`].
#[derive(Debug, Eq, PartialEq)]
pub enum PortWaitResult {
    /// Signals asserted on an object.
    ///
    /// `key` is the key of the dequeued port packet and `observed` holds the
    /// signals carried by that packet.
    Signal { key: u64, observed: zx::Signals },
    /// A notification to wake up waiters.
    Notification { kind: NotifyKind },
    /// Wait timed out.
    TimedOut,
}
81
/// Shorthand constants for the two possible notification results.
impl PortWaitResult {
    /// A [`PortWaitResult::Notification`] carrying [`NotifyKind::Regular`].
    const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
    /// A [`PortWaitResult::Notification`] carrying [`NotifyKind::Interrupt`].
    const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
}
86
87impl PortEvent {
88    /// Returns a new `PortEvent`.
89    pub fn new() -> Self {
90        Self {
91            futex: zx::Futex::new(FUTEX_WAITING),
92            port: LazyLock::new(zx::Port::create),
93            has_pending_user_packet: Default::default(),
94        }
95    }
96
97    /// Wait for an event to occur, or the deadline has been reached.
98    pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99        let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100        loop {
101            match state {
102                FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103                    Ok(()) | Err(zx::Status::BAD_STATE) => {
104                        state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105                    }
106                    Err(zx::Status::TIMED_OUT) => {
107                        return PortWaitResult::TimedOut;
108                    }
109                    Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110                },
111                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112                    match self.futex.compare_exchange(
113                        state,
114                        FUTEX_WAITING,
115                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117                    ) {
118                        Ok(new_state) => {
119                            debug_assert_eq!(new_state, state);
120                            return if new_state == FUTEX_INTERRUPTED {
121                                PortWaitResult::NOTIFY_INTERRUPT
122                            } else {
123                                PortWaitResult::NOTIFY_REGULAR
124                            };
125                        }
126                        Err(new_state) => {
127                            debug_assert_ne!(new_state, state);
128                            state = new_state;
129                        }
130                    }
131                }
132                FUTEX_USE_PORT => {
133                    break;
134                }
135                state => unreachable!("unexpected value = {state}"),
136            }
137        }
138
139        match self.port.wait(deadline) {
140            Ok(packet) => match packet.status() {
141                zx::sys::ZX_OK => {
142                    match packet.contents() {
143                        zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144                            key: packet.key(),
145                            observed: sigpkt.observed(),
146                        },
147                        zx::PacketContents::User(_) => {
148                            // User packet w/ OK status is only used to wake up
149                            // the waiter after handling process-internal events.
150                            //
151                            // Note that we can be woken up even when we will
152                            // not handle any user events. This is because right
153                            // after we set `has_pending_user_packet` to `false`,
154                            // another thread can immediately queue a new user
155                            // event and set `has_pending_user_packet` to `true`.
156                            // However, that event will be handled by us (by the
157                            // caller when this method returns) as if the event
158                            // was enqueued before we received this user packet.
159                            // Once the caller handles all the current user events,
160                            // we end up with no remaining user events but a user
161                            // packet sitting in the `zx::Port`.
162                            assert!(
163                                self.has_pending_user_packet
164                                    .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
165                            );
166
167                            PortWaitResult::NOTIFY_REGULAR
168                        }
169                        _contents => panic!("unexpected packet = {:?}", packet),
170                    }
171                }
172                zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
173                status => {
174                    panic!("Unexpected status in port wait {}", status);
175                }
176            },
177            Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
178            Err(e) => panic!("Unexpected error from port_wait: {e}"),
179        }
180    }
181
182    /// Subscribe for signals on an object.
183    pub fn object_wait_async(
184        &self,
185        handle: &dyn zx::AsHandleRef,
186        key: u64,
187        signals: zx::Signals,
188        opts: zx::WaitAsyncOpts,
189    ) -> Result<(), zx::Status> {
190        match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
191            FUTEX_WAITING => {
192                self.futex.wake_all();
193            }
194            state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
195                self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
196                    NotifyKind::Interrupt
197                } else {
198                    NotifyKind::Regular
199                })
200            }
201            FUTEX_USE_PORT => {}
202            v => unreachable!("unexpected value = {v}"),
203        }
204
205        handle.wait_async_handle(&self.port, key, signals, opts)
206    }
207
208    /// Cancels async port notifications on an object.
209    pub fn cancel(&self, key: u64) {
210        let _: Result<(), zx::Status> = self.port.cancel(key);
211    }
212
213    /// Queue a packet to the underlying Zircon port, which will cause the
214    /// waiter to wake up.
215    ///
216    /// This method should only be called when the waiter is interested in
217    /// events that may originate from outside of the process.
218    fn queue_user_packet_data(&self, kind: NotifyKind) {
219        let status = match kind {
220            NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
221            NotifyKind::Regular => {
222                if self
223                    .has_pending_user_packet
224                    .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
225                {
226                    return;
227                }
228
229                zx::sys::ZX_OK
230            }
231        };
232
233        let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
234        self.port.queue(&packet).unwrap()
235    }
236
237    /// Marks the port as ready to handle a notification (or an interrupt) and
238    /// wakes up any blocked waiters.
239    pub fn notify(&self, kind: NotifyKind) {
240        let futex_val = match kind {
241            NotifyKind::Interrupt => FUTEX_INTERRUPTED,
242            NotifyKind::Regular => FUTEX_NOTIFIED,
243        };
244
245        match self.futex.compare_exchange(
246            FUTEX_WAITING,
247            futex_val,
248            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
250        ) {
251            Ok(observed) => {
252                debug_assert_eq!(observed, FUTEX_WAITING);
253                self.futex.wake_all();
254            }
255            Err(observed) => match observed {
256                FUTEX_WAITING => unreachable!("this should have passed"),
257                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
258                FUTEX_USE_PORT => {
259                    self.queue_user_packet_data(kind);
260                }
261                observed => unreachable!("unexpected value = {observed}"),
262            },
263        }
264    }
265}
266
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use test_case::test_case;
    use zx::AsHandleRef as _;

    use super::*;

    /// Subscribes `event` for `signals` on `object` under `key` and asserts
    /// that the subscription succeeded.
    fn subscribe(event: &PortEvent, object: &zx::Event, key: u64, signals: zx::Signals) {
        let outcome = event.object_wait_async(object, key, signals, zx::WaitAsyncOpts::empty());
        assert_eq!(outcome, Ok(()));
    }

    /// Asserts `signals` on `object` without clearing any signal.
    fn raise_signals(object: &zx::Event, signals: zx::Signals) {
        object
            .signal_handle(
                /*clear_mask=*/ zx::Signals::NONE,
                /*set_mask=*/ signals,
            )
            .unwrap();
    }

    /// A waiter blocked on the port observes a signal raised afterwards.
    #[test]
    fn test_signal_and_wait_block() {
        const KEY: u64 = 1;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);

        let waiter = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || {
                let result = event.wait(zx::MonotonicInstant::INFINITE);
                assert_eq!(result, PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL });
            })
        };

        raise_signals(&object, ASSERTED_SIGNAL);
        waiter.join().expect("join thread");
    }

    /// A signal raised before waiting is returned by a non-blocking wait.
    #[test]
    fn test_signal_then_wait_nonblock() {
        const KEY: u64 = 2;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;

        let event = PortEvent::new();
        let object = zx::Event::create();
        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        raise_signals(&object, ASSERTED_SIGNAL);

        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL });
    }

    /// Cancelling a key after its signal was raised leaves nothing to report.
    #[test]
    fn test_signal_then_cancel_then_wait() {
        const KEY: u64 = 3;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;

        let event = PortEvent::new();
        let object = zx::Event::create();
        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        raise_signals(&object, ASSERTED_SIGNAL);

        event.cancel(KEY);

        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::TimedOut);
    }

    /// A blocked waiter is woken by `notify`, in both futex and port mode.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 4;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        if with_object {
            subscribe(&event, &object, KEY, zx::Signals::USER_3);
        }

        let waiter = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || {
                let result = event.wait(zx::MonotonicInstant::INFINITE);
                assert_eq!(result, PortWaitResult::Notification { kind });
            })
        };

        event.notify(kind);
        waiter.join().expect("join thread");
    }

    /// A notification sent before waiting is returned by a non-blocking wait.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 5;

        let event = PortEvent::new();
        let object = zx::Event::create();
        if with_object {
            subscribe(&event, &object, KEY, zx::Signals::USER_4);
        }

        event.notify(kind);

        let result = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(result, PortWaitResult::Notification { kind });
    }

    /// With nothing signaled or notified, waiting times out.
    #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
    #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
    #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
    #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
    fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
        const KEY: u64 = 6;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;

        let event = PortEvent::new();
        let object = zx::Event::create();
        if with_object {
            subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        }

        let result = event.wait(deadline);
        assert_eq!(result, PortWaitResult::TimedOut);
    }
}