Skip to main content

fuchsia_rcu/
state_machine.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::atomic_stack::{AtomicListIterator, AtomicStack};
6use fuchsia_sync::Mutex;
7use std::cell::Cell;
8use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicUsize, Ordering, fence};
9use std::thread_local;
10
11#[cfg(feature = "rseq_backend")]
12use crate::read_counters::RcuReadCounters;
13
/// A deferred RCU callback.
///
/// Callbacks are boxed, run-once closures. They are queued in the global control block and may
/// be run on an arbitrary thread, hence the `Send + Sync + 'static` bounds.
type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;
15
/// Process-wide state of the RCU state machine.
///
/// A single instance lives in the `RCU_CONTROL_BLOCK` static and is shared by all threads.
struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    /// Its low bit selects which of the two read counters new readers register on.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from. For example,
    /// if the `generation` is even, then readers increment the counter for the `read_counters[0]`.
    /// If the `generation` is odd, then readers increment the counter for the `read_counters[1]`.
    #[cfg(not(feature = "rseq_backend"))]
    read_counters: [AtomicUsize; 2],

    /// The read counters, implemented with restartable sequences (rseq backend).
    ///
    /// Indexed by generation parity in the same way as the atomic-array variant above.
    #[cfg(feature = "rseq_backend")]
    read_counters: RcuReadCounters,

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    ///
    /// Holds either `ADVANCER_IDLE` or `ADVANCER_WAITING`.
    advancer: zx::Futex,

    /// Callbacks that are ready to run after the next grace period.
    ///
    /// The mutex also serializes grace periods: it is held for the whole time a grace period
    /// waits for readers to drain.
    waiting_callbacks: Mutex<AtomicListIterator<RcuCallback>>,
}
45
/// Value of the advancer futex when no thread has announced that it may block on it.
const ADVANCER_IDLE: i32 = 0;
/// Value of the advancer futex while a thread in `rcu_advancer_wait` may be blocked on it.
///
/// A reader that observes this value resets the futex to `ADVANCER_IDLE` and wakes all waiters.
const ADVANCER_WAITING: i32 = 1;
48
49impl RcuControlBlock {
50    /// Create a new control block for the RCU state machine.
51    const fn new() -> Self {
52        #[cfg(feature = "rseq_backend")]
53        let read_counters = RcuReadCounters::new();
54
55        #[cfg(not(feature = "rseq_backend"))]
56        let read_counters = [AtomicUsize::new(0), AtomicUsize::new(0)];
57
58        Self {
59            generation: AtomicUsize::new(0),
60            read_counters,
61            callback_chain: AtomicStack::new(),
62            advancer: zx::Futex::new(ADVANCER_IDLE),
63            waiting_callbacks: Mutex::new(AtomicListIterator::empty()),
64        }
65    }
66}
67
/// The control block for the RCU state machine.
///
/// There is exactly one, process-wide instance, initialized at compile time by the `const`
/// constructor `RcuControlBlock::new`.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();
70
/// Per-thread RCU bookkeeping, stored in the `RCU_THREAD_BLOCK` thread-local.
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    ///
    /// Zero means the thread is not inside a read-side critical section. Within this file only
    /// the owning thread writes it. It is atomic so that `with_thread_block_counters` can hand
    /// out a pointer to it for stall detection.
    nesting_level: AtomicUsize,

    /// The index of the read counter that the thread incremented when it entered its outermost read
    /// lock.
    ///
    /// Starts at 0 and is reset to `u8::MAX` when the outermost read lock is released.
    counter_index: AtomicU8,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    ///
    /// A plain `Cell` is enough because only the owning thread accesses it.
    has_pending_callbacks: Cell<bool>,
}
83
84impl RcuThreadBlock {
85    /// Returns true if the thread is holding a read lock.
86    fn holding_read_lock(&self) -> bool {
87        self.nesting_level.load(Ordering::Relaxed) > 0
88    }
89}
90
91impl Default for RcuThreadBlock {
92    fn default() -> Self {
93        #[cfg(feature = "rseq_backend")]
94        fuchsia_rseq::rseq_register_thread();
95
96        Self {
97            nesting_level: AtomicUsize::new(0),
98            counter_index: AtomicU8::new(0),
99            has_pending_callbacks: Cell::new(false),
100        }
101    }
102}
103
impl Drop for RcuThreadBlock {
    /// Undo the rseq registration performed in `Default::default`.
    ///
    /// This only applies when the `rseq_backend` feature is enabled; otherwise it is a no-op.
    fn drop(&mut self) {
        #[cfg(feature = "rseq_backend")]
        fuchsia_rseq::rseq_unregister_thread();
    }
}
110
thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track:
    /// - the nesting level of read locks;
    /// - the index of the read counter that the thread incremented when it entered its outermost
    ///   read lock;
    /// - whether the thread has scheduled callbacks it has not yet synchronized on.
    ///
    /// Each thread gets its own instance, created lazily through `RcuThreadBlock::default()` on
    /// first access and dropped when the thread exits.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}
118
119/// Exposes the thread-local counters for RCU stall detection.
120pub fn with_thread_block_counters<F>(f: F)
121where
122    F: FnOnce(*const AtomicUsize, *const AtomicU8),
123{
124    RCU_THREAD_BLOCK.with(|thread_block| {
125        f(&thread_block.nesting_level as *const _, &thread_block.counter_index as *const _);
126    });
127}
128
129/// Acquire a read lock.
130///
131/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
132/// defers calling callbacks until all currently in-flight read operations have completed.
133///
134/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
135pub(crate) fn rcu_read_lock() {
136    RCU_THREAD_BLOCK.with(|thread_block| {
137        let nesting_level = thread_block.nesting_level.load(Ordering::Relaxed);
138        if nesting_level > 0 {
139            // If this thread already has a read lock, increment the nesting level instead of the
140            // incrementing the read counter. This approach is a performance optimization to reduce
141            // the number of atomic operations that need to be performed.
142            thread_block.nesting_level.store(nesting_level + 1, Ordering::Relaxed);
143        } else {
144            // This is the outermost read lock. Increment the read counter.
145            let control_block = &RCU_CONTROL_BLOCK;
146
147            // There's a race here where we capture `index` and then go on to increment the read
148            // counter.  The choice of `index` here isn't actually important for correctness because
149            // we always wait at least two grace periods before calling the callbacks, so it doesn't
150            // matter which counter we increment.  It does mean that a thread waiting for the read
151            // counter to drop to zero, could actually find that the read counter increases before
152            // it eventually reaches zero, which should be fine.
153            let index = control_block.generation.load(Ordering::Relaxed) & 1;
154
155            #[cfg(feature = "rseq_backend")]
156            {
157                control_block.read_counters.begin(index);
158                std::sync::atomic::compiler_fence(Ordering::SeqCst);
159            }
160
161            #[cfg(not(feature = "rseq_backend"))]
162            {
163                // Synchronization point [A] (see design.md)
164                control_block.read_counters[index].fetch_add(1, Ordering::SeqCst);
165            }
166
167            thread_block.counter_index.store(index as u8, Ordering::Relaxed);
168            thread_block.nesting_level.store(1, Ordering::Relaxed);
169        }
170    });
171}
172
173/// Release a read lock.
174///
175/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
176/// more details.
177pub(crate) fn rcu_read_unlock() {
178    RCU_THREAD_BLOCK.with(|thread_block| {
179        let nesting_level = thread_block.nesting_level.load(Ordering::Relaxed);
180        if nesting_level > 1 {
181            // If the nesting level is greater than 1, this is not the outermost read lock.
182            // Decrement the nesting level instead of the read counter.
183            thread_block.nesting_level.store(nesting_level - 1, Ordering::Relaxed);
184        } else {
185            // This is the outermost read lock. Decrement the read counter.
186            let index = thread_block.counter_index.load(Ordering::Relaxed) as usize;
187            let control_block = &RCU_CONTROL_BLOCK;
188
189            #[cfg(feature = "rseq_backend")]
190            {
191                std::sync::atomic::compiler_fence(Ordering::SeqCst);
192                control_block.read_counters.end(index);
193
194                // We cannot tell if this thread is the last thread to exit its read lock, so we
195                // always wake the advancer. The advancer will check if there are any active
196                // readers and will only advance the state machine if there are no active
197                // readers.
198                rcu_advancer_wake_all();
199            }
200
201            #[cfg(not(feature = "rseq_backend"))]
202            {
203                // Synchronization point [B] (see design.md)
204                let previous_count =
205                    control_block.read_counters[index].fetch_sub(1, Ordering::SeqCst);
206                if previous_count == 1 {
207                    rcu_advancer_wake_all();
208                }
209            }
210
211            thread_block.nesting_level.store(0, Ordering::Relaxed);
212            thread_block.counter_index.store(u8::MAX, Ordering::Relaxed);
213        }
214    });
215}
216
/// Read the value of an RCU pointer.
///
/// This function cannot be called unless the current thread is holding a read lock. The returned
/// pointer is valid until the read lock is released.
///
/// The load uses `Acquire` ordering. It pairs with the `Release` store in `rcu_assign_pointer`
/// (and the `AcqRel` swap in `rcu_replace_pointer`), so the pointee's initialization is visible
/// to the reader.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    ptr.load(Ordering::Acquire).cast_const()
}
225
/// Assign a new value to an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
///
/// The store uses `Release` ordering, so writes that initialized `*new_ptr` happen-before any
/// `rcu_read_pointer` that observes it.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    AtomicPtr::store(ptr, new_ptr, Ordering::Release);
}
235
/// Replace the value of an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
///
/// Returns the previous value. The swap is `AcqRel`: it publishes `*new_ptr` like
/// `rcu_assign_pointer`, and it also acquires the writes that initialized the returned pointee.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    AtomicPtr::swap(ptr, new_ptr, Ordering::AcqRel)
}
245
246/// Call a callback to run after all in-flight read operations have completed.
247///
248/// To wait until the callback is ready to run, call `rcu_synchronize()`. The callback might be
249/// called from an arbitrary thread.
250///
251/// NOTE: The order in which callbacks are called is not guaranteed since they can be called
252/// concurrently from multiple threads.
253pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
254    RCU_THREAD_BLOCK.with(|block| {
255        block.has_pending_callbacks.set(true);
256    });
257
258    // We need to synchronize with rcu_read_lock.  We need to ensure that all prior stores are
259    // visible to threads that have called rcu_read_lock.  We must synchronize with both read
260    // counters using a store operation.  We don't need to change the value.
261    fence(Ordering::Release);
262
263    #[cfg(not(feature = "rseq_backend"))]
264    {
265        RCU_CONTROL_BLOCK.read_counters[0].fetch_add(0, Ordering::Relaxed);
266        RCU_CONTROL_BLOCK.read_counters[1].fetch_add(0, Ordering::Relaxed);
267    }
268
269    // Even though we push the callback to the front of the stack, we reverse the order of the stack
270    // when we pop the callbacks from the stack to ensure that the callbacks are run in the order in
271    // which they were scheduled.
272
273    // Synchronization point [G] (see design.md)
274    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
275}
276
277/// Schedule the object to be dropped after all in-flight read operations have completed.
278///
279/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
280/// an arbitrary thread.
281pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
282    rcu_call(move || {
283        std::mem::drop(value);
284    });
285}
286
287/// Check if there are any active readers for the given generation.
288fn has_active_readers(generation: usize) -> bool {
289    let index = generation & 1;
290
291    #[cfg(feature = "rseq_backend")]
292    {
293        return RCU_CONTROL_BLOCK.read_counters.has_active(index);
294    }
295
296    #[cfg(not(feature = "rseq_backend"))]
297    {
298        // Synchronization point [C] (see design.md)
299        RCU_CONTROL_BLOCK.read_counters[index].load(Ordering::SeqCst) > 0
300    }
301}
302
303/// Wake up all the threads that are waiting to advance the state machine.
304///
305/// Does nothing if no threads are waiting.
306fn rcu_advancer_wake_all() {
307    let advancer = &RCU_CONTROL_BLOCK.advancer;
308    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
309        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
310        advancer.wake_all();
311    }
312}
313
314/// Blocks the current thread until all in-flight read operations have completed for the given
315/// generation.
316///
317/// Postcondition: The number of active readers for the given generation is zero and the advancer
318/// futex contains `ADVANCER_IDLE`.
319fn rcu_advancer_wait(generation: usize) {
320    let advancer = &RCU_CONTROL_BLOCK.advancer;
321    loop {
322        // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
323        // before checking if there are any active readers.
324        //
325        // In the single total order, either this store or the last decrement to the reader counter
326        // must happen first.
327        //
328        //  (1) If this store happens first, then the last thread to decrement the reader counter
329        //      for this generation will observe `ADVANCER_WAITING` and will reset the value to
330        //      `ADVANCER_IDLE` and wake the futex, unblocking this thread.
331        //
332        //  (2) If the last decrement to the reader counter happens first, then this thread will see
333        //      that there are no active readers in this generation and avoid blocking on the futex.
334        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
335        if !has_active_readers(generation) {
336            break;
337        }
338        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
339    }
340    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
341}
342
343/// Advance the RCU state machine.
344///
345/// This function blocks until all in-flight read operations have completed for the current
346/// generation and all callbacks have been run.
347fn rcu_grace_period() {
348    let callbacks = {
349        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();
350
351        // We are in the *Idle* state.
352
353        // Swap out the callbacks that we can run when this grace period has passed with the
354        // callbacks that can run after the next period.
355        // Synchronization point [H] (see design.md)
356        let callbacks =
357            std::mem::replace(&mut *waiting_callbacks, RCU_CONTROL_BLOCK.callback_chain.take());
358
359        // Issue an IPI to all CPUs to force them to serialize their execution.
360        // This ensures that all prior stores by all writers are visible to
361        // any thread that subsequently enters an RCU read-side critical section.
362        #[cfg(feature = "rseq_backend")]
363        unsafe {
364            zx::sys::zx_membarrier_sync_process_data()
365        };
366
367        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);
368
369        // Enter the *Waiting* state.
370        rcu_advancer_wait(generation);
371
372        // Return to the *Idle* state.
373        callbacks
374    };
375
376    // We cannot control the order in which callbacks run since callbacks can be running on multiple
377    // threads concurrently.
378    for callback in callbacks {
379        callback();
380    }
381}
382
383/// Block until all in-flight read operations have completed.  When this returns, the callbacks that
384/// are unblocked by those in-flight operations might still be running (or even not yet started) on
385/// another thread.
386pub fn rcu_synchronize() {
387    RCU_THREAD_BLOCK.with(|block| {
388        assert!(!block.holding_read_lock());
389        block.has_pending_callbacks.set(false);
390    });
391
392    // We need to run at least two grace periods to flush out all pending callbacks.  See the
393    // comment in `rcu_read_lock` and the design to understand why.
394    rcu_grace_period();
395    rcu_grace_period();
396}
397
398/// If any callbacks have been scheduled from this thread, call `rcu_synchronize`.
399///
400/// If any callbacks have been scheduled from this thread, this function will block until the
401/// callbacks are unblocked and ready to be run (but have not yet necessarily finished, or even
402/// started).  If no callbacks have been scheduled from this thread, this function will return
403/// immediately.
404pub fn rcu_run_callbacks() {
405    RCU_THREAD_BLOCK.with(|block| {
406        assert!(!block.holding_read_lock());
407        if block.has_pending_callbacks.get() {
408            rcu_synchronize();
409        }
410    })
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Serializes the tests in this module.
    ///
    /// Every test here drives the process-wide RCU state machine, and the default test harness
    /// runs tests concurrently on several threads. Without this lock, a grace period started by
    /// one test (for example via `rcu_synchronize`) could interfere with another test:
    /// - it could run that test's callback early, making `test_rcu_delay_regression` fail
    ///   spuriously;
    /// - it could leave that callback still executing on a different thread when the owning
    ///   test checks its flag.
    ///
    /// NOTE: This only serializes the tests in this module. Other tests in the crate that use RCU
    /// concurrently can still interfere.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    #[test]
    fn test_rcu_delay_regression() {
        let _guard = TEST_LOCK.lock();

        // This test relies on the global RCU state machine.
        // It verifies that callbacks are NOT executed immediately after one grace period.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_grace_period();

        assert!(
            !flag.load(Ordering::SeqCst),
            "Callback executed too early! RCU requires 2 grace periods delay."
        );

        rcu_grace_period();
        assert!(flag.load(Ordering::SeqCst), "Callback should have executed after 2 grace periods");
    }

    #[test]
    fn test_rcu_synchronize() {
        let _guard = TEST_LOCK.lock();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_synchronize() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_synchronize();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_synchronize()"
        );
    }

    #[test]
    fn test_rcu_run_callbacks() {
        let _guard = TEST_LOCK.lock();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_run_callbacks() blocks until all callbacks have been run.

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_run_callbacks();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_run_callbacks()"
        );
    }
}