Skip to main content

starnix_sync/
interruptible_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7
/// A blocking object that can either be notified normally or interrupted.
///
/// To block using an `InterruptibleEvent`, first call `begin_wait`. At this point, the event is
/// in the "waiting" state, and future calls to `notify` or `interrupt` will terminate the wait.
///
/// After `begin_wait` returns, call `block_until` on the returned guard to block the current
/// thread until one of the following conditions occurs:
///
///  1. The given deadline expires.
///  2. At least one of the `notify` or `interrupt` functions was called after `begin_wait`.
///
/// It's safe to call `notify` or `interrupt` at any time. However, calls to `begin_wait` and
/// `block_until` must alternate, starting with `begin_wait`.
///
/// `InterruptibleEvent` uses two-phase waiting so that clients can register for notification,
/// perform some related work, and then start blocking. This approach ensures that clients do not
/// miss notifications that arrive after they perform the related work but before they actually
/// start blocking.
#[derive(Debug)]
pub struct InterruptibleEvent {
    /// Holds the current state of the event (`READY`, `WAITING`, `NOTIFIED`, or `INTERRUPTED`)
    /// and is the futex that `block_until` waits on.
    futex: zx::Futex,
}
30
/// The initial state: nobody is waiting on the event.
///
/// `notify` and `interrupt` leave the event untouched while it is in this state.
///
///  * Transitions to `WAITING` after `begin_wait`.
const READY: i32 = 0;

/// The event is waiting for a notification or an interruption.
///
/// This is the only state in which `notify` and `interrupt` modify the event.
///
///  * Transitions to `NOTIFIED` after `notify`.
///  * Transitions to `INTERRUPTED` after `interrupt`.
///  * Transitions to `READY` if the deadline for `block_until` expires.
const WAITING: i32 = 1;

/// The event has been notified and will wake up.
///
///  * Transitions to `READY` after `block_until` processes the notification.
const NOTIFIED: i32 = 2;

/// The event has been interrupted and will wake up.
///
///  * Transitions to `READY` after `block_until` processes the interruption.
const INTERRUPTED: i32 = 3;
52
/// A guard object to enforce that clients call `begin_wait` before `block_until`.
///
/// The guard is returned by `InterruptibleEvent::begin_wait` and is consumed by
/// `EventWaitGuard::block_until`, so each guard can be blocked on at most once.
#[must_use = "call block_until to advance the event state machine"]
pub struct EventWaitGuard<'a> {
    /// The event on which `begin_wait` was called to produce this guard.
    event: &'a Arc<InterruptibleEvent>,
}
58
59impl<'a> EventWaitGuard<'a> {
60    /// The underlying event associated with this guard.
61    pub fn event(&self) -> &'a Arc<InterruptibleEvent> {
62        self.event
63    }
64
65    /// Returns the owner of the underlying futex, if any.
66    pub fn get_owner(&self) -> Option<zx::Koid> {
67        self.event.get_owner()
68    }
69
70    /// Block the thread until either `deadline` expires, the event is notified, or the event is
71    /// interrupted.
72    pub fn block_until(
73        self,
74        new_owner: Option<&zx::Thread>,
75        deadline: zx::MonotonicInstant,
76    ) -> Result<(), WakeReason> {
77        self.event.block_until(new_owner, deadline)
78    }
79}
80
/// A description of why a `block_until` returned without the event being notified.
///
/// This is the error type of `block_until`; a normal notification is reported as `Ok(())`.
#[derive(Debug, PartialEq, Eq)]
pub enum WakeReason {
    /// `block_until` returned because another thread interrupted the wait using `interrupt`.
    Interrupted,

    /// `block_until` returned because the given deadline expired.
    DeadlineExpired,
}
90
91impl InterruptibleEvent {
92    pub fn new() -> Arc<Self> {
93        Arc::new(InterruptibleEvent { futex: zx::Futex::new(0) })
94    }
95
96    /// Returns the owner of the underlying futex, if any.
97    pub fn get_owner(&self) -> Option<zx::Koid> {
98        self.futex.get_owner()
99    }
100
101    /// Called to initiate a wait.
102    ///
103    /// Calls to `notify` or `interrupt` after this function returns will cause the event to wake
104    /// up. Calls to those functions prior to calling `begin_wait` will be ignored.
105    ///
106    /// Once called, this function cannot be called again until `block_until` returns. Otherwise,
107    /// this function will panic.
108    pub fn begin_wait<'a>(self: &'a Arc<Self>) -> EventWaitGuard<'a> {
109        self.futex
110            .compare_exchange(READY, WAITING, Ordering::Relaxed, Ordering::Relaxed)
111            .expect("Tried to begin waiting on an event when not ready.");
112        EventWaitGuard { event: self }
113    }
114
115    fn block_until(
116        &self,
117        new_owner: Option<&zx::Thread>,
118        deadline: zx::MonotonicInstant,
119    ) -> Result<(), WakeReason> {
120        // We need to loop around the call to zx_futex_wake because we can receive spurious
121        // wakeups.
122        loop {
123            match self.futex.wait(WAITING, new_owner, deadline) {
124                // The deadline expired while we were sleeping.
125                Err(zx::Status::TIMED_OUT) => {
126                    self.futex.store(READY, Ordering::Relaxed);
127                    return Err(WakeReason::DeadlineExpired);
128                }
129                // The value changed before we went to sleep.
130                Err(zx::Status::BAD_STATE) => (),
131                Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
132                Ok(()) => (),
133            }
134
135            let state = self.futex.load(Ordering::Acquire);
136
137            match state {
138                // If we're still in the `WAITING` state, then the wake ended spuriously and we
139                // need to go back to sleep.
140                WAITING => continue,
141                NOTIFIED => {
142                    // We use a store here rather than a compare_exchange because other threads are
143                    // only allowed to write to this value in the `WAITING` state and we are in the
144                    // `NOTIFIED` state.
145                    self.futex.store(READY, Ordering::Relaxed);
146                    return Ok(());
147                }
148                INTERRUPTED => {
149                    // We use a store here rather than a compare_exchange because other threads are
150                    // only allowed to write to this value in the `WAITING` state and we are in the
151                    // `INTERRUPTED` state.
152                    self.futex.store(READY, Ordering::Relaxed);
153                    return Err(WakeReason::Interrupted);
154                }
155                _ => {
156                    panic!("Unexpected event state: {state}");
157                }
158            }
159        }
160    }
161
162    /// Wake up the event normally.
163    ///
164    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
165    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
166    /// observed by `block_until` is a race.
167    pub fn notify(&self) {
168        self.wake(NOTIFIED);
169    }
170
171    /// Wake up the event because of an interruption.
172    ///
173    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
174    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
175    /// observed by `block_until` is a race.
176    pub fn interrupt(&self) {
177        self.wake(INTERRUPTED);
178    }
179
180    fn wake(&self, state: i32) {
181        // See <https://marabos.nl/atomics/hardware.html#failing-compare-exchange> for why we issue
182        // this load before the `compare_exchange` below.
183        let observed = self.futex.load(Ordering::Relaxed);
184        if observed == WAITING
185            && self
186                .futex
187                .compare_exchange(WAITING, state, Ordering::Release, Ordering::Relaxed)
188                .is_ok()
189        {
190            self.futex.wake_all();
191        }
192    }
193}
194
#[cfg(test)]
mod test {
    use super::*;

    /// A `notify` issued from another thread after `begin_wait` completes the wait with `Ok`.
    #[test]
    fn test_wait_block_and_notify() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();

        let other_event = Arc::clone(&event);
        let thread = std::thread::spawn(move || {
            other_event.notify();
        });

        guard.block_until(None, zx::MonotonicInstant::INFINITE).expect("failed to be notified");
        thread.join().expect("failed to join thread");
    }

    /// An `interrupt` issued from another thread after `begin_wait` completes the wait with
    /// `WakeReason::Interrupted`.
    #[test]
    fn test_wait_block_and_interrupt() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();

        let other_event = Arc::clone(&event);
        let thread = std::thread::spawn(move || {
            other_event.interrupt();
        });

        let result = guard.block_until(None, zx::MonotonicInstant::INFINITE);
        assert_eq!(result, Err(WakeReason::Interrupted));
        thread.join().expect("failed to join thread");
    }

    /// With nobody waking the event, the wait ends with `WakeReason::DeadlineExpired`.
    #[test]
    fn test_wait_block_and_timeout() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();
        let result = guard
            .block_until(None, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)));
        assert_eq!(result, Err(WakeReason::DeadlineExpired));
    }

    /// The thread passed as `new_owner` to `block_until` becomes the owner of the futex while
    /// another thread is blocked on the event.
    #[test]
    fn futex_ownership_is_transferred() {
        use zx::HandleBased;

        // `InterruptibleEvent::new` already returns an `Arc`, so no extra wrapping is needed.
        let event = InterruptibleEvent::new();

        let (root_thread_handle, root_thread_koid) = fuchsia_runtime::with_thread_self(|thread| {
            (thread.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap(), thread.koid().unwrap())
        });

        let event_for_blocked_thread = Arc::clone(&event);

        let blocked_thread = std::thread::spawn(move || {
            let event = event_for_blocked_thread;
            let guard = event.begin_wait();
            guard.block_until(Some(&root_thread_handle), zx::MonotonicInstant::INFINITE).unwrap();
        });

        // Wait for the correct owner to appear. If for some reason futex PI breaks, it's likely
        // that this test will time out rather than panicking outright. It would be nice to have
        // a clear assertion here, but there's no existing API we can use to atomically wait for a
        // waiter on the futex *and* see what owner they set. If we find ourselves writing a lot of
        // tests like this we might consider setting up a fake vdso.
        while event.futex.get_owner() != Some(root_thread_koid) {
            std::thread::sleep(std::time::Duration::from_millis(100));
        }

        event.notify();
        blocked_thread.join().unwrap();
    }

    /// Passing a handle to a thread that has already finished running as `new_owner` does not
    /// make the wait fail: it still times out normally.
    #[test]
    fn stale_pi_owner_is_noop() {
        use zx::HandleBased;

        // Capture a handle to a short-lived thread. `std::thread::scope` only returns once the
        // spawned closure has completed, so by the time the handle is used below the thread it
        // refers to is presumably no longer running.
        let mut new_owner = None;
        std::thread::scope(|s| {
            s.spawn(|| {
                new_owner = Some(
                    fuchsia_runtime::with_thread_self(|thread| {
                        thread.duplicate_handle(zx::Rights::SAME_RIGHTS)
                    })
                    .unwrap(),
                );
            });
        });
        let new_owner = new_owner.unwrap();

        let event = InterruptibleEvent::new();
        let guard = event.begin_wait();
        let result = guard.block_until(
            Some(&new_owner),
            zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)),
        );
        assert_eq!(result, Err(WakeReason::DeadlineExpired));
    }
}