// fuchsia_rcu/state_machine.rs
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::atomic_stack::{AtomicListIterator, AtomicStack};
6use fuchsia_sync::Mutex;
7use std::cell::Cell;
8use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering, fence};
9use std::thread_local;
10
11#[cfg(feature = "rseq_backend")]
12use crate::read_counters::RcuReadCounters;
13
/// A deferred cleanup action registered via `rcu_call` (or `rcu_drop`).
///
/// Boxed so that heterogeneous closures can share one callback chain; `Send + Sync + 'static`
/// because the callback may run on an arbitrary thread at an arbitrary later time.
type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;
15
/// Shared, process-wide state for the RCU state machine.
struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    /// Only its parity is used to select a read counter.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from. For example,
    /// if the `generation` is even, then readers increment the counter for the `read_counters[0]`.
    /// If the `generation` is odd, then readers increment the counter for the `read_counters[1]`.
    #[cfg(not(feature = "rseq_backend"))]
    read_counters: [AtomicUsize; 2],

    /// The read counters, maintained via the rseq backend instead of atomic RMW operations.
    #[cfg(feature = "rseq_backend")]
    read_counters: RcuReadCounters,

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    ///
    /// Holds either `ADVANCER_IDLE` or `ADVANCER_WAITING`.
    advancer: zx::Futex,

    /// Callbacks that are ready to run after the next grace period.
    ///
    /// Also serves as the lock that serializes grace periods (see `rcu_grace_period`).
    waiting_callbacks: Mutex<AtomicListIterator<RcuCallback>>,
}
45
/// Futex value indicating that no thread is waiting for the state machine to advance.
const ADVANCER_IDLE: i32 = 0;
/// Futex value indicating that at least one thread is blocked in `rcu_advancer_wait`.
const ADVANCER_WAITING: i32 = 1;
48
impl RcuControlBlock {
    /// Create a new control block for the RCU state machine.
    ///
    /// `const` so that it can initialize the `RCU_CONTROL_BLOCK` static.
    const fn new() -> Self {
        #[cfg(feature = "rseq_backend")]
        let read_counters = RcuReadCounters::new();

        #[cfg(not(feature = "rseq_backend"))]
        let read_counters = [AtomicUsize::new(0), AtomicUsize::new(0)];

        Self {
            // Generation 0 is even, so the first readers use counter index 0.
            generation: AtomicUsize::new(0),
            read_counters,
            callback_chain: AtomicStack::new(),
            advancer: zx::Futex::new(ADVANCER_IDLE),
            waiting_callbacks: Mutex::new(AtomicListIterator::empty()),
        }
    }
}
67
/// The control block for the RCU state machine.
///
/// There is exactly one state machine per process; every reader and writer shares this instance.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();
70
/// Per-thread bookkeeping for the RCU state machine.
///
/// Stored in the `RCU_THREAD_BLOCK` thread-local; fields use `Cell` because they are only ever
/// accessed from the owning thread.
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    ///
    /// Zero when the thread holds no read lock.
    nesting_level: Cell<usize>,

    /// The index of the read counter that the thread incremented when it entered its outermost read
    /// lock.
    ///
    /// Only meaningful while `nesting_level > 0`; reset to the poison value `u8::MAX` when the
    /// outermost read lock is released (valid indices are only 0 and 1).
    counter_index: Cell<u8>,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}
83
84impl RcuThreadBlock {
85 /// Returns true if the thread is holding a read lock.
86 fn holding_read_lock(&self) -> bool {
87 self.nesting_level.get() > 0
88 }
89}
90
impl Default for RcuThreadBlock {
    /// Create the per-thread block.
    ///
    /// With the rseq backend enabled this also registers the calling thread; registration
    /// happens here — i.e. on first access to `RCU_THREAD_BLOCK` — so it precedes any use of
    /// the rseq read counters by this thread.
    fn default() -> Self {
        #[cfg(feature = "rseq_backend")]
        fuchsia_rseq::rseq_register_thread().expect("failed to register thread");

        Self {
            nesting_level: Cell::new(0),
            counter_index: Cell::new(0),
            has_pending_callbacks: Cell::new(false),
        }
    }
}
103
impl Drop for RcuThreadBlock {
    /// Runs as the thread-local destructor at thread exit: undoes the rseq registration
    /// performed in `default`.
    fn drop(&mut self) {
        #[cfg(feature = "rseq_backend")]
        fuchsia_rseq::rseq_unregister_thread().expect("failed to unregister thread");
    }
}
110
thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track the nesting level of read locks and the index of the read counter
    /// that the thread incremented when it entered its outermost read lock.
    ///
    /// Lazily initialized on first access via `RcuThreadBlock::default`, which also performs
    /// per-thread rseq registration when that backend is enabled.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}
118
/// Acquire a read lock.
///
/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
/// defers calling callbacks until all currently in-flight read operations have completed.
///
/// Read locks may be nested on the same thread; only the outermost acquisition touches the
/// shared read counters.
///
/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
pub(crate) fn rcu_read_lock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 0 {
            // If this thread already has a read lock, increment the nesting level instead of the
            // incrementing the read counter. This approach is a performance optimization to reduce
            // the number of atomic operations that need to be performed.
            thread_block.nesting_level.set(nesting_level + 1);
        } else {
            // This is the outermost read lock. Increment the read counter.
            let control_block = &RCU_CONTROL_BLOCK;

            // There's a race here where we capture `index` and then go on to increment the read
            // counter. The choice of `index` here isn't actually important for correctness because
            // we always wait at least two grace periods before calling the callbacks, so it doesn't
            // matter which counter we increment. It does mean that a thread waiting for the read
            // counter to drop to zero, could actually find that the read counter increases before
            // it eventually reaches zero, which should be fine.
            let index = control_block.generation.load(Ordering::Relaxed) & 1;

            #[cfg(feature = "rseq_backend")]
            {
                control_block.read_counters.begin(index);
                // Keep the compiler from reordering the critical section's memory operations
                // above the counter increment; the rseq counters are not atomic RMWs.
                // NOTE(review): a compiler fence constrains only the compiler, not the CPU —
                // presumably the rseq/IPI protocol supplies the hardware ordering; confirm
                // against design.md.
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [A] (see design.md)
                control_block.read_counters[index].fetch_add(1, Ordering::SeqCst);
            }

            // Record which counter we incremented so `rcu_read_unlock` decrements the matching
            // one even if the generation advances while the lock is held.
            thread_block.counter_index.set(index as u8);
            thread_block.nesting_level.set(1);
        }
    });
}
162
/// Release a read lock.
///
/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
/// more details.
pub(crate) fn rcu_read_unlock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 1 {
            // If the nesting level is greater than 1, this is not the outermost read lock.
            // Decrement the nesting level instead of the read counter.
            thread_block.nesting_level.set(nesting_level - 1);
        } else {
            // This is the outermost read lock. Decrement the read counter.
            //
            // Note that we decrement the counter recorded by `rcu_read_lock`, which is not
            // necessarily the counter for the *current* generation.
            let index = thread_block.counter_index.get() as usize;
            let control_block = &RCU_CONTROL_BLOCK;

            #[cfg(feature = "rseq_backend")]
            {
                // Keep the compiler from reordering the critical section's memory operations
                // below the counter decrement; the rseq counters are not atomic RMWs.
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
                control_block.read_counters.end(index);

                // We cannot tell if this thread is the last thread to exit its read lock, so we
                // always wake the advancer. The advancer will check if there are any active
                // readers and will only advance the state machine if there are no active
                // readers.
                rcu_advancer_wake_all();
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [B] (see design.md)
                let previous_count =
                    control_block.read_counters[index].fetch_sub(1, Ordering::SeqCst);
                if previous_count == 1 {
                    // This thread was the last active reader on this counter; wake any thread
                    // blocked in `rcu_advancer_wait`.
                    rcu_advancer_wake_all();
                }
            }

            thread_block.nesting_level.set(0);
            // Poison the saved index: valid indices are only 0 and 1, so `u8::MAX` makes a
            // mismatched lock/unlock easier to spot.
            thread_block.counter_index.set(u8::MAX);
        }
    });
}
206
/// Read the value of an RCU pointer.
///
/// The caller must hold a read lock (see `rcu_read_lock`); the returned pointer remains valid
/// until that read lock is released.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    let current = ptr.load(Ordering::Acquire);
    current as *const T
}
215
/// Assign a new value to an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    //
    // The release store publishes all writes made to the pointee before readers can observe
    // the new pointer via the acquire load in `rcu_read_pointer`.
    ptr.store(new_ptr, Ordering::Release);
}
225
/// Replace the value of an RCU pointer, returning the previous value.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    //
    // AcqRel: the release half publishes the new pointee's writes; the acquire half lets the
    // caller safely inspect the returned (old) pointee.
    let old_ptr = ptr.swap(new_ptr, Ordering::AcqRel);
    old_ptr
}
235
/// Call a callback to run after all in-flight read operations have completed.
///
/// To wait until the callback is ready to run, call `rcu_synchronize()`. The callback might be
/// called from an arbitrary thread.
///
/// NOTE: The order in which callbacks are called is not guaranteed since they can be called
/// concurrently from multiple threads.
pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
    // Record that this thread scheduled work, so `rcu_run_callbacks` knows it has something
    // to flush.
    RCU_THREAD_BLOCK.with(|block| {
        block.has_pending_callbacks.set(true);
    });

    // We need to synchronize with rcu_read_lock. We need to ensure that all prior stores are
    // visible to threads that have called rcu_read_lock. We must synchronize with both read
    // counters using a store operation. We don't need to change the value.
    fence(Ordering::Release);

    #[cfg(not(feature = "rseq_backend"))]
    {
        // Adding zero leaves the counters unchanged but these RMWs still participate in each
        // counter's modification order, which is what lets the release fence above pair with
        // the readers' operations on the same counters (see design.md).
        RCU_CONTROL_BLOCK.read_counters[0].fetch_add(0, Ordering::Relaxed);
        RCU_CONTROL_BLOCK.read_counters[1].fetch_add(0, Ordering::Relaxed);
    }

    // Even though we push the callback to the front of the stack, we reverse the order of the stack
    // when we pop the callbacks from the stack to ensure that the callbacks are run in the order in
    // which they were scheduled.

    // Synchronization point [G] (see design.md)
    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
}
266
267/// Schedule the object to be dropped after all in-flight read operations have completed.
268///
269/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
270/// an arbitrary thread.
271pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
272 rcu_call(move || {
273 std::mem::drop(value);
274 });
275}
276
/// Check if there are any active readers for the given generation.
///
/// Only the parity of `generation` matters: it selects which of the two read counters to inspect.
fn has_active_readers(generation: usize) -> bool {
    let index = generation & 1;

    #[cfg(feature = "rseq_backend")]
    {
        return RCU_CONTROL_BLOCK.read_counters.has_active(index);
    }

    #[cfg(not(feature = "rseq_backend"))]
    {
        // Synchronization point [C] (see design.md)
        RCU_CONTROL_BLOCK.read_counters[index].load(Ordering::SeqCst) > 0
    }
}
292
/// Wake up all the threads that are waiting to advance the state machine.
///
/// Does nothing if no threads are waiting. Called from the read-unlock path, so the no-waiter
/// case (a single load) is the fast path.
fn rcu_advancer_wake_all() {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
        // Reset the futex value before waking so a waiter that races with us and calls
        // `wait(ADVANCER_WAITING, ...)` does not block on a stale value.
        // NOTE(review): this store is `Relaxed`; presumably the `SeqCst` operations on the
        // read counters and the waiter's `SeqCst` store provide the required ordering —
        // confirm against design.md.
        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
        advancer.wake_all();
    }
}
303
/// Blocks the current thread until all in-flight read operations have completed for the given
/// generation.
///
/// Only the parity of `generation` matters; it selects which read counter to watch.
///
/// Postcondition: The number of active readers for the given generation is zero and the advancer
/// futex contains `ADVANCER_IDLE`.
fn rcu_advancer_wait(generation: usize) {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    loop {
        // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
        // before checking if there are any active readers.
        //
        // In the single total order, either this store or the last decrement to the reader counter
        // must happen first.
        //
        // (1) If this store happens first, then the last thread to decrement the reader counter
        //     for this generation will observe `ADVANCER_WAITING` and will reset the value to
        //     `ADVANCER_IDLE` and wake the futex, unblocking this thread.
        //
        // (2) If the last decrement to the reader counter happens first, then this thread will see
        //     that there are no active readers in this generation and avoid blocking on the futex.
        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
        if !has_active_readers(generation) {
            break;
        }
        // The result is deliberately ignored: whether the wait was woken, timed out, or failed
        // because the value no longer matched, the loop re-checks the reader count anyway.
        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
    }
    // Restore the idle state promised by the postcondition so the next grace period starts
    // from a clean slate.
    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
}
332
/// Advance the RCU state machine.
///
/// This function blocks until all in-flight read operations have completed for the current
/// generation and all callbacks have been run.
///
/// Holding the `waiting_callbacks` lock across the generation bump and the wait serializes
/// concurrent grace periods against each other.
fn rcu_grace_period() {
    let callbacks = {
        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();

        // We are in the *Idle* state.

        // Swap out the callbacks that we can run when this grace period has passed with the
        // callbacks that can run after the next period.
        // Synchronization point [H] (see design.md)
        let callbacks =
            std::mem::replace(&mut *waiting_callbacks, RCU_CONTROL_BLOCK.callback_chain.take());

        // Issue an IPI to all CPUs to force them to serialize their execution.
        // This ensures that all prior stores by all writers are visible to
        // any thread that subsequently enters an RCU read-side critical section.
        #[cfg(feature = "rseq_backend")]
        {
            let status =
                unsafe { zx::sys::zx_system_barrier(zx::sys::ZX_SYSTEM_BARRIER_DATA_MEMORY) };
            debug_assert_eq!(status, zx::sys::ZX_OK);
        }

        // `fetch_add` returns the value *before* the increment, so `generation` names the
        // generation that readers were entering up to this point — that is the one we must
        // drain.
        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);

        // Enter the *Waiting* state.
        rcu_advancer_wait(generation);

        // Return to the *Idle* state (the lock guard is released at the end of this block).
        callbacks
    };

    // Run the callbacks only after releasing the lock; a callback that re-entered the RCU
    // machinery (e.g. via `rcu_synchronize`) while we still held it would risk deadlock.
    //
    // We cannot control the order in which callbacks run since callbacks can be running on multiple
    // threads concurrently.
    for callback in callbacks {
        callback();
    }
}
374
375/// Block until all in-flight read operations have completed. When this returns, the callbacks that
376/// are unblocked by those in-flight operations might still be running (or even not yet started) on
377/// another thread.
378pub fn rcu_synchronize() {
379 RCU_THREAD_BLOCK.with(|block| {
380 assert!(!block.holding_read_lock());
381 block.has_pending_callbacks.set(false);
382 });
383
384 // We need to run at least two grace periods to flush out all pending callbacks. See the
385 // comment in `rcu_read_lock` and the design to understand why.
386 rcu_grace_period();
387 rcu_grace_period();
388}
389
390/// If any callbacks have been scheduled from this thread, call `rcu_synchronize`.
391///
392/// If any callbacks have been scheduled from this thread, this function will block until the
393/// callbacks are unblocked and ready to be run (but have not yet necessarily finished, or even
394/// started). If no callbacks have been scheduled from this thread, this function will return
395/// immediately.
396pub fn rcu_run_callbacks() {
397 RCU_THREAD_BLOCK.with(|block| {
398 assert!(!block.holding_read_lock());
399 if block.has_pending_callbacks.get() {
400 rcu_synchronize();
401 }
402 })
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Serializes the tests in this module.
    ///
    /// All of these tests exercise the single, process-global RCU state machine, and the default
    /// test harness runs tests on multiple threads. Without serialization, a concurrent
    /// `rcu_grace_period` from another test can flush this test's callback early, making the
    /// timing assertions (in particular the "not yet executed" check in
    /// `test_rcu_delay_regression`) flaky.
    ///
    /// Fully qualified to avoid clashing with the `fuchsia_sync::Mutex` imported by `super::*`.
    static TEST_SERIALIZER: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquire the serialization lock, recovering from poisoning so that one failed test does
    /// not cascade into spurious failures in the others.
    fn serialize() -> std::sync::MutexGuard<'static, ()> {
        TEST_SERIALIZER.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    #[test]
    fn test_rcu_delay_regression() {
        let _guard = serialize();

        // This test relies on the global RCU state machine.
        // It verifies that callbacks are NOT executed immediately after one grace period.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_grace_period();

        assert!(
            !flag.load(Ordering::SeqCst),
            "Callback executed too early! RCU requires 2 grace periods delay."
        );

        rcu_grace_period();
        assert!(flag.load(Ordering::SeqCst), "Callback should have executed after 2 grace periods");
    }

    #[test]
    fn test_rcu_synchronize() {
        let _guard = serialize();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_synchronize() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_synchronize();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_synchronize()"
        );
    }

    #[test]
    fn test_rcu_run_callbacks() {
        let _guard = serialize();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_run_callbacks() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_run_callbacks();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_run_callbacks()"
        );
    }
}
471}