starnix_sync/
interruptible_event.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use starnix_stack::clean_stack;
6use std::sync::Arc;
7use std::sync::atomic::Ordering;
8
9/// A blocking object that can either be notified normally or interrupted
10///
11/// To block using an `InterruptibleEvent`, first call `begin_wait`. At this point, the event is
12/// in the "waiting" state, and future calls to `notify` or `interrupt` will terminate the wait.
13///
14/// After `begin_wait` returns, call `block_until` to block the current thread until one of the
15/// following conditions occur:
16///
17///  1. The given deadline expires.
18///  2. At least one of the `notify` or `interrupt` functions were called after `begin_wait`.
19///
20/// It's safe to call `notify` or `interrupt` at any time. However, calls to `begin_wait` and
21/// `block_until` must alternate, starting with `begin_wait`.
22///
23/// `InterruptibleEvent` uses two-phase waiting so that clients can register for notification,
24/// perform some related work, and then start blocking. This approach ensures that clients do not
25/// miss notifications that arrive after they perform the related work but before they actually
26/// start blocking.
27#[derive(Debug)]
28pub struct InterruptibleEvent {
29    futex: zx::Futex,
30}
31
/// State of an event that nobody is waiting on. This is the state of a freshly created event.
///
///  * `begin_wait` moves the event to `WAITING`.
const READY: i32 = 0;

/// State of an event whose wait has begun and has not yet been woken.
///
///  * `notify` moves the event to `NOTIFIED`.
///  * `interrupt` moves the event to `INTERRUPTED`.
///  * An expired `block_until` deadline moves the event back to `READY`.
const WAITING: i32 = 1;

/// State of an event that was woken by `notify`.
///
///  * `block_until` moves the event back to `READY` once it has seen the notification.
const NOTIFIED: i32 = 2;

/// State of an event that was woken by `interrupt`.
///
///  * `block_until` moves the event back to `READY` once it has seen the interruption.
const INTERRUPTED: i32 = 3;
53
54/// A guard object to enforce that clients call `begin_wait` before `block_until`.
55#[must_use = "call block_until to advance the event state machine"]
56pub struct EventWaitGuard<'a> {
57    event: &'a Arc<InterruptibleEvent>,
58}
59
60impl<'a> EventWaitGuard<'a> {
61    /// The underlying event associated with this guard.
62    pub fn event(&self) -> &'a Arc<InterruptibleEvent> {
63        self.event
64    }
65
66    /// Block the thread until either `deadline` expires, the event is notified, or the event is
67    /// interrupted.
68    pub fn block_until(
69        self,
70        new_owner: Option<&zx::Thread>,
71        deadline: zx::MonotonicInstant,
72    ) -> Result<(), WakeReason> {
73        self.event.block_until(new_owner, deadline)
74    }
75}
76
/// A description of why a `block_until` returned without the event being notified.
///
/// The enum carries no data, so it is `Copy`: a wake reason can be compared, logged and
/// forwarded without giving up ownership of it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WakeReason {
    /// `block_until` returned because another thread interrupted the wait using `interrupt`.
    Interrupted,

    /// `block_until` returned because the given deadline expired.
    DeadlineExpired,
}
86
87impl InterruptibleEvent {
88    pub fn new() -> Arc<Self> {
89        Arc::new(InterruptibleEvent { futex: zx::Futex::new(0) })
90    }
91
92    /// Called to initiate a wait.
93    ///
94    /// Calls to `notify` or `interrupt` after this function returns will cause the event to wake
95    /// up. Calls to those functions prior to calling `begin_wait` will be ignored.
96    ///
97    /// Once called, this function cannot be called again until `block_until` returns. Otherwise,
98    /// this function will panic.
99    pub fn begin_wait<'a>(self: &'a Arc<Self>) -> EventWaitGuard<'a> {
100        self.futex
101            .compare_exchange(READY, WAITING, Ordering::Relaxed, Ordering::Relaxed)
102            .expect("Tried to begin waiting on an event when not ready.");
103        EventWaitGuard { event: self }
104    }
105
106    fn block_until(
107        &self,
108        new_owner: Option<&zx::Thread>,
109        deadline: zx::MonotonicInstant,
110    ) -> Result<(), WakeReason> {
111        // As an optimization, decommit unused pages of the stack to reduce memory pressure while
112        // the thread is blocked.
113        clean_stack();
114
115        // We need to loop around the call to zx_futex_wake because we can receive spurious
116        // wakeups.
117        loop {
118            match self.futex.wait(WAITING, new_owner, deadline) {
119                // The deadline expired while we were sleeping.
120                Err(zx::Status::TIMED_OUT) => {
121                    self.futex.store(READY, Ordering::Relaxed);
122                    return Err(WakeReason::DeadlineExpired);
123                }
124                // The value changed before we went to sleep.
125                Err(zx::Status::BAD_STATE) => (),
126                Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
127                Ok(()) => (),
128            }
129
130            let state = self.futex.load(Ordering::Acquire);
131
132            match state {
133                // If we're still in the `WAITING` state, then the wake ended spuriously and we
134                // need to go back to sleep.
135                WAITING => continue,
136                NOTIFIED => {
137                    // We use a store here rather than a compare_exchange because other threads are
138                    // only allowed to write to this value in the `WAITING` state and we are in the
139                    // `NOTIFIED` state.
140                    self.futex.store(READY, Ordering::Relaxed);
141                    return Ok(());
142                }
143                INTERRUPTED => {
144                    // We use a store here rather than a compare_exchange because other threads are
145                    // only allowed to write to this value in the `WAITING` state and we are in the
146                    // `INTERRUPTED` state.
147                    self.futex.store(READY, Ordering::Relaxed);
148                    return Err(WakeReason::Interrupted);
149                }
150                _ => {
151                    panic!("Unexpected event state: {state}");
152                }
153            }
154        }
155    }
156
157    /// Wake up the event normally.
158    ///
159    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
160    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
161    /// observed by `block_until` is a race.
162    pub fn notify(&self) {
163        self.wake(NOTIFIED);
164    }
165
166    /// Wake up the event because of an interruption.
167    ///
168    /// If this function is called before `begin_wait`, this notification is ignored. Calling this
169    /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
170    /// observed by `block_until` is a race.
171    pub fn interrupt(&self) {
172        self.wake(INTERRUPTED);
173    }
174
175    fn wake(&self, state: i32) {
176        // See <https://marabos.nl/atomics/hardware.html#failing-compare-exchange> for why we issue
177        // this load before the `compare_exchange` below.
178        let observed = self.futex.load(Ordering::Relaxed);
179        if observed == WAITING
180            && self
181                .futex
182                .compare_exchange(WAITING, state, Ordering::Release, Ordering::Relaxed)
183                .is_ok()
184        {
185            self.futex.wake_all();
186        }
187    }
188}
189
#[cfg(test)]
mod test {
    use super::*;
    use zx::AsHandleRef;

    /// A `notify` from another thread completes the wait successfully.
    #[test]
    fn test_wait_block_and_notify() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();

        let other_event = Arc::clone(&event);
        let thread = std::thread::spawn(move || {
            other_event.notify();
        });

        guard.block_until(None, zx::MonotonicInstant::INFINITE).expect("failed to be notified");
        thread.join().expect("failed to join thread");
    }

    /// An `interrupt` from another thread ends the wait with `WakeReason::Interrupted`.
    #[test]
    fn test_wait_block_and_interrupt() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();

        let other_event = Arc::clone(&event);
        let thread = std::thread::spawn(move || {
            other_event.interrupt();
        });

        let result = guard.block_until(None, zx::MonotonicInstant::INFINITE);
        assert_eq!(result, Err(WakeReason::Interrupted));
        thread.join().expect("failed to join thread");
    }

    /// With nobody waking the event, the wait ends with `WakeReason::DeadlineExpired`.
    #[test]
    fn test_wait_block_and_timeout() {
        let event = InterruptibleEvent::new();

        let guard = event.begin_wait();
        let result = guard
            .block_until(None, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)));
        assert_eq!(result, Err(WakeReason::DeadlineExpired));
    }

    /// Blocking with `new_owner` set makes that thread the owner reported by the futex.
    #[test]
    fn futex_ownership_is_transferred() {
        use zx::HandleBased;

        // `InterruptibleEvent::new` already returns an `Arc`, so no extra wrapping is needed.
        let event = InterruptibleEvent::new();

        let (root_thread_handle, root_thread_koid) = fuchsia_runtime::with_thread_self(|thread| {
            (thread.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap(), thread.get_koid().unwrap())
        });

        let event_for_blocked_thread = Arc::clone(&event);

        let blocked_thread = std::thread::spawn(move || {
            let event = event_for_blocked_thread;
            let guard = event.begin_wait();
            guard.block_until(Some(&root_thread_handle), zx::MonotonicInstant::INFINITE).unwrap();
        });

        // Wait for the correct owner to appear. If for some reason futex PI breaks, it's likely
        // that this test will time out rather than panicking outright. It would be nice to have
        // a clear assertion here, but there's no existing API we can use to atomically wait for a
        // waiter on the futex *and* see what owner they set. If we find ourselves writing a lot of
        // tests like this we might consider setting up a fake vdso.
        while event.futex.get_owner() != Some(root_thread_koid) {
            std::thread::sleep(std::time::Duration::from_millis(100));
        }

        event.notify();
        blocked_thread.join().unwrap();
    }

    /// Passing a handle to a thread that has already finished as `new_owner` does not break
    /// the wait: it still ends with the deadline expiring.
    #[test]
    fn stale_pi_owner_is_noop() {
        use zx::HandleBased;

        // Grab a handle to a thread that has finished by the time the scope returns.
        let mut new_owner = None;
        std::thread::scope(|s| {
            s.spawn(|| {
                new_owner = Some(
                    fuchsia_runtime::with_thread_self(|thread| {
                        thread.duplicate_handle(zx::Rights::SAME_RIGHTS)
                    })
                    .unwrap(),
                );
            });
        });
        let new_owner = new_owner.unwrap();

        let event = InterruptibleEvent::new();
        let guard = event.begin_wait();
        let result = guard.block_until(
            Some(&new_owner),
            zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)),
        );
        assert_eq!(result, Err(WakeReason::DeadlineExpired));
    }
}
292}