// fuchsia_rcu/state_machine.rs
//
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
use crate::atomic_stack::{AtomicListIterator, AtomicStack};
use fuchsia_sync::Mutex;
use std::cell::Cell;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering, fence};
use std::thread_local;

#[cfg(feature = "rseq_backend")]
use crate::read_counters::RcuReadCounters;

/// The type of deferred callbacks scheduled via `rcu_call` / `rcu_drop`.
type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;

/// Process-wide shared state for the RCU state machine.
struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from. For example,
    /// if the `generation` is even, then readers increment the counter for the `read_counters[0]`.
    /// If the `generation` is odd, then readers increment the counter for the `read_counters[1]`.
    #[cfg(not(feature = "rseq_backend"))]
    read_counters: [AtomicUsize; 2],

    /// Read counters provided by the rseq-based backend (see `RcuReadCounters`).
    #[cfg(feature = "rseq_backend")]
    read_counters: RcuReadCounters,

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    advancer: zx::Futex,

    /// Callbacks that are ready to run after the next grace period.
    waiting_callbacks: Mutex<AtomicListIterator<RcuCallback>>,
}

/// Futex value: no thread has announced intent to wait for the state machine to advance.
const ADVANCER_IDLE: i32 = 0;
/// Futex value: a waiter in `rcu_advancer_wait` has announced intent to block.
const ADVANCER_WAITING: i32 = 1;

impl RcuControlBlock {
    /// Create a new control block for the RCU state machine.
    ///
    /// This is a `const fn` so it can initialize the static `RCU_CONTROL_BLOCK`.
    const fn new() -> Self {
        #[cfg(feature = "rseq_backend")]
        let read_counters = RcuReadCounters::new();

        #[cfg(not(feature = "rseq_backend"))]
        let read_counters = [AtomicUsize::new(0), AtomicUsize::new(0)];

        Self {
            generation: AtomicUsize::new(0),
            read_counters,
            callback_chain: AtomicStack::new(),
            // Start with no announced waiter.
            advancer: zx::Futex::new(ADVANCER_IDLE),
            waiting_callbacks: Mutex::new(AtomicListIterator::empty()),
        }
    }
}

/// The control block for the RCU state machine.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();

/// Per-thread bookkeeping for the RCU state machine.
#[derive(Default)]
struct RcuThreadBlock {
    /// How many nested `rcu_read_lock` calls this thread currently has open.
    nesting_level: Cell<usize>,

    /// Which of the two read counters this thread incremented when it entered its outermost
    /// read lock.
    counter_index: Cell<u8>,

    /// Set when this thread schedules a callback; cleared by `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}

impl RcuThreadBlock {
    /// Whether this thread is currently inside at least one read-side critical section.
    fn holding_read_lock(&self) -> bool {
        self.nesting_level.get() != 0
    }
}

thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track the nesting level of read locks and the index of the read counter
    /// that the thread incremented when it entered its outermost read lock.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}

/// Acquire a read lock.
///
/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
/// defers calling callbacks until all currently in-flight read operations have completed.
///
/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
pub(crate) fn rcu_read_lock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 0 {
            // If this thread already has a read lock, increment the nesting level instead of the
            // incrementing the read counter. This approach is a performance optimization to reduce
            // the number of atomic operations that need to be performed.
            thread_block.nesting_level.set(nesting_level + 1);
        } else {
            // This is the outermost read lock. Increment the read counter.
            let control_block = &RCU_CONTROL_BLOCK;

            // There's a race here where we capture `index` and then go on to increment the read
            // counter.  The choice of `index` here isn't actually important for correctness because
            // we always wait at least two grace periods before calling the callbacks, so it doesn't
            // matter which counter we increment.  It does mean that a thread waiting for the read
            // counter to drop to zero, could actually find that the read counter increases before
            // it eventually reaches zero, which should be fine.
            let index = control_block.generation.load(Ordering::Relaxed) & 1;

            #[cfg(feature = "rseq_backend")]
            {
                control_block.read_counters.begin(index);
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [A] (see design.md)
                control_block.read_counters[index].fetch_add(1, Ordering::SeqCst);
            }

            // Remember which counter we incremented so that `rcu_read_unlock` decrements the
            // same one, even if the generation advances in the meantime.
            thread_block.counter_index.set(index as u8);
            thread_block.nesting_level.set(1);
        }
    });
}

/// Release a read lock.
///
/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
/// more details.
pub(crate) fn rcu_read_unlock() {
    RCU_THREAD_BLOCK.with(|thread_block| {
        let nesting_level = thread_block.nesting_level.get();
        if nesting_level > 1 {
            // If the nesting level is greater than 1, this is not the outermost read lock.
            // Decrement the nesting level instead of the read counter.
            thread_block.nesting_level.set(nesting_level - 1);
        } else {
            // This is the outermost read lock. Decrement the read counter.
            // `counter_index` was recorded by `rcu_read_lock` so we decrement the same counter
            // that was incremented.
            let index = thread_block.counter_index.get() as usize;
            let control_block = &RCU_CONTROL_BLOCK;

            #[cfg(feature = "rseq_backend")]
            {
                std::sync::atomic::compiler_fence(Ordering::SeqCst);
                control_block.read_counters.end(index);

                // We cannot tell if this thread is the last thread to exit its read lock, so we
                // always wake the advancer. The advancer will check if there are any active
                // readers and will only advance the state machine if there are no active
                // readers.
                rcu_advancer_wake_all();
            }

            #[cfg(not(feature = "rseq_backend"))]
            {
                // Synchronization point [B] (see design.md)
                let previous_count =
                    control_block.read_counters[index].fetch_sub(1, Ordering::SeqCst);
                // Only the thread that takes the counter to zero needs to wake the advancer.
                if previous_count == 1 {
                    rcu_advancer_wake_all();
                }
            }

            thread_block.nesting_level.set(0);
            // u8::MAX is a sentinel meaning "no counter currently held".
            thread_block.counter_index.set(u8::MAX);
        }
    });
}

/// Read the current value of an RCU-protected pointer.
///
/// The caller must hold a read lock (`rcu_read_lock`); the returned pointer remains valid until
/// that read lock is released.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    let raw: *mut T = ptr.load(Ordering::Acquire);
    raw
}

/// Publish a new value through an RCU-protected pointer.
///
/// In-flight readers may keep referencing the previous value until the RCU state machine has made
/// sufficient progress. Schedule cleanup of the old value with `rcu_call` or `rcu_drop`, which
/// defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    ptr.store(new_ptr, Ordering::Release);
}

/// Swap a new value into an RCU-protected pointer, returning the previous value.
///
/// In-flight readers may keep referencing the previous value until the RCU state machine has made
/// sufficient progress. Schedule cleanup of the returned value with `rcu_call` or `rcu_drop`,
/// which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    let previous = ptr.swap(new_ptr, Ordering::AcqRel);
    previous
}

/// Call a callback to run after all in-flight read operations have completed.
///
/// To wait until the callback is ready to run, call `rcu_synchronize()`. The callback might be
/// called from an arbitrary thread.
///
/// NOTE: The order in which callbacks are called is not guaranteed since they can be called
/// concurrently from multiple threads.
pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
    // Remember that this thread scheduled work so `rcu_run_callbacks` knows it must synchronize.
    RCU_THREAD_BLOCK.with(|block| {
        block.has_pending_callbacks.set(true);
    });

    // We need to synchronize with rcu_read_lock.  We need to ensure that all prior stores are
    // visible to threads that have called rcu_read_lock.  We must synchronize with both read
    // counters using a store operation.  We don't need to change the value.
    //
    // NOTE(review): under the "rseq_backend" feature, `read_counters` is an `RcuReadCounters`
    // rather than a two-element array, so the indexing below presumably relies on an `Index`
    // impl on that type — confirm this module compiles with that feature enabled.
    fence(Ordering::Release);
    RCU_CONTROL_BLOCK.read_counters[0].fetch_add(0, Ordering::Relaxed);
    RCU_CONTROL_BLOCK.read_counters[1].fetch_add(0, Ordering::Relaxed);

    // Even though we push the callback to the front of the stack, we reverse the order of the stack
    // when we pop the callbacks from the stack to ensure that the callbacks are run in the order in
    // which they were scheduled.

    // Synchronization point [G] (see design.md)
    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
}

244/// Schedule the object to be dropped after all in-flight read operations have completed.
245///
246/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
247/// an arbitrary thread.
248pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
249    rcu_call(move || {
250        std::mem::drop(value);
251    });
252}
253
/// Check if there are any active readers for the given generation.
fn has_active_readers(generation: usize) -> bool {
    // Even generations use counter 0, odd generations use counter 1, matching `rcu_read_lock`.
    let index = generation & 1;

    #[cfg(feature = "rseq_backend")]
    {
        return RCU_CONTROL_BLOCK.read_counters.has_active(index);
    }

    #[cfg(not(feature = "rseq_backend"))]
    {
        // Synchronization point [C] (see design.md)
        RCU_CONTROL_BLOCK.read_counters[index].load(Ordering::SeqCst) > 0
    }
}

/// Wake up all the threads that are waiting to advance the state machine.
///
/// Does nothing if no threads are waiting.
fn rcu_advancer_wake_all() {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
        // NOTE(review): the Relaxed store appears safe because any waiter in `rcu_advancer_wait`
        // re-checks `has_active_readers` (SeqCst) in a loop after every wakeup, so a stale
        // observation here cannot strand a waiter — confirm against design.md.
        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
        advancer.wake_all();
    }
}

/// Blocks the current thread until all in-flight read operations have completed for the given
/// generation.
///
/// Postcondition: The number of active readers for the given generation is zero and the advancer
/// futex contains `ADVANCER_IDLE`.
fn rcu_advancer_wait(generation: usize) {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    loop {
        // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
        // before checking if there are any active readers.
        //
        // In the single total order, either this store or the last decrement to the reader counter
        // must happen first.
        //
        //  (1) If this store happens first, then the last thread to decrement the reader counter
        //      for this generation will observe `ADVANCER_WAITING` and will reset the value to
        //      `ADVANCER_IDLE` and wake the futex, unblocking this thread.
        //
        //  (2) If the last decrement to the reader counter happens first, then this thread will see
        //      that there are no active readers in this generation and avoid blocking on the futex.
        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
        if !has_active_readers(generation) {
            break;
        }
        // The wait only blocks if the futex still contains ADVANCER_WAITING; a concurrent wake
        // that already reset it to ADVANCER_IDLE makes this return immediately.
        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
    }
    // Re-establish the idle value; the loop may exit with the futex still set to WAITING.
    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
}

/// Advance the RCU state machine.
///
/// This function blocks until all in-flight read operations have completed for the current
/// generation and all callbacks have been run.
fn rcu_grace_period() {
    let callbacks = {
        // Holding `waiting_callbacks` across the wait below serializes concurrent grace periods:
        // only one thread advances the state machine at a time.
        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();

        // We are in the *Idle* state.

        // Swap out the callbacks that we can run when this grace period has passed with the
        // callbacks that can run after the next period.
        // Synchronization point [H] (see design.md)
        let callbacks =
            std::mem::replace(&mut *waiting_callbacks, RCU_CONTROL_BLOCK.callback_chain.take());
        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);

        // Enter the *Waiting* state.
        rcu_advancer_wait(generation);

        // Return to the *Idle* state.
        callbacks
    };

    // We cannot control the order in which callbacks run since callbacks can be running on multiple
    // threads concurrently.
    for callback in callbacks {
        callback();
    }
}

341/// Block until all in-flight read operations have completed.  When this returns, the callbacks that
342/// are unblocked by those in-flight operations might still be running (or even not yet started) on
343/// another thread.
344pub fn rcu_synchronize() {
345    RCU_THREAD_BLOCK.with(|block| {
346        assert!(!block.holding_read_lock());
347        block.has_pending_callbacks.set(false);
348    });
349
350    // We need to run at least two grace periods to flush out all pending callbacks.  See the
351    // comment in `rcu_read_lock` and the design to understand why.
352    rcu_grace_period();
353    rcu_grace_period();
354}
355
356/// If any callbacks have been scheduled from this thread, call `rcu_synchronize`.
357///
358/// If any callbacks have been scheduled from this thread, this function will block until the
359/// callbacks are unblocked and ready to be run (but have not yet necessarily finished, or even
360/// started).  If no callbacks have been scheduled from this thread, this function will return
361/// immediately.
362pub fn rcu_run_callbacks() {
363    RCU_THREAD_BLOCK.with(|block| {
364        assert!(!block.holding_read_lock());
365        if block.has_pending_callbacks.get() {
366            rcu_synchronize();
367        }
368    })
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// All tests in this module mutate the global `RCU_CONTROL_BLOCK`, and the Rust test harness
    /// runs tests on multiple threads by default. A grace period driven by one test can fire
    /// another test's callback early (e.g. breaking `test_rcu_delay_regression`'s "still false
    /// after one grace period" assertion), so the tests must be serialized.
    static TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquire the test-serialization lock, recovering from poisoning so that one failed test
    /// does not cascade into spurious failures in the others.
    fn serialize_tests() -> std::sync::MutexGuard<'static, ()> {
        TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    #[test]
    fn test_rcu_delay_regression() {
        let _guard = serialize_tests();

        // This test relies on the global RCU state machine.
        // It verifies that callbacks are NOT executed immediately after one grace period.
        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_grace_period();

        assert!(
            !flag.load(Ordering::SeqCst),
            "Callback executed too early! RCU requires 2 grace periods delay."
        );

        rcu_grace_period();
        assert!(flag.load(Ordering::SeqCst), "Callback should have executed after 2 grace periods");
    }

    #[test]
    fn test_rcu_synchronize() {
        let _guard = serialize_tests();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_synchronize() blocks until all callbacks have been run.
        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_synchronize();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_synchronize()"
        );
    }

    #[test]
    fn test_rcu_run_callbacks() {
        let _guard = serialize_tests();

        // This test relies on the global RCU state machine.
        // It verifies that rcu_run_callbacks() blocks until all callbacks have been run.
        let flag = Arc::new(AtomicBool::new(false));
        let moved_flag = flag.clone();

        rcu_call(move || {
            moved_flag.store(true, Ordering::SeqCst);
        });

        rcu_run_callbacks();
        assert!(
            flag.load(Ordering::SeqCst),
            "Callback should have executed after rcu_run_callbacks()"
        );
    }
}