starnix_sync/
port_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::LazyLock;
7
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent.futex`] for more details), and no
/// notification is pending yet; the waiter may block on the futex until it is
/// notified.
const FUTEX_WAITING: i32 = 0;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent.futex`] for more details), and the
/// waiter has been notified of an event.
const FUTEX_NOTIFIED: i32 = 1;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent.futex`] for more details), and the
/// waiter has been notified of an interrupt.
const FUTEX_INTERRUPTED: i32 = 2;
/// Futex state: the [`PortEvent`] is interested in events originating from
/// outside of the process (see [`PortEvent.futex`] for more details). The
/// waiter's `zx::Port` should be used instead of the futex.
const FUTEX_USE_PORT: i32 = 3;
24
/// Specifies the ordering for atomics accessed by both the "notifier" and the
/// "notifee" (the waiter).
///
/// Relaxed ordering is used because the [`PortEvent`] does not provide
/// synchronization between the "notifier" and the "notifee". If a notifee
/// needs synchronization, it must perform that synchronization itself.
///
/// See [`PortEvent.wait`] for more details.
const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
/// A wrapper around a [`zx::Port`] that optimizes for the case where events are
/// signaled within a process.
///
/// This object prefers to use a futex for notifications/interrupts but falls
/// back to a `zx::Port` once the port is subscribed for events on an object
/// with [`PortEvent.object_wait_async`].
///
/// Note that the `PortEvent` does not provide any synchronization between a
/// notifier (caller of [`PortEvent.notify`]) and a notifee/waiter (caller of
/// [`PortEvent.wait`]).
#[derive(Debug)]
pub struct PortEvent {
    /// The futex used to wake up a thread when this waiter is waiting for
    /// events that don't depend on a `zx::Port`.
    ///
    /// Its value is always one of the `FUTEX_*` state constants.
    ///
    /// This mode is subject to spurious wakeups.
    /// See more in https://fuchsia.dev/reference/syscalls/futex_wait?hl=en#spurious_wakeups
    futex: zx::Futex,
    /// The underlying Zircon port that the waiter waits on when it is
    /// interested in events that cross process boundaries.
    ///
    /// Lazily allocated to optimize for the case where waiters are interested
    /// only in events triggered within a process.
    port: LazyLock<zx::Port>,
    /// Indicates whether a user packet is sitting in the `zx::Port` to wake up
    /// the waiter after handling user events.
    ///
    /// Set when a regular wake-up packet is queued and cleared when the waiter
    /// dequeues it, so that at most one such packet is queued at a time.
    has_pending_user_packet: AtomicBool,
}
63
/// The kind of notification delivered through [`PortEvent.notify`] and
/// reported back by [`PortEvent.wait`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyKind {
    /// An ordinary wake-up. On the port path it travels as a user packet with
    /// `ZX_OK` status.
    Regular,
    /// An interrupt. On the port path it travels as a user packet with
    /// `ZX_ERR_CANCELED` status.
    Interrupt,
}
70
/// The result of a call to [`PortEvent.wait`].
#[derive(Debug, Eq, PartialEq)]
pub enum PortWaitResult {
    /// Signals asserted on an object.
    Signal {
        /// The key carried by the port packet that reported the signal.
        key: u64,
        /// The signals reported as observed by that packet.
        observed: zx::Signals,
    },
    /// A notification to wake up waiters.
    Notification {
        /// Whether this was a regular notification or an interrupt.
        kind: NotifyKind,
    },
    /// Wait timed out.
    TimedOut,
}
81
impl PortWaitResult {
    /// Shorthand for a `Notification` carrying `NotifyKind::Regular`.
    const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
    /// Shorthand for a `Notification` carrying `NotifyKind::Interrupt`.
    const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
}
86
87impl PortEvent {
88    /// Returns a new `PortEvent`.
89    pub fn new() -> Self {
90        Self {
91            futex: zx::Futex::new(FUTEX_WAITING),
92            port: LazyLock::new(zx::Port::create),
93            has_pending_user_packet: Default::default(),
94        }
95    }
96
97    /// Wait for an event to occur, or the deadline has been reached.
98    pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99        let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100        loop {
101            match state {
102                FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103                    Ok(()) | Err(zx::Status::BAD_STATE) => {
104                        state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105                    }
106                    Err(zx::Status::TIMED_OUT) => {
107                        return PortWaitResult::TimedOut;
108                    }
109                    Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110                },
111                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112                    match self.futex.compare_exchange(
113                        state,
114                        FUTEX_WAITING,
115                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117                    ) {
118                        Ok(new_state) => {
119                            debug_assert_eq!(new_state, state);
120                            return if new_state == FUTEX_INTERRUPTED {
121                                PortWaitResult::NOTIFY_INTERRUPT
122                            } else {
123                                PortWaitResult::NOTIFY_REGULAR
124                            };
125                        }
126                        Err(new_state) => {
127                            debug_assert_ne!(new_state, state);
128                            state = new_state;
129                        }
130                    }
131                }
132                FUTEX_USE_PORT => {
133                    break;
134                }
135                state => unreachable!("unexpected value = {state}"),
136            }
137        }
138
139        match self.port.wait(deadline) {
140            Ok(packet) => match packet.status() {
141                zx::sys::ZX_OK => {
142                    match packet.contents() {
143                        zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144                            key: packet.key(),
145                            observed: sigpkt.observed(),
146                        },
147                        zx::PacketContents::User(_) => {
148                            // User packet w/ OK status is only used to wake up
149                            // the waiter after handling process-internal events.
150                            //
151                            // Note that we can be woken up even when we will
152                            // not handle any user events. This is because right
153                            // after we set `has_pending_user_packet` to `false`,
154                            // another thread can immediately queue a new user
155                            // event and set `has_pending_user_packet` to `true`.
156                            // However, that event will be handled by us (by the
157                            // caller when this method returns) as if the event
158                            // was enqueued before we received this user packet.
159                            // Once the caller handles all the current user events,
160                            // we end up with no remaining user events but a user
161                            // packet sitting in the `zx::Port`.
162                            assert!(self
163                                .has_pending_user_packet
164                                .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE));
165
166                            PortWaitResult::NOTIFY_REGULAR
167                        }
168                        _contents => panic!("unexpected packet = {:?}", packet),
169                    }
170                }
171                zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
172                status => {
173                    panic!("Unexpected status in port wait {}", status);
174                }
175            },
176            Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
177            Err(e) => panic!("Unexpected error from port_wait: {e}"),
178        }
179    }
180
181    /// Subscribe for signals on an object.
182    pub fn object_wait_async(
183        &self,
184        handle: &dyn zx::AsHandleRef,
185        key: u64,
186        signals: zx::Signals,
187        opts: zx::WaitAsyncOpts,
188    ) -> Result<(), zx::Status> {
189        match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
190            FUTEX_WAITING => {
191                self.futex.wake_all();
192            }
193            state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
194                self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
195                    NotifyKind::Interrupt
196                } else {
197                    NotifyKind::Regular
198                })
199            }
200            FUTEX_USE_PORT => {}
201            v => unreachable!("unexpected value = {v}"),
202        }
203
204        handle.wait_async_handle(&self.port, key, signals, opts)
205    }
206
207    /// Cancels async port notifications on an object.
208    pub fn cancel(&self, handle: &zx::HandleRef<'_>, key: u64) {
209        let _: Result<(), zx::Status> = self.port.cancel(handle, key);
210    }
211
212    /// Queue a packet to the underlying Zircon port, which will cause the
213    /// waiter to wake up.
214    ///
215    /// This method should only be called when the waiter is interested in
216    /// events that may originate from outside of the process.
217    fn queue_user_packet_data(&self, kind: NotifyKind) {
218        let status = match kind {
219            NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
220            NotifyKind::Regular => {
221                if self
222                    .has_pending_user_packet
223                    .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
224                {
225                    return;
226                }
227
228                zx::sys::ZX_OK
229            }
230        };
231
232        let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
233        self.port.queue(&packet).unwrap()
234    }
235
236    /// Marks the port as ready to handle a notification (or an interrupt) and
237    /// wakes up any blocked waiters.
238    pub fn notify(&self, kind: NotifyKind) {
239        let futex_val = match kind {
240            NotifyKind::Interrupt => FUTEX_INTERRUPTED,
241            NotifyKind::Regular => FUTEX_NOTIFIED,
242        };
243
244        match self.futex.compare_exchange(
245            FUTEX_WAITING,
246            futex_val,
247            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
248            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249        ) {
250            Ok(observed) => {
251                debug_assert_eq!(observed, FUTEX_WAITING);
252                self.futex.wake_all();
253            }
254            Err(observed) => match observed {
255                FUTEX_WAITING => unreachable!("this should have passed"),
256                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
257                FUTEX_USE_PORT => {
258                    self.queue_user_packet_data(kind);
259                }
260                observed => unreachable!("unexpected value = {observed}"),
261            },
262        }
263    }
264}
265
266#[cfg(test)]
267mod test {
268    use std::sync::Arc;
269
270    use test_case::test_case;
271    use zx::AsHandleRef as _;
272
273    use super::*;
274
275    #[test]
276    fn test_signal_and_wait_block() {
277        const KEY: u64 = 1;
278        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;
279
280        let event = Arc::new(PortEvent::new());
281        let object = zx::Event::create();
282        assert_eq!(
283            event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
284            Ok(())
285        );
286
287        let event_clone = event.clone();
288        let thread = std::thread::spawn(move || {
289            assert_eq!(
290                event_clone.wait(zx::MonotonicInstant::INFINITE),
291                PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL }
292            );
293        });
294
295        object
296            .signal_handle(
297                /*clear_mask=*/ zx::Signals::NONE,
298                /*set_mask=*/ ASSERTED_SIGNAL,
299            )
300            .unwrap();
301        thread.join().expect("join thread");
302    }
303
304    #[test]
305    fn test_signal_then_wait_nonblock() {
306        const KEY: u64 = 2;
307        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;
308
309        let event = PortEvent::new();
310        let object = zx::Event::create();
311        assert_eq!(
312            event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
313            Ok(())
314        );
315        object
316            .signal_handle(
317                /*clear_mask=*/ zx::Signals::NONE,
318                /*set_mask=*/ ASSERTED_SIGNAL,
319            )
320            .unwrap();
321
322        assert_eq!(
323            event.wait(zx::MonotonicInstant::INFINITE_PAST),
324            PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL }
325        );
326    }
327
328    #[test]
329    fn test_signal_then_cancel_then_wait() {
330        const KEY: u64 = 3;
331        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;
332
333        let event = PortEvent::new();
334        let object = zx::Event::create();
335
336        assert_eq!(
337            event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
338            Ok(())
339        );
340        object
341            .signal_handle(
342                /*clear_mask=*/ zx::Signals::NONE,
343                /*set_mask=*/ ASSERTED_SIGNAL,
344            )
345            .unwrap();
346
347        event.cancel(&object.as_handle_ref(), KEY);
348        assert_eq!(event.wait(zx::MonotonicInstant::INFINITE_PAST), PortWaitResult::TimedOut);
349    }
350
351    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
352    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
353    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
354    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
355    fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
356        const KEY: u64 = 4;
357
358        let event = Arc::new(PortEvent::new());
359        let object = zx::Event::create();
360        if with_object {
361            assert_eq!(
362                event.object_wait_async(
363                    &object,
364                    KEY,
365                    zx::Signals::USER_3,
366                    zx::WaitAsyncOpts::empty()
367                ),
368                Ok(())
369            );
370        }
371
372        let event_clone = event.clone();
373        let thread = std::thread::spawn(move || {
374            assert_eq!(
375                event_clone.wait(zx::MonotonicInstant::INFINITE),
376                PortWaitResult::Notification { kind }
377            );
378        });
379
380        event.notify(kind);
381        thread.join().expect("join thread");
382    }
383
384    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
385    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
386    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
387    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
388    fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
389        const KEY: u64 = 5;
390
391        let event = PortEvent::new();
392        let object = zx::Event::create();
393        if with_object {
394            assert_eq!(
395                event.object_wait_async(
396                    &object,
397                    KEY,
398                    zx::Signals::USER_4,
399                    zx::WaitAsyncOpts::empty()
400                ),
401                Ok(())
402            );
403        }
404
405        event.notify(kind);
406        assert_eq!(
407            event.wait(zx::MonotonicInstant::INFINITE_PAST),
408            PortWaitResult::Notification { kind }
409        );
410    }
411
412    #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
413    #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
414    #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
415    #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
416    fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
417        const KEY: u64 = 6;
418        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;
419
420        let event = PortEvent::new();
421        let object = zx::Event::create();
422
423        if with_object {
424            assert_eq!(
425                event.object_wait_async(&object, KEY, ASSERTED_SIGNAL, zx::WaitAsyncOpts::empty()),
426                Ok(())
427            );
428        }
429        assert_eq!(event.wait(deadline), PortWaitResult::TimedOut);
430    }
431}