fuchsia_rcu/state_machine.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::atomic_stack::AtomicStack;
use fuchsia_sync::Mutex;
use std::cell::Cell;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread_local;

type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;

/// The length of the queue of waiting callbacks.
///
/// The state machine waits for this many generations to complete before running these callbacks.
const QUEUE_LENGTH: usize = 2;

/// The queue of waiting callbacks.
///
/// The queue is a ring buffer of `QUEUE_LENGTH` sets of callbacks.
struct CallbackQueue {
    /// The index at which to add the next set of callbacks.
    index: usize,

    /// The callbacks that are waiting to be run.
    ///
    /// The callbacks are stored in a ring buffer.
    callbacks: [Vec<RcuCallback>; QUEUE_LENGTH],
}

impl CallbackQueue {
    /// Create an empty callback queue.
    const fn new() -> Self {
        Self { index: 0, callbacks: [Vec::new(), Vec::new()] }
    }

    /// Add a set of callbacks to the back of the queue.
    ///
    /// The caller is responsible for ensuring that there is an empty slot in the ring buffer to
    /// store the callbacks.
    fn push_back(&mut self, callbacks: Vec<RcuCallback>) {
        assert!(self.callbacks[self.index].is_empty());
        self.callbacks[self.index] = callbacks;
        self.index = (self.index + 1) % QUEUE_LENGTH;
    }

    /// Pop the front set of callbacks from the queue.
    ///
    /// If the front slot of the ring buffer is empty, this function returns an empty vector.
    fn pop_front(&mut self) -> Vec<RcuCallback> {
        // `push_back` advances `index` past the slot it just filled, which makes `index` point at
        // the oldest slot in the ring buffer.
        std::mem::take(&mut self.callbacks[self.index])
    }
}
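
// A minimal sketch (an illustrative addition, not part of the upstream file) of the ring-buffer
// timing: a set of callbacks pushed during one grace period is only returned by `pop_front` in a
// later cycle, because each `pop_front` takes the oldest slot rather than the one just filled.
#[cfg(test)]
mod callback_queue_sketch {
    use super::*;

    #[test]
    fn pop_front_returns_the_oldest_set() {
        let mut queue = CallbackQueue::new();

        // Cycle 1: push a set; the older slot is still empty, so nothing pops out.
        let set: Vec<RcuCallback> = vec![Box::new(|| {})];
        queue.push_back(set);
        assert!(queue.pop_front().is_empty());

        // Cycle 2: the set pushed in cycle 1 is now the oldest slot and pops out.
        queue.push_back(Vec::new());
        assert_eq!(queue.pop_front().len(), 1);
    }
}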

struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from: if
    /// `generation` is even, readers increment `read_counters[0]`; if it is odd, they increment
    /// `read_counters[1]`.
    read_counters: [AtomicUsize; 2],

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    advancer: zx::Futex,

    /// The queue of waiting callbacks.
    ///
    /// Callbacks are added to this queue when the state machine leaves the `Idle` state. They are
    /// run when the state machine leaves the `Waiting` state after `QUEUE_LENGTH` generations
    /// have completed.
    waiting_callbacks: Mutex<CallbackQueue>,
}

const ADVANCER_IDLE: i32 = 0;
const ADVANCER_WAITING: i32 = 1;

impl RcuControlBlock {
    /// Create a new control block for the RCU state machine.
    const fn new() -> Self {
        Self {
            generation: AtomicUsize::new(0),
            read_counters: [AtomicUsize::new(0), AtomicUsize::new(0)],
            callback_chain: AtomicStack::new(),
            advancer: zx::Futex::new(ADVANCER_IDLE),
            waiting_callbacks: Mutex::new(CallbackQueue::new()),
        }
    }
}

/// The control block for the RCU state machine.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();

#[derive(Default)]
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    nesting_level: Cell<usize>,

    /// The index of the read counter that the thread incremented when it entered its outermost
    /// read lock.
    counter_index: Cell<u8>,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}

impl RcuThreadBlock {
    /// Returns true if the thread is holding a read lock.
    fn holding_read_lock(&self) -> bool {
        self.nesting_level.get() > 0
    }
}

thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track the nesting level of read locks and the index of the read
    /// counter that the thread incremented when it entered its outermost read lock.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}

/// Acquire a read lock.
///
/// This function is used to acquire a read lock on the RCU state machine. The RCU state machine
/// defers calling callbacks until all currently in-flight read operations have completed.
///
/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
pub(crate) fn rcu_read_lock() {
    RCU_THREAD_BLOCK.with(|block| {
        let nesting_level = block.nesting_level.get();
        if nesting_level > 0 {
            // If this thread already has a read lock, increment the nesting level instead of
            // incrementing the read counter. This is a performance optimization that reduces the
            // number of atomic operations.
            block.nesting_level.set(nesting_level + 1);
        } else {
            // This is the outermost read lock. Increment the read counter.
            let index = RCU_CONTROL_BLOCK.generation.load(Ordering::Relaxed) & 1;
            // Synchronization point [A] (see design.md)
            RCU_CONTROL_BLOCK.read_counters[index].fetch_add(1, Ordering::SeqCst);
            block.counter_index.set(index as u8);
            block.nesting_level.set(1);
        }
    });
}

/// Release a read lock.
///
/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
/// more details.
pub(crate) fn rcu_read_unlock() {
    RCU_THREAD_BLOCK.with(|block| {
        let nesting_level = block.nesting_level.get();
        if nesting_level > 1 {
            // If the nesting level is greater than 1, this is not the outermost read lock.
            // Decrement the nesting level instead of the read counter.
            block.nesting_level.set(nesting_level - 1);
        } else {
            // This is the outermost read lock. Decrement the read counter.
            let index = block.counter_index.get() as usize;
            // Synchronization point [B] (see design.md)
            let previous_count =
                RCU_CONTROL_BLOCK.read_counters[index].fetch_sub(1, Ordering::SeqCst);
            if previous_count == 1 {
                rcu_advancer_wake_all();
            }
            block.nesting_level.set(0);
            block.counter_index.set(u8::MAX);
        }
    });
}

/// Read the value of an RCU pointer.
///
/// This function cannot be called unless the current thread is holding a read lock. The returned
/// pointer is valid until the read lock is released.
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    ptr.load(Ordering::Acquire)
}
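
// An illustrative reader-side sketch (not part of the upstream file; `shared` is a hypothetical
// RCU-managed pointer). It shows the intended pairing of `rcu_read_lock`, `rcu_read_pointer`, and
// `rcu_read_unlock`: the raw pointer may only be dereferenced between the lock and unlock calls.
#[allow(dead_code)]
fn example_read(shared: &AtomicPtr<u32>) -> Option<u32> {
    rcu_read_lock();
    let ptr = rcu_read_pointer(shared);
    // SAFETY: `ptr` remains valid until `rcu_read_unlock` below, because writers defer freeing
    // replaced values via `rcu_call`/`rcu_drop` until all in-flight read locks are released.
    let value = if ptr.is_null() { None } else { Some(unsafe { *ptr }) };
    rcu_read_unlock();
    value
}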

/// Assign a new value to an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    ptr.store(new_ptr, Ordering::Release);
}

/// Replace the value of an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    ptr.swap(new_ptr, Ordering::AcqRel)
}

/// Schedule a callback to run after all in-flight read operations have completed.
///
/// To wait until the callback is run, call `rcu_synchronize()`. The callback might be called from
/// an arbitrary thread.
pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
    RCU_THREAD_BLOCK.with(|block| {
        block.has_pending_callbacks.set(true);
    });

    // Although the callback is pushed onto the front of the stack, the state machine reverses the
    // stack when popping the callbacks, so callbacks run in the order in which they were
    // scheduled.

    // Synchronization point [G] (see design.md)
    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
}

/// Schedule the object to be dropped after all in-flight read operations have completed.
///
/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
/// an arbitrary thread.
pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
    rcu_call(move || {
        std::mem::drop(value);
    });
}
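
// An illustrative writer-side sketch (not part of the upstream file; `shared` is a hypothetical
// RCU-managed pointer to a heap-allocated value). It shows the documented update pattern: publish
// the new value with `rcu_replace_pointer`, then hand the old allocation to `rcu_drop` so it is
// freed only after all in-flight readers have finished with it.
#[allow(dead_code)]
fn example_update(shared: &AtomicPtr<u32>, new_value: u32) {
    let new_ptr = Box::into_raw(Box::new(new_value));
    let old_ptr = rcu_replace_pointer(shared, new_ptr);
    if !old_ptr.is_null() {
        // SAFETY: `old_ptr` was produced by `Box::into_raw` in a previous update, and no reader
        // that starts after the swap above can observe it. `rcu_drop` defers the actual drop
        // until readers that might still hold the pointer have released their read locks.
        rcu_drop(unsafe { Box::from_raw(old_ptr) });
    }
}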

/// Check if there are any active readers for the given generation.
fn has_active_readers(generation: usize) -> bool {
    let i = generation & 1;
    // Synchronization point [C] (see design.md)
    RCU_CONTROL_BLOCK.read_counters[i].load(Ordering::SeqCst) > 0
}

/// Wake up all the threads that are waiting to advance the state machine.
///
/// Does nothing if no threads are waiting.
fn rcu_advancer_wake_all() {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
        advancer.wake_all();
    }
}

/// Blocks the current thread until all in-flight read operations have completed for the given
/// generation.
///
/// Postcondition: The number of active readers for the given generation is zero and the advancer
/// futex contains `ADVANCER_IDLE`.
fn rcu_advancer_wait(generation: usize) {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    loop {
        // In order to avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
        // before checking if there are any active readers.
        //
        // In the single total order, either this store or the last decrement to the reader counter
        // must happen first.
        //
        //  (1) If this store happens first, then the last thread to decrement the reader counter
        //      for this generation will observe `ADVANCER_WAITING` and will reset the value to
        //      `ADVANCER_IDLE` and wake the futex, unblocking this thread.
        //
        //  (2) If the last decrement to the reader counter happens first, then this thread will see
        //      that there are no active readers in this generation and avoid blocking on the futex.
        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
        if !has_active_readers(generation) {
            break;
        }
        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
    }
    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
}

/// Advance the RCU state machine.
///
/// This function blocks until all read operations in flight for the current generation have
/// completed, and then runs the set of callbacks whose grace period has elapsed.
fn rcu_grace_period() {
    let callbacks = {
        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();
        // We are in the *Idle* state.

        // Synchronization point [H] (see design.md)
        waiting_callbacks.push_back(RCU_CONTROL_BLOCK.callback_chain.drain());
        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);

        // Enter the *Waiting* state.
        rcu_advancer_wait(generation);
        waiting_callbacks.pop_front()

        // Return to the *Idle* state.
    };

    // Run the callbacks in reverse order to ensure that the callbacks are run in the order in
    // which they were scheduled.
    for callback in callbacks.into_iter().rev() {
        callback();
    }
}
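
// A worked trace (illustrative, with `QUEUE_LENGTH == 2`) of two consecutive grace periods,
// showing why callbacks drained from the chain in one grace period only run at the end of the
// next one, and hence why `rcu_synchronize` loops `QUEUE_LENGTH` times:
//
//   Grace period 1: push_back(A)  -> slots = [A, _], index = 1
//                   wait for the readers of the generation that just ended
//                   pop_front()   -> takes slots[1], which is empty; nothing runs
//   Grace period 2: push_back(B)  -> slots = [A, B], index = 0
//                   wait for readers
//                   pop_front()   -> takes slots[0] = A; A's callbacks run now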

/// Block until all in-flight read operations have completed and all previously scheduled
/// callbacks have run.
pub fn rcu_synchronize() {
    RCU_THREAD_BLOCK.with(|block| {
        assert!(!block.holding_read_lock());
        block.has_pending_callbacks.set(false);
    });
    for _ in 0..QUEUE_LENGTH {
        rcu_grace_period();
    }
}

/// Run all callbacks that have been scheduled from this thread.
///
/// If any callbacks have been scheduled from this thread, this function will block until all
/// callbacks have been run. If no callbacks have been scheduled from this thread, this function
/// will return immediately.
pub fn rcu_run_callbacks() {
    RCU_THREAD_BLOCK.with(|block| {
        assert!(!block.holding_read_lock());
        if block.has_pending_callbacks.get() {
            rcu_synchronize();
        }
    })
}
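
// An end-to-end sketch (illustrative, not part of the upstream file) of the write-side contract:
// callbacks scheduled with `rcu_call` run in scheduling order, and `rcu_synchronize` does not
// return until they have run. The test assumes it has the global state machine to itself.
#[cfg(test)]
mod synchronize_sketch {
    use super::*;

    #[test]
    fn callbacks_run_in_order_before_synchronize_returns() {
        static ORDER: AtomicUsize = AtomicUsize::new(0);
        rcu_call(|| assert_eq!(ORDER.fetch_add(1, Ordering::SeqCst), 0));
        rcu_call(|| assert_eq!(ORDER.fetch_add(1, Ordering::SeqCst), 1));
        rcu_synchronize();
        assert_eq!(ORDER.load(Ordering::SeqCst), 2);
    }
}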