// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::Cell;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering, fence};
use std::thread_local;

use fuchsia_sync::Mutex;

use crate::atomic_stack::{AtomicListIterator, AtomicStack};

#[cfg(feature = "rseq_backend")]
use crate::read_counters::RcuReadCounters;

/// Deferred work scheduled via `rcu_call`/`rcu_drop`.
///
/// Runs at most once, on an arbitrary thread, after a full grace period.
type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;

/// Process-global state shared by all readers, writers, and grace-period
/// advancers of the RCU state machine.
struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from. For example,
    /// if the `generation` is even, then readers increment the counter for the `read_counters[0]`.
    /// If the `generation` is odd, then readers increment the counter for the `read_counters[1]`.
    #[cfg(not(feature = "rseq_backend"))]
    read_counters: [AtomicUsize; 2],

    /// The read counters, maintained by the restartable-sequences backend
    /// instead of plain atomics.
    #[cfg(feature = "rseq_backend")]
    read_counters: RcuReadCounters,

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    ///
    /// Holds either `ADVANCER_IDLE` or `ADVANCER_WAITING`.
    advancer: zx::Futex,

    /// Callbacks that are ready to run after the next grace period.
    ///
    /// Guarded by a mutex; holding it also serializes grace-period advancers
    /// (see `rcu_grace_period`).
    waiting_callbacks: Mutex<AtomicListIterator<RcuCallback>>,
}

/// Futex value: no thread is currently blocked waiting for readers to drain.
const ADVANCER_IDLE: i32 = 0;
/// Futex value: at least one thread is (about to be) blocked in `rcu_advancer_wait`.
const ADVANCER_WAITING: i32 = 1;

impl RcuControlBlock {
    /// Create a new control block for the RCU state machine.
    ///
    /// `const` so that `RCU_CONTROL_BLOCK` can be a statically-initialized
    /// `static` with no lazy-initialization cost.
    const fn new() -> Self {
        // Exactly one of these two initializers is compiled in, matching the
        // cfg-gated `read_counters` field above.
        #[cfg(feature = "rseq_backend")]
        let read_counters = RcuReadCounters::new();

        #[cfg(not(feature = "rseq_backend"))]
        let read_counters = [AtomicUsize::new(0), AtomicUsize::new(0)];

        Self {
            generation: AtomicUsize::new(0),
            read_counters,
            callback_chain: AtomicStack::new(),
            advancer: zx::Futex::new(ADVANCER_IDLE),
            waiting_callbacks: Mutex::new(AtomicListIterator::empty()),
        }
    }
}

/// The control block for the RCU state machine.
///
/// There is exactly one state machine per process; all readers, writers, and
/// advancers share it.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();

/// Per-thread bookkeeping for the RCU state machine (see `RCU_THREAD_BLOCK`).
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    ///
    /// Zero means the thread holds no read lock.
    nesting_level: Cell<usize>,

    /// The index of the read counter that the thread incremented when it entered its outermost read
    /// lock.
    ///
    /// Only meaningful while `nesting_level > 0`; reset to `u8::MAX` on the
    /// outermost unlock to make stale reads easier to spot.
    counter_index: Cell<u8>,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}

84impl RcuThreadBlock {
85    /// Returns true if the thread is holding a read lock.
86    fn holding_read_lock(&self) -> bool {
87        self.nesting_level.get() > 0
88    }
89}
90
impl Default for RcuThreadBlock {
    fn default() -> Self {
        // With the rseq backend, the thread must be registered with the kernel
        // before it can use the per-CPU read counters. This runs on first use
        // of the thread-local block (see `RCU_THREAD_BLOCK`).
        #[cfg(feature = "rseq_backend")]
        fuchsia_rseq::rseq_register_thread().expect("failed to register thread");

        Self {
            nesting_level: Cell::new(0),
            counter_index: Cell::new(0),
            has_pending_callbacks: Cell::new(false),
        }
    }
}

impl Drop for RcuThreadBlock {
    fn drop(&mut self) {
        // Mirror of the registration performed in `Default::default`:
        // deregister the thread from the rseq backend when its thread-local
        // block is torn down at thread exit.
        #[cfg(feature = "rseq_backend")]
        fuchsia_rseq::rseq_unregister_thread().expect("failed to unregister thread");
    }
}

thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track the nesting level of read locks and the index of the read counter
    /// that the thread incremented when it entered its outermost read lock.
    ///
    /// Lazily initialized on first use; `RcuThreadBlock::default` performs the
    /// rseq-backend thread registration when that feature is enabled.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}

/// Acquire a read lock.
///
/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
/// defers calling callbacks until all currently in-flight read operations have completed.
///
/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
pub(crate) fn rcu_read_lock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 0 {
            // If this thread already has a read lock, increment the nesting level instead of
            // incrementing the read counter. This approach is a performance optimization to reduce
            // the number of atomic operations that need to be performed.
            thread_block.nesting_level.set(nesting_level + 1);
        } else {
            // This is the outermost read lock. Increment the read counter.
            let control_block = &RCU_CONTROL_BLOCK;

            // There's a race here where we capture `index` and then go on to increment the read
            // counter.  The choice of `index` here isn't actually important for correctness because
            // we always wait at least two grace periods before calling the callbacks, so it doesn't
            // matter which counter we increment.  It does mean that a thread waiting for the read
            // counter to drop to zero, could actually find that the read counter increases before
            // it eventually reaches zero, which should be fine.
            let index = control_block.generation.load(Ordering::Relaxed) & 1;

            #[cfg(feature = "rseq_backend")]
            {
                control_block.read_counters.begin(index);
                // Keep the counter update ordered with subsequent reads at the
                // compiler level; the rseq backend handles CPU-level ordering.
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [A] (see design.md)
                control_block.read_counters[index].fetch_add(1, Ordering::SeqCst);
            }

            // Remember which counter we incremented so `rcu_read_unlock` can
            // decrement the same one, even if `generation` changes meanwhile.
            thread_block.counter_index.set(index as u8);
            thread_block.nesting_level.set(1);
        }
    });
}

/// Release a read lock.
///
/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
/// more details.
pub(crate) fn rcu_read_unlock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 1 {
            // If the nesting level is greater than 1, this is not the outermost read lock.
            // Decrement the nesting level instead of the read counter.
            thread_block.nesting_level.set(nesting_level - 1);
        } else {
            // This is the outermost read lock. Decrement the read counter.
            // Use the index recorded by `rcu_read_lock`, not the current
            // generation, which may have advanced since.
            let index = thread_block.counter_index.get() as usize;
            let control_block = &RCU_CONTROL_BLOCK;

            #[cfg(feature = "rseq_backend")]
            {
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
                control_block.read_counters.end(index);

                // We cannot tell if this thread is the last thread to exit its read lock, so we
                // always wake the advancer. The advancer will check if there are any active
                // readers and will only advance the state machine if there are no active
                // readers.
                rcu_advancer_wake_all();
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [B] (see design.md)
                let previous_count =
                    control_block.read_counters[index].fetch_sub(1, Ordering::SeqCst);
                if previous_count == 1 {
                    // This thread was the last active reader for `index`; wake
                    // any thread blocked in `rcu_advancer_wait`.
                    rcu_advancer_wake_all();
                }
            }

            thread_block.nesting_level.set(0);
            // Poison the counter index: it is only meaningful while a read
            // lock is held.
            thread_block.counter_index.set(u8::MAX);
        }
    });
}

/// Read the value of an RCU pointer.
///
/// This function cannot be called unless the current thread is holding a read lock. The returned
/// pointer is valid until the read lock is released.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    let raw: *mut T = AtomicPtr::load(ptr, Ordering::Acquire);
    raw as *const T
}

/// Assign a new value to an RCU pointer.
///
/// Readers already inside a critical section may keep observing the previous pointee until the
/// RCU state machine has made sufficient progress. Schedule reclamation of the old value with
/// `rcu_call` or `rcu_drop`, which defer processing until all in-flight reads have completed.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    AtomicPtr::store(ptr, new_ptr, Ordering::Release);
}

/// Replace the value of an RCU pointer, returning the previous value.
///
/// Readers already inside a critical section may keep observing the previous pointee until the
/// RCU state machine has made sufficient progress. Schedule reclamation of the old value with
/// `rcu_call` or `rcu_drop`, which defer processing until all in-flight reads have completed.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    AtomicPtr::swap(ptr, new_ptr, Ordering::AcqRel)
}

/// Call a callback to run after all in-flight read operations have completed.
///
/// To wait until the callback is ready to run, call `rcu_synchronize()`. The callback might be
/// called from an arbitrary thread.
///
/// NOTE: The order in which callbacks are called is not guaranteed since they can be called
/// concurrently from multiple threads.
pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
    // Remember that this thread has outstanding callbacks so that
    // `rcu_run_callbacks` knows it must synchronize.
    RCU_THREAD_BLOCK.with(|block| {
        block.has_pending_callbacks.set(true);
    });

    // We need to synchronize with rcu_read_lock.  We need to ensure that all prior stores are
    // visible to threads that have called rcu_read_lock.  We must synchronize with both read
    // counters using a store operation.  We don't need to change the value.
    fence(Ordering::Release);

    #[cfg(not(feature = "rseq_backend"))]
    {
        // NOTE(review): these no-op RMWs appear to pair the Release fence
        // above with the SeqCst RMWs in `rcu_read_lock` — confirm against
        // design.md before changing the orderings here.
        RCU_CONTROL_BLOCK.read_counters[0].fetch_add(0, Ordering::Relaxed);
        RCU_CONTROL_BLOCK.read_counters[1].fetch_add(0, Ordering::Relaxed);
    }

    // Even though we push the callback to the front of the stack, we reverse the order of the stack
    // when we pop the callbacks from the stack to ensure that the callbacks are run in the order in
    // which they were scheduled.

    // Synchronization point [G] (see design.md)
    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
}

267/// Schedule the object to be dropped after all in-flight read operations have completed.
268///
269/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
270/// an arbitrary thread.
271pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
272    rcu_call(move || {
273        std::mem::drop(value);
274    });
275}
276
/// Check if there are any active readers for the given generation.
///
/// `generation & 1` selects which of the two read counters belongs to that
/// generation (see the `read_counters` field documentation).
fn has_active_readers(generation: usize) -> bool {
    let index = generation & 1;

    #[cfg(feature = "rseq_backend")]
    {
        return RCU_CONTROL_BLOCK.read_counters.has_active(index);
    }

    #[cfg(not(feature = "rseq_backend"))]
    {
        // Synchronization point [C] (see design.md)
        RCU_CONTROL_BLOCK.read_counters[index].load(Ordering::SeqCst) > 0
    }
}

/// Wake up all the threads that are waiting to advance the state machine.
///
/// Does nothing if no threads are waiting (i.e. the futex holds `ADVANCER_IDLE`).
fn rcu_advancer_wake_all() {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
        // NOTE(review): the Relaxed store looks safe only because waiters in
        // `rcu_advancer_wait` re-store ADVANCER_WAITING and re-check the read
        // counters on every loop iteration — confirm against design.md.
        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
        advancer.wake_all();
    }
}

/// Blocks the current thread until all in-flight read operations have completed for the given
/// generation.
///
/// Postcondition: The number of active readers for the given generation is zero and the advancer
/// futex contains `ADVANCER_IDLE`.
fn rcu_advancer_wait(generation: usize) {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    loop {
        // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
        // before checking if there are any active readers.
        //
        // In the single total order, either this store or the last decrement to the reader counter
        // must happen first.
        //
        //  (1) If this store happens first, then the last thread to decrement the reader counter
        //      for this generation will observe `ADVANCER_WAITING` and will reset the value to
        //      `ADVANCER_IDLE` and wake the futex, unblocking this thread.
        //
        //  (2) If the last decrement to the reader counter happens first, then this thread will see
        //      that there are no active readers in this generation and avoid blocking on the futex.
        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
        if !has_active_readers(generation) {
            break;
        }
        // The wait fails (and we simply re-loop) if the futex value no longer
        // equals ADVANCER_WAITING by the time we try to sleep.
        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
    }
    // Re-establish the postcondition: a wake may have been consumed (or never
    // needed) with the futex still holding ADVANCER_WAITING.
    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
}

/// Advance the RCU state machine.
///
/// This function blocks until all in-flight read operations have completed for the current
/// generation and all callbacks have been run.
///
/// Holding the `waiting_callbacks` mutex for the whole state transition also
/// serializes concurrent grace-period advancers.
fn rcu_grace_period() {
    let callbacks = {
        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();

        // We are in the *Idle* state.

        // Swap out the callbacks that we can run when this grace period has passed with the
        // callbacks that can run after the next period.
        // Synchronization point [H] (see design.md)
        let callbacks =
            std::mem::replace(&mut *waiting_callbacks, RCU_CONTROL_BLOCK.callback_chain.take());

        // Issue an IPI to all CPUs to force them to serialize their execution.
        // This ensures that all prior stores by all writers are visible to
        // any thread that subsequently enters an RCU read-side critical section.
        #[cfg(feature = "rseq_backend")]
        {
            let status =
                unsafe { zx::sys::zx_system_barrier(zx::sys::ZX_SYSTEM_BARRIER_DATA_MEMORY) };
            debug_assert_eq!(status, zx::sys::ZX_OK);
        }

        // `fetch_add` returns the value *before* the increment, so we wait on
        // the generation that readers have been entering up to this point.
        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);

        // Enter the *Waiting* state.
        rcu_advancer_wait(generation);

        // Return to the *Idle* state. The mutex is released at the end of this
        // block, before the callbacks run, so callbacks may themselves
        // schedule new RCU work.
        callbacks
    };

    // We cannot control the order in which callbacks run since callbacks can be running on multiple
    // threads concurrently.
    for callback in callbacks {
        callback();
    }
}

375/// Block until all in-flight read operations have completed.  When this returns, the callbacks that
376/// are unblocked by those in-flight operations might still be running (or even not yet started) on
377/// another thread.
378pub fn rcu_synchronize() {
379    RCU_THREAD_BLOCK.with(|block| {
380        assert!(!block.holding_read_lock());
381        block.has_pending_callbacks.set(false);
382    });
383
384    // We need to run at least two grace periods to flush out all pending callbacks.  See the
385    // comment in `rcu_read_lock` and the design to understand why.
386    rcu_grace_period();
387    rcu_grace_period();
388}
389
390/// If any callbacks have been scheduled from this thread, call `rcu_synchronize`.
391///
392/// If any callbacks have been scheduled from this thread, this function will block until the
393/// callbacks are unblocked and ready to be run (but have not yet necessarily finished, or even
394/// started).  If no callbacks have been scheduled from this thread, this function will return
395/// immediately.
396pub fn rcu_run_callbacks() {
397    RCU_THREAD_BLOCK.with(|block| {
398        assert!(!block.holding_read_lock());
399        if block.has_pending_callbacks.get() {
400            rcu_synchronize();
401        }
402    })
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Serializes the tests in this module.
    ///
    /// Every test drives the single, process-global RCU state machine
    /// (`RCU_CONTROL_BLOCK`), and the Rust test harness runs tests on multiple
    /// threads by default. Without serialization, grace periods advanced by
    /// one test can drain the shared `waiting_callbacks` and run another
    /// test's callback early, producing spurious failures — most notably the
    /// "not yet executed" assertion in `test_rcu_delay_regression`.
    static TEST_SERIALIZER: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires the serialization lock, recovering from poisoning so that one
    /// failing (panicking) test does not cascade into failures of the others.
    fn serialize_test() -> std::sync::MutexGuard<'static, ()> {
        TEST_SERIALIZER.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_rcu_delay_regression() {
        // Verifies that callbacks are NOT executed after only one grace
        // period: the design requires two grace periods before a callback may
        // run (see `rcu_synchronize`).
        let _guard = serialize_test();

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_grace_period();

        assert!(
            !flag.load(Ordering::SeqCst),
            "Callback executed too early! RCU requires 2 grace periods delay."
        );

        rcu_grace_period();
        assert!(flag.load(Ordering::SeqCst), "Callback should have executed after 2 grace periods");
    }

    #[test]
    fn test_rcu_synchronize() {
        // Verifies that rcu_synchronize() does not return before the callback
        // scheduled below has been run (it runs on this thread, during the
        // second grace period).
        let _guard = serialize_test();

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_synchronize();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_synchronize()"
        );
    }

    #[test]
    fn test_rcu_run_callbacks() {
        // Verifies that rcu_run_callbacks() synchronizes when this thread has
        // scheduled callbacks since the last rcu_synchronize().
        let _guard = serialize_test();

        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_run_callbacks();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_run_callbacks()"
        );
    }
}