starnix_sync/
port_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::LazyLock;
6use std::sync::atomic::{AtomicBool, Ordering};
7
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details), and
/// the waiter has not been notified yet; it may block on the Futex until it is
/// woken up.
const FUTEX_WAITING: i32 = 0;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details), and
/// the waiter has been notified of an event.
const FUTEX_NOTIFIED: i32 = 1;
/// Futex state: the [`PortEvent`] is interested only in events originating
/// from within the process (see [`PortEvent::futex`] for more details), and
/// the waiter has been notified of an interrupt.
const FUTEX_INTERRUPTED: i32 = 2;
/// Futex state: the [`PortEvent`] is interested in events originating from
/// outside of the process (see [`PortEvent::futex`] for more details). The
/// waiter's `zx::Port` should be used instead of the Futex.
const FUTEX_USE_PORT: i32 = 3;
24
/// Specifies the ordering for atomics accessed by both the "notifier" and
/// the "notifee" (the waiter).
///
/// Relaxed ordering is used because the [`PortEvent`] does not provide
/// synchronization between the "notifier" and the "notifee". If a notifee
/// needs synchronization, it needs to perform that synchronization itself.
///
/// See [`PortEvent::wait`] for more details.
const ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE: Ordering = Ordering::Relaxed;
34
/// A wrapper around a [`zx::Port`] that optimizes for the case where events are
/// signaled within a process.
///
/// This object will prefer to use a Futex for notifications/interrupts but will
/// fall back to a `zx::Port` when the port is subscribed for events on an
/// object with [`PortEvent::object_wait_async`].
///
/// Note that the `PortEvent` does not provide any synchronization between a
/// notifier (caller of [`PortEvent::notify`]) and a notifee/waiter (caller of
/// [`PortEvent::wait`]).
#[derive(Debug)]
pub struct PortEvent {
    /// The Futex used to wake up a thread when this waiter is waiting for
    /// events that don't depend on a `zx::Port`.
    ///
    /// Its value is always one of the `FUTEX_*` state constants defined in
    /// this file.
    ///
    /// This mode is subject to spurious wakeups.
    /// See more in <https://fuchsia.dev/reference/syscalls/futex_wait?hl=en#spurious_wakeups>
    futex: zx::Futex,
    /// The underlying Zircon port that the waiter waits on when it is
    /// interested in events that cross process boundaries.
    ///
    /// Lazily allocated to optimize for the case where waiters are interested
    /// only in events triggered within a process.
    port: LazyLock<zx::Port>,
    /// Indicates whether a user packet is sitting in the `zx::Port` to wake up
    /// the waiter after handling user events.
    ///
    /// Set when a regular notification queues a user packet and cleared by the
    /// waiter when it dequeues that packet, so that at most one such packet is
    /// queued at a time.
    has_pending_user_packet: AtomicBool,
}
63
/// The kind of notification.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NotifyKind {
    /// An ordinary wake-up; recorded as `FUTEX_NOTIFIED` in the Futex, or as a
    /// user packet with `ZX_OK` status when the port is in use.
    Regular,
    /// An interrupt; recorded as `FUTEX_INTERRUPTED` in the Futex, or as a
    /// user packet with `ZX_ERR_CANCELED` status when the port is in use.
    Interrupt,
}
70
/// The result of a call to [`PortEvent::wait`].
#[derive(Debug, Eq, PartialEq)]
pub enum PortWaitResult {
    /// Signals asserted on an object.
    ///
    /// `key` is the key carried by the dequeued port packet and `observed` is
    /// the set of signals reported by that packet.
    Signal { key: u64, observed: zx::Signals },
    /// A notification to wake up waiters.
    ///
    /// `kind` tells a regular notification apart from an interrupt.
    Notification { kind: NotifyKind },
    /// Wait timed out.
    TimedOut,
}
81
impl PortWaitResult {
    /// Shorthand for a [`PortWaitResult::Notification`] of kind
    /// [`NotifyKind::Regular`].
    const NOTIFY_REGULAR: Self = Self::Notification { kind: NotifyKind::Regular };
    /// Shorthand for a [`PortWaitResult::Notification`] of kind
    /// [`NotifyKind::Interrupt`].
    const NOTIFY_INTERRUPT: Self = Self::Notification { kind: NotifyKind::Interrupt };
}
86
87impl PortEvent {
88    /// Returns a new `PortEvent`.
89    pub fn new() -> Self {
90        Self {
91            futex: zx::Futex::new(FUTEX_WAITING),
92            port: LazyLock::new(zx::Port::create),
93            has_pending_user_packet: Default::default(),
94        }
95    }
96
97    /// Wait for an event to occur, or the deadline has been reached.
98    pub fn wait(&self, deadline: zx::MonotonicInstant) -> PortWaitResult {
99        let mut state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
100        loop {
101            match state {
102                FUTEX_WAITING => match self.futex.wait(FUTEX_WAITING, None, deadline) {
103                    Ok(()) | Err(zx::Status::BAD_STATE) => {
104                        state = self.futex.load(ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE);
105                    }
106                    Err(zx::Status::TIMED_OUT) => {
107                        return PortWaitResult::TimedOut;
108                    }
109                    Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
110                },
111                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {
112                    match self.futex.compare_exchange(
113                        state,
114                        FUTEX_WAITING,
115                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
116                        ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
117                    ) {
118                        Ok(new_state) => {
119                            debug_assert_eq!(new_state, state);
120                            return if new_state == FUTEX_INTERRUPTED {
121                                PortWaitResult::NOTIFY_INTERRUPT
122                            } else {
123                                PortWaitResult::NOTIFY_REGULAR
124                            };
125                        }
126                        Err(new_state) => {
127                            debug_assert_ne!(new_state, state);
128                            state = new_state;
129                        }
130                    }
131                }
132                FUTEX_USE_PORT => {
133                    break;
134                }
135                state => unreachable!("unexpected value = {state}"),
136            }
137        }
138
139        match self.port.wait(deadline) {
140            Ok(packet) => match packet.status() {
141                zx::sys::ZX_OK => {
142                    match packet.contents() {
143                        zx::PacketContents::SignalOne(sigpkt) => PortWaitResult::Signal {
144                            key: packet.key(),
145                            observed: sigpkt.observed(),
146                        },
147                        zx::PacketContents::User(_) => {
148                            // User packet w/ OK status is only used to wake up
149                            // the waiter after handling process-internal events.
150                            //
151                            // Note that we can be woken up even when we will
152                            // not handle any user events. This is because right
153                            // after we set `has_pending_user_packet` to `false`,
154                            // another thread can immediately queue a new user
155                            // event and set `has_pending_user_packet` to `true`.
156                            // However, that event will be handled by us (by the
157                            // caller when this method returns) as if the event
158                            // was enqueued before we received this user packet.
159                            // Once the caller handles all the current user events,
160                            // we end up with no remaining user events but a user
161                            // packet sitting in the `zx::Port`.
162                            assert!(
163                                self.has_pending_user_packet
164                                    .swap(false, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
165                            );
166
167                            PortWaitResult::NOTIFY_REGULAR
168                        }
169                        _contents => panic!("unexpected packet = {:?}", packet),
170                    }
171                }
172                zx::sys::ZX_ERR_CANCELED => PortWaitResult::NOTIFY_INTERRUPT,
173                status => {
174                    panic!("Unexpected status in port wait {}", status);
175                }
176            },
177            Err(zx::Status::TIMED_OUT) => PortWaitResult::TimedOut,
178            Err(e) => panic!("Unexpected error from port_wait: {e}"),
179        }
180    }
181
182    /// Subscribe for signals on an object.
183    pub fn object_wait_async(
184        &self,
185        handle: &dyn zx::AsHandleRef,
186        key: u64,
187        signals: zx::Signals,
188        opts: zx::WaitAsyncOpts,
189    ) -> Result<(), zx::Status> {
190        match self.futex.swap(FUTEX_USE_PORT, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE) {
191            FUTEX_WAITING => {
192                self.futex.wake_all();
193            }
194            state @ (FUTEX_NOTIFIED | FUTEX_INTERRUPTED) => {
195                self.queue_user_packet_data(if state == FUTEX_INTERRUPTED {
196                    NotifyKind::Interrupt
197                } else {
198                    NotifyKind::Regular
199                })
200            }
201            FUTEX_USE_PORT => {}
202            v => unreachable!("unexpected value = {v}"),
203        }
204
205        handle.wait_async_handle(&self.port, key, signals, opts)
206    }
207
208    /// Cancels async port notifications on an object.
209    pub fn cancel(&self, key: u64) {
210        let _: Result<(), zx::Status> = self.port.cancel(key);
211    }
212
213    /// Queue a packet to the underlying Zircon port, which will cause the
214    /// waiter to wake up.
215    ///
216    /// This method should only be called when the waiter is interested in
217    /// events that may originate from outside of the process.
218    fn queue_user_packet_data(&self, kind: NotifyKind) {
219        let status = match kind {
220            NotifyKind::Interrupt => zx::sys::ZX_ERR_CANCELED,
221            NotifyKind::Regular => {
222                if self
223                    .has_pending_user_packet
224                    .swap(true, ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE)
225                {
226                    return;
227                }
228
229                zx::sys::ZX_OK
230            }
231        };
232
233        let packet = zx::Packet::from_user_packet(0, status, zx::UserPacket::default());
234        self.port.queue(&packet).unwrap()
235    }
236
237    /// Marks the port as ready to handle a notification (or an interrupt) and
238    /// wakes up any blocked waiters.
239    pub fn notify(&self, kind: NotifyKind) {
240        let futex_val = match kind {
241            NotifyKind::Interrupt => FUTEX_INTERRUPTED,
242            NotifyKind::Regular => FUTEX_NOTIFIED,
243        };
244
245        match self.futex.compare_exchange(
246            FUTEX_WAITING,
247            futex_val,
248            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
249            ORDERING_FOR_ATOMICS_BETWEEN_NOTIFIER_AND_NOTIFEE,
250        ) {
251            Ok(observed) => {
252                debug_assert_eq!(observed, FUTEX_WAITING);
253                self.futex.wake_all();
254            }
255            Err(observed) => match observed {
256                FUTEX_WAITING => unreachable!("this should have passed"),
257                FUTEX_NOTIFIED | FUTEX_INTERRUPTED => {}
258                FUTEX_USE_PORT => {
259                    self.queue_user_packet_data(kind);
260                }
261                observed => unreachable!("unexpected value = {observed}"),
262            },
263        }
264    }
265}
266
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use test_case::test_case;

    use super::*;

    /// Subscribes `event` for `signals` on `object` under `key` and asserts
    /// that the subscription succeeded.
    #[track_caller]
    fn subscribe(event: &PortEvent, object: &zx::Event, key: u64, signals: zx::Signals) {
        let result = event.object_wait_async(object, key, signals, zx::WaitAsyncOpts::empty());
        assert_eq!(result, Ok(()));
    }

    /// A waiter blocked on the port observes a signal asserted on a
    /// subscribed object.
    #[test]
    fn test_signal_and_wait_block() {
        const KEY: u64 = 1;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_0;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);

        let waiter = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || {
                let expected = PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL };
                assert_eq!(event.wait(zx::MonotonicInstant::INFINITE), expected);
            })
        };

        object.signal(/*clear_mask=*/ zx::Signals::NONE, /*set_mask=*/ ASSERTED_SIGNAL).unwrap();
        waiter.join().expect("join thread");
    }

    /// A signal asserted before waiting is returned without blocking.
    #[test]
    fn test_signal_then_wait_nonblock() {
        const KEY: u64 = 2;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_1;

        let event = PortEvent::new();
        let object = zx::Event::create();
        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        object.signal(/*clear_mask=*/ zx::Signals::NONE, /*set_mask=*/ ASSERTED_SIGNAL).unwrap();

        let expected = PortWaitResult::Signal { key: KEY, observed: ASSERTED_SIGNAL };
        assert_eq!(event.wait(zx::MonotonicInstant::INFINITE_PAST), expected);
    }

    /// Cancelling a key after its signal was asserted leaves nothing to wait
    /// for.
    #[test]
    fn test_signal_then_cancel_then_wait() {
        const KEY: u64 = 3;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_2;

        let event = PortEvent::new();
        let object = zx::Event::create();

        subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        object.signal(/*clear_mask=*/ zx::Signals::NONE, /*set_mask=*/ ASSERTED_SIGNAL).unwrap();

        event.cancel(KEY);
        let outcome = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(outcome, PortWaitResult::TimedOut);
    }

    /// A waiter blocked in `wait` is woken up by `notify`, in both Futex and
    /// port mode.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_and_wait_block(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 4;

        let event = Arc::new(PortEvent::new());
        let object = zx::Event::create();
        if with_object {
            subscribe(&event, &object, KEY, zx::Signals::USER_3);
        }

        let waiter = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || {
                let outcome = event.wait(zx::MonotonicInstant::INFINITE);
                assert_eq!(outcome, PortWaitResult::Notification { kind });
            })
        };

        event.notify(kind);
        waiter.join().expect("join thread");
    }

    /// A notification sent before waiting is returned without blocking, in
    /// both Futex and port mode.
    #[test_case(NotifyKind::Interrupt, true; "interrupt with object")]
    #[test_case(NotifyKind::Regular, true; "not interrupt with object")]
    #[test_case(NotifyKind::Interrupt, false; "interrupt without object")]
    #[test_case(NotifyKind::Regular, false; "not interrupt without object")]
    fn test_notify_then_wait_nonblock(kind: NotifyKind, with_object: bool) {
        const KEY: u64 = 5;

        let event = PortEvent::new();
        let object = zx::Event::create();
        if with_object {
            subscribe(&event, &object, KEY, zx::Signals::USER_4);
        }

        event.notify(kind);
        let outcome = event.wait(zx::MonotonicInstant::INFINITE_PAST);
        assert_eq!(outcome, PortWaitResult::Notification { kind });
    }

    /// With nothing signaled or notified, `wait` times out.
    #[test_case(true, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking with object")]
    #[test_case(false, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100)); "blocking without object")]
    #[test_case(true, zx::MonotonicInstant::INFINITE_PAST; "non blocking with object")]
    #[test_case(false, zx::MonotonicInstant::INFINITE_PAST; "non blocking without object")]
    fn test_wait_timeout(with_object: bool, deadline: zx::MonotonicInstant) {
        const KEY: u64 = 6;
        const ASSERTED_SIGNAL: zx::Signals = zx::Signals::USER_5;

        let event = PortEvent::new();
        let object = zx::Event::create();

        if with_object {
            subscribe(&event, &object, KEY, ASSERTED_SIGNAL);
        }

        let outcome = event.wait(deadline);
        assert_eq!(outcome, PortWaitResult::TimedOut);
    }
}