starnix_sync/
interruptible_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::atomic::Ordering;
6use std::sync::Arc;
7
/// A blocking object that can either be notified normally or interrupted.
///
/// To block using an `InterruptibleEvent`, first call `begin_wait`. At this point, the event is
/// in the "waiting" state, and future calls to `notify` or `interrupt` will terminate the wait.
///
/// After `begin_wait` returns, call `block_until` on the returned guard to block the current
/// thread until one of the following conditions occurs:
///
///  1. The given deadline expires.
///  2. At least one of the `notify` or `interrupt` functions was called after `begin_wait`.
///
/// It's safe to call `notify` or `interrupt` at any time. However, calls to `begin_wait` and
/// `block_until` must alternate, starting with `begin_wait`.
///
/// `InterruptibleEvent` uses two-phase waiting so that clients can register for notification,
/// perform some related work, and then start blocking. This approach ensures that clients do not
/// miss notifications that arrive after they perform the related work but before they actually
/// start blocking.
#[derive(Debug)]
pub struct InterruptibleEvent {
    // Holds the current state of the event (`READY`, `WAITING`, `NOTIFIED` or `INTERRUPTED`) and
    // is the futex that `block_until` sleeps on.
    futex: zx::Futex,
}
30
// The constants below are the values stored in the event's futex word. Together they form the
// state machine that `begin_wait`, `block_until`, `notify` and `interrupt` drive.

/// The initial state, and the state the event returns to whenever `block_until` returns.
///
///  * Transitions to `WAITING` after `begin_wait`.
const READY: i32 = 0;

/// The event is waiting for a notification or an interruption.
///
/// This is the only state in which `notify` and `interrupt` modify the event.
///
///  * Transitions to `NOTIFIED` after `notify`.
///  * Transitions to `INTERRUPTED` after `interrupt`.
///  * Transitions to `READY` if the deadline for `block_until` expires.
const WAITING: i32 = 1;

/// The event has been notified and will wake up.
///
///  * Transitions to `READY` after `block_until` processes the notification.
const NOTIFIED: i32 = 2;

/// The event has been interrupted and will wake up.
///
///  * Transitions to `READY` after `block_until` processes the interruption.
const INTERRUPTED: i32 = 3;
52
53/// A guard object to enforce that clients call `begin_wait` before `block_until`.
54#[must_use = "call block_until to advance the event state machine"]
55pub struct EventWaitGuard<'a> {
56    event: &'a Arc<InterruptibleEvent>,
57}
58
59impl<'a> EventWaitGuard<'a> {
60    /// The underlying event associated with this guard.
61    pub fn event(&self) -> &'a Arc<InterruptibleEvent> {
62        self.event
63    }
64
65    /// Block the thread until either `deadline` expires, the event is notified, or the event is
66    /// interrupted.
67    pub fn block_until(self, deadline: zx::MonotonicInstant) -> Result<(), WakeReason> {
68        self.event.block_until(deadline)
69    }
70}
71
/// The reason `block_until` returned without the event being notified.
///
/// This is the error type of `block_until`; a normal notification is reported as `Ok(())`.
#[derive(Debug, PartialEq, Eq)]
pub enum WakeReason {
    /// `block_until` returned because another thread interrupted the wait using `interrupt`.
    Interrupted,

    /// `block_until` returned because the given deadline expired.
    DeadlineExpired,
}
81
82impl InterruptibleEvent {
83    pub fn new() -> Arc<Self> {
84        Arc::new(InterruptibleEvent { futex: zx::Futex::new(0) })
85    }
86
87    /// Called to initiate a wait.
88    ///
89    /// Calls to `notify` or `interrupt` after this function returns will cause the event to wake
90    /// up. Calls to those functions prior to calling `begin_wait` will be ignored.
91    ///
92    /// Once called, this function cannot be called again until `block_until` returns. Otherwise,
93    /// this function will panic.
94    pub fn begin_wait<'a>(self: &'a Arc<Self>) -> EventWaitGuard<'a> {
95        self.futex
96            .compare_exchange(READY, WAITING, Ordering::Relaxed, Ordering::Relaxed)
97            .expect("Tried to begin waiting on an event when not ready.");
98        EventWaitGuard { event: self }
99    }
100
101    fn block_until(&self, deadline: zx::MonotonicInstant) -> Result<(), WakeReason> {
102        // We need to loop around the call to zx_futex_wake because we can receive spurious
103        // wakeups.
104        loop {
105            match self.futex.wait(WAITING, None, deadline) {
106                // The deadline expired while we were sleeping.
107                Err(zx::Status::TIMED_OUT) => {
108                    self.futex.store(READY, Ordering::Relaxed);
109                    return Err(WakeReason::DeadlineExpired);
110                }
111                // The value changed before we went to sleep.
112                Err(zx::Status::BAD_STATE) => (),
113                Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
114                Ok(()) => (),
115            }
116
117            let state = self.futex.load(Ordering::Acquire);
118
119            match state {
120                // If we're still in the `WAITING` state, then the wake ended spuriously and we
121                // need to go back to sleep.
122                WAITING => continue,
123                NOTIFIED => {
124                    // We use a store here rather than a compare_exchange because other threads are
125                    // only allowed to write to this value in the `WAITING` state and we are in the
126                    // `NOTIFIED` state.
127                    self.futex.store(READY, Ordering::Relaxed);
128                    return Ok(());
129                }
130                INTERRUPTED => {
131                    // We use a store here rather than a compare_exchange because other threads are
132                    // only allowed to write to this value in the `WAITING` state and we are in the
133                    // `INTERRUPTED` state.
134                    self.futex.store(READY, Ordering::Relaxed);
135                    return Err(WakeReason::Interrupted);
136                }
137                _ => {
138                    panic!("Unexpected event state: {state}");
139                }
140            }
141        }
142    }
143
144    /// Wake up the event normally.
145    ///
146    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
147    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
148    /// observed by `block_until` is a race.
149    pub fn notify(&self) {
150        self.wake(NOTIFIED);
151    }
152
153    /// Wake up the event because of an interruption.
154    ///
155    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
156    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
157    /// observed by `block_until` is a race.
158    pub fn interrupt(&self) {
159        self.wake(INTERRUPTED);
160    }
161
162    fn wake(&self, state: i32) {
163        // See <https://marabos.nl/atomics/hardware.html#failing-compare-exchange> for why we issue
164        // this load before the `compare_exchange` below.
165        let observed = self.futex.load(Ordering::Relaxed);
166        if observed == WAITING
167            && self
168                .futex
169                .compare_exchange(WAITING, state, Ordering::Release, Ordering::Relaxed)
170                .is_ok()
171        {
172            self.futex.wake_all();
173        }
174    }
175}
176
#[cfg(test)]
mod test {
    use super::*;

    /// A `notify` issued from another thread ends an unbounded wait with `Ok(())`.
    #[test]
    fn test_wait_block_and_notify() {
        let event = InterruptibleEvent::new();
        let guard = event.begin_wait();

        let notifier = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || event.notify())
        };

        guard.block_until(zx::MonotonicInstant::INFINITE).expect("failed to be notified");
        notifier.join().expect("failed to join thread");
    }

    /// An `interrupt` issued from another thread ends an unbounded wait with
    /// `WakeReason::Interrupted`.
    #[test]
    fn test_wait_block_and_interrupt() {
        let event = InterruptibleEvent::new();
        let guard = event.begin_wait();

        let interrupter = {
            let event = Arc::clone(&event);
            std::thread::spawn(move || event.interrupt())
        };

        let outcome = guard.block_until(zx::MonotonicInstant::INFINITE);
        assert_eq!(outcome, Err(WakeReason::Interrupted));
        interrupter.join().expect("failed to join thread");
    }

    /// With nobody waking the event, the wait ends with `WakeReason::DeadlineExpired`.
    #[test]
    fn test_wait_block_and_timeout() {
        let event = InterruptibleEvent::new();
        let guard = event.begin_wait();

        let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20));
        let outcome = guard.block_until(deadline);
        assert_eq!(outcome, Err(WakeReason::DeadlineExpired));
    }
}