fuchsia_rcu/state_machine.rs
1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::atomic_stack::{AtomicListIterator, AtomicStack};
6use fuchsia_sync::Mutex;
7use std::cell::Cell;
8use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering, fence};
9use std::thread_local;
10
11#[cfg(feature = "rseq_backend")]
12use crate::read_counters::RcuReadCounters;
13
14type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;
15
16struct RcuControlBlock {
17 /// The generation counter.
18 ///
19 /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
20 generation: AtomicUsize,
21
22 /// The read counters.
23 ///
24 /// Readers increment the counter for the generation that they are reading from. For example,
25 /// if the `generation` is even, then readers increment the counter for the `read_counters[0]`.
26 /// If the `generation` is odd, then readers increment the counter for the `read_counters[1]`.
27 #[cfg(not(feature = "rseq_backend"))]
28 read_counters: [AtomicUsize; 2],
29
30 #[cfg(feature = "rseq_backend")]
31 read_counters: RcuReadCounters,
32
33 /// The chain of callbacks that are waiting to be run.
34 ///
35 /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
36 /// all currently in-flight read operations have completed.
37 callback_chain: AtomicStack<RcuCallback>,
38
39 /// The futex used to wait for the state machine to advance.
40 advancer: zx::Futex,
41
42 /// Callbacks that are ready to run after the next grace period.
43 waiting_callbacks: Mutex<AtomicListIterator<RcuCallback>>,
44}
45
/// Futex value: no thread is waiting to advance the state machine.
const ADVANCER_IDLE: i32 = 0;
/// Futex value: at least one thread is blocked waiting for readers to drain.
const ADVANCER_WAITING: i32 = 1;

49impl RcuControlBlock {
50 /// Create a new control block for the RCU state machine.
51 const fn new() -> Self {
52 #[cfg(feature = "rseq_backend")]
53 let read_counters = RcuReadCounters::new();
54
55 #[cfg(not(feature = "rseq_backend"))]
56 let read_counters = [AtomicUsize::new(0), AtomicUsize::new(0)];
57
58 Self {
59 generation: AtomicUsize::new(0),
60 read_counters,
61 callback_chain: AtomicStack::new(),
62 advancer: zx::Futex::new(ADVANCER_IDLE),
63 waiting_callbacks: Mutex::new(AtomicListIterator::empty()),
64 }
65 }
66}
67
68/// The control block for the RCU state machine.
69static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();
70
/// Per-thread bookkeeping for the RCU state machine (stored in `RCU_THREAD_BLOCK`).
#[derive(Default)]
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    nesting_level: Cell<usize>,

    /// The index of the read counter that the thread incremented when it entered its outermost read
    /// lock. Only meaningful while `nesting_level > 0`; reset to `u8::MAX` on unlock.
    counter_index: Cell<u8>,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}

impl RcuThreadBlock {
    /// Returns true if the thread is holding a read lock.
    fn holding_read_lock(&self) -> bool {
        self.nesting_level.get() > 0
    }
}

92thread_local! {
93 /// Thread-specific data for the RCU state machine.
94 ///
95 /// This data is used to track the nesting level of read locks and the index of the read counter
96 /// that the thread incremented when it entered its outermost read lock.
97 static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
98}
99
100/// Acquire a read lock.
101///
102/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
103/// defers calling callbacks until all currently in-flight read operations have completed.
104///
105/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
106pub(crate) fn rcu_read_lock() {
107 RCU_THREAD_BLOCK.with(|thread_block| {
108 let nesting_level = thread_block.nesting_level.get();
109 if nesting_level > 0 {
110 // If this thread already has a read lock, increment the nesting level instead of the
111 // incrementing the read counter. This approach is a performance optimization to reduce
112 // the number of atomic operations that need to be performed.
113 thread_block.nesting_level.set(nesting_level + 1);
114 } else {
115 // This is the outermost read lock. Increment the read counter.
116 let control_block = &RCU_CONTROL_BLOCK;
117
118 // There's a race here where we capture `index` and then go on to increment the read
119 // counter. The choice of `index` here isn't actually important for correctness because
120 // we always wait at least two grace periods before calling the callbacks, so it doesn't
121 // matter which counter we increment. It does mean that a thread waiting for the read
122 // counter to drop to zero, could actually find that the read counter increases before
123 // it eventually reaches zero, which should be fine.
124 let index = control_block.generation.load(Ordering::Relaxed) & 1;
125
126 #[cfg(feature = "rseq_backend")]
127 {
128 control_block.read_counters.begin(index);
129 std::sync::atomic::compiler_fence(Ordering::SeqCst);
130 }
131
132 #[cfg(not(feature = "rseq_backend"))]
133 {
134 // Synchronization point [A] (see design.md)
135 control_block.read_counters[index].fetch_add(1, Ordering::SeqCst);
136 }
137
138 thread_block.counter_index.set(index as u8);
139 thread_block.nesting_level.set(1);
140 }
141 });
142}
143
144/// Release a read lock.
145///
146/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
147/// more details.
148pub(crate) fn rcu_read_unlock() {
149 RCU_THREAD_BLOCK.with(|thread_block| {
150 let nesting_level = thread_block.nesting_level.get();
151 if nesting_level > 1 {
152 // If the nesting level is greater than 1, this is not the outermost read lock.
153 // Decrement the nesting level instead of the read counter.
154 thread_block.nesting_level.set(nesting_level - 1);
155 } else {
156 // This is the outermost read lock. Decrement the read counter.
157 let index = thread_block.counter_index.get() as usize;
158 let control_block = &RCU_CONTROL_BLOCK;
159
160 #[cfg(feature = "rseq_backend")]
161 {
162 std::sync::atomic::compiler_fence(Ordering::SeqCst);
163 control_block.read_counters.end(index);
164
165 // We cannot tell if this thread is the last thread to exit its read lock, so we
166 // always wake the advancer. The advancer will check if there are any active
167 // readers and will only advance the state machine if there are no active
168 // readers.
169 rcu_advancer_wake_all();
170 }
171
172 #[cfg(not(feature = "rseq_backend"))]
173 {
174 // Synchronization point [B] (see design.md)
175 let previous_count =
176 control_block.read_counters[index].fetch_sub(1, Ordering::SeqCst);
177 if previous_count == 1 {
178 rcu_advancer_wake_all();
179 }
180 }
181
182 thread_block.nesting_level.set(0);
183 thread_block.counter_index.set(u8::MAX);
184 }
185 });
186}
187
/// Read the value of an RCU pointer.
///
/// This function cannot be called unless the current thread is holding a read lock. The returned
/// pointer is valid until the read lock is released.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Acquire pairs with the Release store in rcu_assign_pointer / rcu_replace_pointer, making
    // the pointee's initialization visible to this reader.
    // Synchronization point [D] (see design.md)
    ptr.load(Ordering::Acquire)
}

/// Assign a new value to an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Release pairs with the Acquire load in rcu_read_pointer: everything written to *new_ptr
    // before this store is visible to readers that observe the new pointer.
    // Synchronization point [E] (see design.md)
    ptr.store(new_ptr, Ordering::Release);
}

/// Replace the value of an RCU pointer, returning the previous value.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // AcqRel: Release publishes *new_ptr to readers; Acquire makes the previous pointee's
    // writes visible to this (writer) thread.
    // Synchronization point [F] (see design.md)
    ptr.swap(new_ptr, Ordering::AcqRel)
}

217/// Call a callback to run after all in-flight read operations have completed.
218///
219/// To wait until the callback is ready to run, call `rcu_synchronize()`. The callback might be
220/// called from an arbitrary thread.
221///
222/// NOTE: The order in which callbacks are called is not guaranteed since they can be called
223/// concurrently from multiple threads.
224pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
225 RCU_THREAD_BLOCK.with(|block| {
226 block.has_pending_callbacks.set(true);
227 });
228
229 // We need to synchronize with rcu_read_lock. We need to ensure that all prior stores are
230 // visible to threads that have called rcu_read_lock. We must synchronize with both read
231 // counters using a store operation. We don't need to change the value.
232 fence(Ordering::Release);
233 RCU_CONTROL_BLOCK.read_counters[0].fetch_add(0, Ordering::Relaxed);
234 RCU_CONTROL_BLOCK.read_counters[1].fetch_add(0, Ordering::Relaxed);
235
236 // Even though we push the callback to the front of the stack, we reverse the order of the stack
237 // when we pop the callbacks from the stack to ensure that the callbacks are run in the order in
238 // which they were scheduled.
239
240 // Synchronization point [G] (see design.md)
241 RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
242}
243
244/// Schedule the object to be dropped after all in-flight read operations have completed.
245///
246/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
247/// an arbitrary thread.
248pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
249 rcu_call(move || {
250 std::mem::drop(value);
251 });
252}
253
254/// Check if there are any active readers for the given generation.
255fn has_active_readers(generation: usize) -> bool {
256 let index = generation & 1;
257
258 #[cfg(feature = "rseq_backend")]
259 {
260 return RCU_CONTROL_BLOCK.read_counters.has_active(index);
261 }
262
263 #[cfg(not(feature = "rseq_backend"))]
264 {
265 // Synchronization point [C] (see design.md)
266 RCU_CONTROL_BLOCK.read_counters[index].load(Ordering::SeqCst) > 0
267 }
268}
269
270/// Wake up all the threads that are waiting to advance the state machine.
271///
272/// Does nothing if no threads are waiting.
273fn rcu_advancer_wake_all() {
274 let advancer = &RCU_CONTROL_BLOCK.advancer;
275 if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
276 advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
277 advancer.wake_all();
278 }
279}
280
281/// Blocks the current thread until all in-flight read operations have completed for the given
282/// generation.
283///
284/// Postcondition: The number of active readers for the given generation is zero and the advancer
285/// futex contains `ADVANCER_IDLE`.
286fn rcu_advancer_wait(generation: usize) {
287 let advancer = &RCU_CONTROL_BLOCK.advancer;
288 loop {
289 // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
290 // before checking if there are any active readers.
291 //
292 // In the single total order, either this store or the last decrement to the reader counter
293 // must happen first.
294 //
295 // (1) If this store happens first, then the last thread to decrement the reader counter
296 // for this generation will observe `ADVANCER_WAITING` and will reset the value to
297 // `ADVANCER_IDLE` and wake the futex, unblocking this thread.
298 //
299 // (2) If the last decrement to the reader counter happens first, then this thread will see
300 // that there are no active readers in this generation and avoid blocking on the futex.
301 advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
302 if !has_active_readers(generation) {
303 break;
304 }
305 let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
306 }
307 advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
308}
309
310/// Advance the RCU state machine.
311///
312/// This function blocks until all in-flight read operations have completed for the current
313/// generation and all callbacks have been run.
314fn rcu_grace_period() {
315 let callbacks = {
316 let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();
317
318 // We are in the *Idle* state.
319
320 // Swap out the callbacks that we can run when this grace period has passed with the
321 // callbacks that can run after the next period.
322 // Synchronization point [H] (see design.md)
323 let callbacks =
324 std::mem::replace(&mut *waiting_callbacks, RCU_CONTROL_BLOCK.callback_chain.take());
325 let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);
326
327 // Enter the *Waiting* state.
328 rcu_advancer_wait(generation);
329
330 // Return to the *Idle* state.
331 callbacks
332 };
333
334 // We cannot control the order in which callbacks run since callbacks can be running on multiple
335 // threads concurrently.
336 for callback in callbacks {
337 callback();
338 }
339}
340
341/// Block until all in-flight read operations have completed. When this returns, the callbacks that
342/// are unblocked by those in-flight operations might still be running (or even not yet started) on
343/// another thread.
344pub fn rcu_synchronize() {
345 RCU_THREAD_BLOCK.with(|block| {
346 assert!(!block.holding_read_lock());
347 block.has_pending_callbacks.set(false);
348 });
349
350 // We need to run at least two grace periods to flush out all pending callbacks. See the
351 // comment in `rcu_read_lock` and the design to understand why.
352 rcu_grace_period();
353 rcu_grace_period();
354}
355
356/// If any callbacks have been scheduled from this thread, call `rcu_synchronize`.
357///
358/// If any callbacks have been scheduled from this thread, this function will block until the
359/// callbacks are unblocked and ready to be run (but have not yet necessarily finished, or even
360/// started). If no callbacks have been scheduled from this thread, this function will return
361/// immediately.
362pub fn rcu_run_callbacks() {
363 RCU_THREAD_BLOCK.with(|block| {
364 assert!(!block.holding_read_lock());
365 if block.has_pending_callbacks.get() {
366 rcu_synchronize();
367 }
368 })
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Serializes the tests in this module.
    ///
    /// All of these tests drive the single, process-global RCU state machine, and the default
    /// test runner executes tests on multiple threads. Without serialization, one test's grace
    /// periods could retire another test's callback early and break the "callback has NOT run
    /// yet" assertion in `test_rcu_delay_regression`.
    static TEST_LOCK: StdMutex<()> = StdMutex::new(());

    /// Takes the serialization lock, recovering from poisoning so one failed test does not
    /// cascade into failures of the others.
    fn serialize_test() -> std::sync::MutexGuard<'static, ()> {
        TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner())
    }

    #[test]
    fn test_rcu_delay_regression() {
        let _guard = serialize_test();

        // This test relies on the global RCU state machine.
        // It verifies that callbacks are NOT executed immediately after one grace period.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_grace_period();

        assert!(
            !flag.load(Ordering::SeqCst),
            "Callback executed too early! RCU requires 2 grace periods delay."
        );

        rcu_grace_period();
        assert!(flag.load(Ordering::SeqCst), "Callback should have executed after 2 grace periods");
    }

    #[test]
    fn test_rcu_synchronize() {
        let _guard = serialize_test();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_synchronize() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_synchronize();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_synchronize()"
        );
    }

    #[test]
    fn test_rcu_run_callbacks() {
        let _guard = serialize_test();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_run_callbacks() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_run_callbacks();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_run_callbacks()"
        );
    }
}