starnix_sync/interruptible_event.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7
8/// A blocking object that can either be notified normally or interrupted
9///
10/// To block using an `InterruptibleEvent`, first call `begin_wait`. At this point, the event is
11/// in the "waiting" state, and future calls to `notify` or `interrupt` will terminate the wait.
12///
13/// After `begin_wait` returns, call `block_until` to block the current thread until one of the
14/// following conditions occur:
15///
16/// 1. The given deadline expires.
17/// 2. At least one of the `notify` or `interrupt` functions were called after `begin_wait`.
18///
19/// It's safe to call `notify` or `interrupt` at any time. However, calls to `begin_wait` and
20/// `block_until` must alternate, starting with `begin_wait`.
21///
22/// `InterruptibleEvent` uses two-phase waiting so that clients can register for notification,
23/// perform some related work, and then start blocking. This approach ensures that clients do not
24/// miss notifications that arrive after they perform the related work but before they actually
25/// start blocking.
26#[derive(Debug)]
27pub struct InterruptibleEvent {
28 futex: zx::Futex,
29}
30
31/// The initial state.
32///
33/// * Transitions to `WAITING` after `begin_wait`.
34const READY: i32 = 0;
35
36/// The event is waiting for a notification or an interruption.
37///
38/// * Transitions to `NOTIFIED` after `notify`.
39/// * Transitions to `INTERRUPTED` after `interrupt`.
40/// * Transitions to `READY` if the deadline for `block_until` expires.
41const WAITING: i32 = 1;
42
43/// The event has been notified and will wake up.
44///
45/// * Transitions to `READY` after `block_until` processes the notification.
46const NOTIFIED: i32 = 2;
47
48/// The event has been interrupted and will wake up.
49///
50/// * Transitions to `READY` after `block_until` processes the interruption.
51const INTERRUPTED: i32 = 3;
52
53/// A guard object to enforce that clients call `begin_wait` before `block_until`.
54#[must_use = "call block_until to advance the event state machine"]
55pub struct EventWaitGuard<'a> {
56 event: &'a Arc<InterruptibleEvent>,
57}
58
59impl<'a> EventWaitGuard<'a> {
60 /// The underlying event associated with this guard.
61 pub fn event(&self) -> &'a Arc<InterruptibleEvent> {
62 self.event
63 }
64
65 /// Returns the owner of the underlying futex, if any.
66 pub fn get_owner(&self) -> Option<zx::Koid> {
67 self.event.get_owner()
68 }
69
70 /// Block the thread until either `deadline` expires, the event is notified, or the event is
71 /// interrupted.
72 pub fn block_until(
73 self,
74 new_owner: Option<&zx::Thread>,
75 deadline: zx::MonotonicInstant,
76 ) -> Result<(), WakeReason> {
77 self.event.block_until(new_owner, deadline)
78 }
79}
80
81/// A description of why a `block_until` returned without the event being notified.
82#[derive(Debug, PartialEq, Eq)]
83pub enum WakeReason {
84 /// `block_until` returned because another thread interrupted the wait using `interrupt`.
85 Interrupted,
86
87 /// `block_until` returned because the given deadline expired.
88 DeadlineExpired,
89}
90
91impl Default for InterruptibleEvent {
92 fn default() -> Self {
93 InterruptibleEvent { futex: zx::Futex::new(0) }
94 }
95}
96
97impl InterruptibleEvent {
98 pub fn new() -> Arc<Self> {
99 Arc::new(Self::default())
100 }
101
102 /// Returns the owner of the underlying futex, if any.
103 pub fn get_owner(&self) -> Option<zx::Koid> {
104 self.futex.get_owner()
105 }
106
107 /// Called to initiate a wait.
108 ///
109 /// Calls to `notify` or `interrupt` after this function returns will cause the event to wake
110 /// up. Calls to those functions prior to calling `begin_wait` will be ignored.
111 ///
112 /// Once called, this function cannot be called again until `block_until` returns. Otherwise,
113 /// this function will panic.
114 pub fn begin_wait<'a>(self: &'a Arc<Self>) -> EventWaitGuard<'a> {
115 self.futex
116 .compare_exchange(READY, WAITING, Ordering::Relaxed, Ordering::Relaxed)
117 .expect("Tried to begin waiting on an event when not ready.");
118 EventWaitGuard { event: self }
119 }
120
121 fn block_until(
122 &self,
123 new_owner: Option<&zx::Thread>,
124 deadline: zx::MonotonicInstant,
125 ) -> Result<(), WakeReason> {
126 // We need to loop around the call to zx_futex_wake because we can receive spurious
127 // wakeups.
128 loop {
129 match self.futex.wait(WAITING, new_owner, deadline) {
130 // The deadline expired while we were sleeping.
131 Err(zx::Status::TIMED_OUT) => {
132 self.futex.store(READY, Ordering::Relaxed);
133 return Err(WakeReason::DeadlineExpired);
134 }
135 // The value changed before we went to sleep.
136 Err(zx::Status::BAD_STATE) => (),
137 Err(e) => panic!("Unexpected error from zx_futex_wait: {e}"),
138 Ok(()) => (),
139 }
140
141 let state = self.futex.load(Ordering::Acquire);
142
143 match state {
144 // If we're still in the `WAITING` state, then the wake ended spuriously and we
145 // need to go back to sleep.
146 WAITING => continue,
147 NOTIFIED => {
148 // We use a store here rather than a compare_exchange because other threads are
149 // only allowed to write to this value in the `WAITING` state and we are in the
150 // `NOTIFIED` state.
151 self.futex.store(READY, Ordering::Relaxed);
152 return Ok(());
153 }
154 INTERRUPTED => {
155 // We use a store here rather than a compare_exchange because other threads are
156 // only allowed to write to this value in the `WAITING` state and we are in the
157 // `INTERRUPTED` state.
158 self.futex.store(READY, Ordering::Relaxed);
159 return Err(WakeReason::Interrupted);
160 }
161 _ => {
162 panic!("Unexpected event state: {state}");
163 }
164 }
165 }
166 }
167
168 /// Wake up the event normally.
169 ///
170 /// If this function is called before `begin_wait`, this notification is ignored. Calling this
171 /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
172 /// observed by `block_until` is a race.
173 pub fn notify(&self) {
174 self.wake(NOTIFIED);
175 }
176
177 /// Wake up the event because of an interruption.
178 ///
179 /// If this function is called before `begin_wait`, this notification is ignored. Calling this
180 /// function repeatedly has no effect. If both `notify` and `interrupt` are called, the state
181 /// observed by `block_until` is a race.
182 pub fn interrupt(&self) {
183 self.wake(INTERRUPTED);
184 }
185
186 fn wake(&self, state: i32) {
187 // See <https://marabos.nl/atomics/hardware.html#failing-compare-exchange> for why we issue
188 // this load before the `compare_exchange` below.
189 let observed = self.futex.load(Ordering::Relaxed);
190 if observed == WAITING
191 && self
192 .futex
193 .compare_exchange(WAITING, state, Ordering::Release, Ordering::Relaxed)
194 .is_ok()
195 {
196 self.futex.wake_all();
197 }
198 }
199}
200
201#[cfg(test)]
202mod test {
203 use super::*;
204
205 #[test]
206 fn test_wait_block_and_notify() {
207 let event = InterruptibleEvent::new();
208
209 let guard = event.begin_wait();
210
211 let other_event = Arc::clone(&event);
212 let thread = std::thread::spawn(move || {
213 other_event.notify();
214 });
215
216 guard.block_until(None, zx::MonotonicInstant::INFINITE).expect("failed to be notified");
217 thread.join().expect("failed to join thread");
218 }
219
220 #[test]
221 fn test_wait_block_and_interrupt() {
222 let event = InterruptibleEvent::new();
223
224 let guard = event.begin_wait();
225
226 let other_event = Arc::clone(&event);
227 let thread = std::thread::spawn(move || {
228 other_event.interrupt();
229 });
230
231 let result = guard.block_until(None, zx::MonotonicInstant::INFINITE);
232 assert_eq!(result, Err(WakeReason::Interrupted));
233 thread.join().expect("failed to join thread");
234 }
235
236 #[test]
237 fn test_wait_block_and_timeout() {
238 let event = InterruptibleEvent::new();
239
240 let guard = event.begin_wait();
241 let result = guard
242 .block_until(None, zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)));
243 assert_eq!(result, Err(WakeReason::DeadlineExpired));
244 }
245
246 #[test]
247 fn futex_ownership_is_transferred() {
248 let event = Arc::new(InterruptibleEvent::new());
249
250 let (root_thread_handle, root_thread_koid) = fuchsia_runtime::with_thread_self(|thread| {
251 (thread.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap(), thread.koid().unwrap())
252 });
253
254 let event_for_blocked_thread = event.clone();
255
256 let blocked_thread = std::thread::spawn(move || {
257 let event = event_for_blocked_thread;
258 let guard = event.begin_wait();
259 guard.block_until(Some(&root_thread_handle), zx::MonotonicInstant::INFINITE).unwrap();
260 });
261
262 // Wait for the correct owner to appear.
263 // TODO(b/502692311): Replace this polling loop if it starts timing out.
264 while event.get_owner() != Some(root_thread_koid) {
265 std::thread::sleep(std::time::Duration::from_millis(100));
266 }
267
268 event.notify();
269 blocked_thread.join().unwrap();
270 }
271
272 #[test]
273 fn stale_pi_owner_is_noop() {
274 let mut new_owner = None;
275 std::thread::scope(|s| {
276 s.spawn(|| {
277 new_owner = Some(
278 fuchsia_runtime::with_thread_self(|thread| {
279 thread.duplicate_handle(zx::Rights::SAME_RIGHTS)
280 })
281 .unwrap(),
282 );
283 });
284 });
285 let new_owner = new_owner.unwrap();
286
287 let event = InterruptibleEvent::new();
288 let guard = event.begin_wait();
289 let result = guard.block_until(
290 Some(&new_owner),
291 zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(20)),
292 );
293 assert_eq!(result, Err(WakeReason::DeadlineExpired));
294 }
295}