fuchsia_rcu/state_machine.rs
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::atomic_stack::AtomicStack;
use fuchsia_sync::Mutex;
use std::cell::Cell;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread_local;

type RcuCallback = Box<dyn FnOnce() + Send + Sync + 'static>;

/// The length of the queue of waiting callbacks.
///
/// The state machine waits for this many generations to complete before running a queued set of
/// callbacks.
const QUEUE_LENGTH: usize = 2;

/// The queue of waiting callbacks.
///
/// The queue is a ring buffer with `QUEUE_LENGTH` slots, each of which holds a set of callbacks.
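///
/// With `QUEUE_LENGTH == 2`, each grace period performs one `push_back` followed by one
/// `pop_front` under the same lock, so a pushed set is returned one grace period later. A
/// hypothetical trace:
///
/// ```text
/// GP 1: push_back(A) -> slots [A, -], index = 1
///       pop_front()  -> returns {}   (slot 1 is empty)
/// GP 2: push_back(B) -> slots [A, B], index = 0
///       pop_front()  -> returns A
/// ```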
struct CallbackQueue {
    /// The index at which to add the next set of callbacks.
    index: usize,

    /// The callbacks that are waiting to be run.
    ///
    /// The callbacks are stored in a ring buffer.
    callbacks: [Vec<RcuCallback>; QUEUE_LENGTH],
}

impl CallbackQueue {
    /// Create an empty callback queue.
    const fn new() -> Self {
        Self { index: 0, callbacks: [Vec::new(), Vec::new()] }
    }

    /// Add a set of callbacks to the back of the queue.
    ///
    /// The caller is responsible for ensuring that there is an empty slot in the ring buffer to
    /// store the callbacks.
    fn push_back(&mut self, callbacks: Vec<RcuCallback>) {
        assert!(self.callbacks[self.index].is_empty());
        self.callbacks[self.index] = callbacks;
        self.index = (self.index + 1) % QUEUE_LENGTH;
    }

    /// Pop the front set of callbacks from the queue.
    ///
    /// If the front slot is empty, this function returns an empty vector.
    fn pop_front(&mut self) -> Vec<RcuCallback> {
        // `push_back` advances `index` past the slot it just filled, so the slot at `index`
        // holds the oldest waiting set (or is empty). Take it without advancing `index`;
        // advancing first would return the set that was just pushed, defeating the queue.
        std::mem::take(&mut self.callbacks[self.index])
    }
}

struct RcuControlBlock {
    /// The generation counter.
    ///
    /// The generation counter is incremented whenever the state machine leaves the `Idle` state.
    generation: AtomicUsize,

    /// The read counters.
    ///
    /// Readers increment the counter for the generation that they are reading from. For example,
    /// if the `generation` is even, readers increment `read_counters[0]`. If the `generation` is
    /// odd, readers increment `read_counters[1]`.
    read_counters: [AtomicUsize; 2],

    /// The chain of callbacks that are waiting to be run.
    ///
    /// Writers add callbacks to this chain after writing to the object. The callbacks are run when
    /// all currently in-flight read operations have completed.
    callback_chain: AtomicStack<RcuCallback>,

    /// The futex used to wait for the state machine to advance.
    advancer: zx::Futex,

    /// The queue of waiting callbacks.
    ///
    /// Callbacks are added to this queue when the state machine leaves the `Idle` state. They are
    /// run when the state machine leaves the `Waiting` state after `QUEUE_LENGTH` generations
    /// have completed.
    waiting_callbacks: Mutex<CallbackQueue>,
}

const ADVANCER_IDLE: i32 = 0;
const ADVANCER_WAITING: i32 = 1;

impl RcuControlBlock {
    /// Create a new control block for the RCU state machine.
    const fn new() -> Self {
        Self {
            generation: AtomicUsize::new(0),
            read_counters: [AtomicUsize::new(0), AtomicUsize::new(0)],
            callback_chain: AtomicStack::new(),
            advancer: zx::Futex::new(ADVANCER_IDLE),
            waiting_callbacks: Mutex::new(CallbackQueue::new()),
        }
    }
}

/// The control block for the RCU state machine.
static RCU_CONTROL_BLOCK: RcuControlBlock = RcuControlBlock::new();

#[derive(Default)]
struct RcuThreadBlock {
    /// The number of times the thread has nested into a read lock.
    nesting_level: Cell<usize>,

    /// The index of the read counter that the thread incremented when it entered its outermost
    /// read lock.
    counter_index: Cell<u8>,

    /// Whether this thread has scheduled callbacks since the last time the thread called
    /// `rcu_synchronize`.
    has_pending_callbacks: Cell<bool>,
}

impl RcuThreadBlock {
    /// Returns true if the thread is holding a read lock.
    fn holding_read_lock(&self) -> bool {
        self.nesting_level.get() > 0
    }
}

thread_local! {
    /// Thread-specific data for the RCU state machine.
    ///
    /// This data is used to track the nesting level of read locks and the index of the read
    /// counter that the thread incremented when it entered its outermost read lock.
    static RCU_THREAD_BLOCK: RcuThreadBlock = RcuThreadBlock::default();
}

/// Acquire a read lock.
///
/// This function acquires a read lock on the RCU state machine, which defers running callbacks
/// until all currently in-flight read operations have completed.
///
/// Must be balanced by a call to `rcu_read_unlock` on the same thread.
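///
/// # Example
///
/// A minimal sketch of the read-side pattern; `CONFIG` and its pointee are hypothetical:
///
/// ```ignore
/// use std::sync::atomic::AtomicPtr;
///
/// static CONFIG: AtomicPtr<u32> = AtomicPtr::new(std::ptr::null_mut());
///
/// fn read_config() -> Option<u32> {
///     rcu_read_lock();
///     // The pointee cannot be freed before the matching unlock below.
///     let value = unsafe { rcu_read_pointer(&CONFIG).as_ref() }.copied();
///     rcu_read_unlock();
///     value
/// }
/// ```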
pub(crate) fn rcu_read_lock() {
    RCU_THREAD_BLOCK.with(|block| {
        let nesting_level = block.nesting_level.get();
        if nesting_level > 0 {
            // If this thread already has a read lock, increment the nesting level instead of
            // incrementing the read counter. This approach is a performance optimization to
            // reduce the number of atomic operations that need to be performed.
            block.nesting_level.set(nesting_level + 1);
        } else {
            // This is the outermost read lock. Increment the read counter.
            let index = RCU_CONTROL_BLOCK.generation.load(Ordering::Relaxed) & 1;
            // Synchronization point [A] (see design.md)
            RCU_CONTROL_BLOCK.read_counters[index].fetch_add(1, Ordering::SeqCst);
            block.counter_index.set(index as u8);
            block.nesting_level.set(1);
        }
    });
}

/// Release a read lock.
///
/// This function is used to release a read lock on the RCU state machine. See `rcu_read_lock` for
/// more details.
pub(crate) fn rcu_read_unlock() {
    RCU_THREAD_BLOCK.with(|block| {
        let nesting_level = block.nesting_level.get();
        if nesting_level > 1 {
            // If the nesting level is greater than 1, this is not the outermost read lock.
            // Decrement the nesting level instead of the read counter.
            block.nesting_level.set(nesting_level - 1);
        } else {
            // This is the outermost read lock. Decrement the read counter.
            let index = block.counter_index.get() as usize;
            // Synchronization point [B] (see design.md)
            let previous_count =
                RCU_CONTROL_BLOCK.read_counters[index].fetch_sub(1, Ordering::SeqCst);
            if previous_count == 1 {
                rcu_advancer_wake_all();
            }
            block.nesting_level.set(0);
            block.counter_index.set(u8::MAX);
        }
    });
}

/// Read the value of an RCU pointer.
///
/// This function cannot be called unless the current thread is holding a read lock. The returned
/// pointer is valid until the read lock is released.
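///
/// # Example
///
/// A minimal sketch; `CONFIG` is hypothetical (see `rcu_read_lock` for the full pattern):
///
/// ```ignore
/// rcu_read_lock();
/// let ptr = rcu_read_pointer(&CONFIG);
/// if let Some(value) = unsafe { ptr.as_ref() } {
///     println!("current value: {value}");
/// }
/// rcu_read_unlock();
/// ```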
pub(crate) fn rcu_read_pointer<T>(ptr: &AtomicPtr<T>) -> *const T {
    // Synchronization point [D] (see design.md)
    ptr.load(Ordering::Acquire)
}

/// Assign a new value to an RCU pointer.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
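///
/// # Example
///
/// A minimal sketch of publishing a value; `CONFIG` is hypothetical. `rcu_assign_pointer` does
/// not return the old pointer, so use it when the previous value is null or reclaimed elsewhere:
///
/// ```ignore
/// use std::sync::atomic::AtomicPtr;
///
/// static CONFIG: AtomicPtr<u32> = AtomicPtr::new(std::ptr::null_mut());
///
/// fn install(value: u32) {
///     rcu_assign_pointer(&CONFIG, Box::into_raw(Box::new(value)));
/// }
/// ```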
pub(crate) fn rcu_assign_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) {
    // Synchronization point [E] (see design.md)
    ptr.store(new_ptr, Ordering::Release);
}

/// Replace the value of an RCU pointer, returning the previous value.
///
/// Concurrent readers may continue to reference the old value of the pointer until the RCU state
/// machine has made sufficient progress. To clean up the old value of the pointer, use `rcu_call`
/// or `rcu_drop`, which defer processing until all in-flight read operations have completed.
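///
/// # Example
///
/// A minimal sketch of the update-and-reclaim pattern; `CONFIG` is hypothetical. The old address
/// is smuggled through the closure as a `usize` because raw pointers are not `Send`:
///
/// ```ignore
/// use std::sync::atomic::AtomicPtr;
///
/// static CONFIG: AtomicPtr<u32> = AtomicPtr::new(std::ptr::null_mut());
///
/// fn update(value: u32) {
///     let old_ptr = rcu_replace_pointer(&CONFIG, Box::into_raw(Box::new(value)));
///     if !old_ptr.is_null() {
///         let old_addr = old_ptr as usize;
///         // Defer freeing until every reader that might still hold `old_ptr` has
///         // released its read lock.
///         rcu_call(move || unsafe {
///             drop(Box::from_raw(old_addr as *mut u32));
///         });
///     }
/// }
/// ```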
pub(crate) fn rcu_replace_pointer<T>(ptr: &AtomicPtr<T>, new_ptr: *mut T) -> *mut T {
    // Synchronization point [F] (see design.md)
    ptr.swap(new_ptr, Ordering::AcqRel)
}

/// Schedule a callback to run after all in-flight read operations have completed.
///
/// To wait until the callback is run, call `rcu_synchronize()`. The callback might be called from
/// an arbitrary thread.
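///
/// # Example
///
/// A minimal sketch; the `DONE` flag is hypothetical:
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// static DONE: AtomicBool = AtomicBool::new(false);
///
/// rcu_call(|| DONE.store(true, Ordering::Release));
/// rcu_synchronize();
/// assert!(DONE.load(Ordering::Acquire));
/// ```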
pub(crate) fn rcu_call(callback: impl FnOnce() + Send + Sync + 'static) {
    RCU_THREAD_BLOCK.with(|block| {
        block.has_pending_callbacks.set(true);
    });

    // The callback is pushed onto the front of the stack, so draining the stack yields the
    // callbacks newest-first. `rcu_grace_period` iterates the drained list in reverse to run
    // the callbacks in the order in which they were scheduled.

    // Synchronization point [G] (see design.md)
    RCU_CONTROL_BLOCK.callback_chain.push_front(Box::new(callback));
}

/// Schedule the object to be dropped after all in-flight read operations have completed.
///
/// To wait until the object is dropped, call `rcu_synchronize()`. The object might be dropped from
/// an arbitrary thread.
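///
/// # Example
///
/// A minimal sketch; the retired value is hypothetical:
///
/// ```ignore
/// let retired = String::from("no longer reachable by new readers");
/// rcu_drop(retired);
/// rcu_synchronize(); // Does not return until `retired` has been dropped.
/// ```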
pub fn rcu_drop<T: Send + Sync + 'static>(value: T) {
    rcu_call(move || {
        std::mem::drop(value);
    });
}

/// Check if there are any active readers for the given generation.
fn has_active_readers(generation: usize) -> bool {
    let i = generation & 1;
    // Synchronization point [C] (see design.md)
    RCU_CONTROL_BLOCK.read_counters[i].load(Ordering::SeqCst) > 0
}

/// Wake up all the threads that are waiting to advance the state machine.
///
/// Does nothing if no threads are waiting.
fn rcu_advancer_wake_all() {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    if advancer.load(Ordering::SeqCst) == ADVANCER_WAITING {
        advancer.store(ADVANCER_IDLE, Ordering::Relaxed);
        advancer.wake_all();
    }
}

/// Blocks the current thread until all in-flight read operations have completed for the given
/// generation.
///
/// Postcondition: The number of active readers for the given generation is zero and the advancer
/// futex contains `ADVANCER_IDLE`.
fn rcu_advancer_wait(generation: usize) {
    let advancer = &RCU_CONTROL_BLOCK.advancer;
    loop {
        // To avoid a race with `rcu_advancer_wake_all`, we must store `ADVANCER_WAITING`
        // before checking if there are any active readers.
        //
        // In the single total order, either this store or the last decrement to the reader
        // counter must happen first.
        //
        // (1) If this store happens first, then the last thread to decrement the reader counter
        //     for this generation will observe `ADVANCER_WAITING`, reset the value to
        //     `ADVANCER_IDLE`, and wake the futex, unblocking this thread.
        //
        // (2) If the last decrement to the reader counter happens first, then this thread will
        //     see that there are no active readers in this generation and avoid blocking on the
        //     futex.
        advancer.store(ADVANCER_WAITING, Ordering::SeqCst);
        if !has_active_readers(generation) {
            break;
        }
        let _ = advancer.wait(ADVANCER_WAITING, None, zx::MonotonicInstant::INFINITE);
    }
    advancer.store(ADVANCER_IDLE, Ordering::SeqCst);
}

/// Advance the RCU state machine.
///
/// This function blocks until all in-flight read operations have completed for the current
/// generation, and then runs the set of callbacks that has been waiting for `QUEUE_LENGTH`
/// generations.
fn rcu_grace_period() {
    let callbacks = {
        let mut waiting_callbacks = RCU_CONTROL_BLOCK.waiting_callbacks.lock();
        // We are in the *Idle* state.

        // Synchronization point [H] (see design.md)
        waiting_callbacks.push_back(RCU_CONTROL_BLOCK.callback_chain.drain());
        let generation = RCU_CONTROL_BLOCK.generation.fetch_add(1, Ordering::Relaxed);

        // Enter the *Waiting* state.
        rcu_advancer_wait(generation);
        waiting_callbacks.pop_front()

        // Return to the *Idle* state.
    };

    // The drained stack yields the callbacks newest-first, so run them in reverse order to
    // restore the order in which they were scheduled.
    for callback in callbacks.into_iter().rev() {
        callback();
    }
}

/// Block until all in-flight read operations and callbacks have completed.
pub fn rcu_synchronize() {
    RCU_THREAD_BLOCK.with(|block| {
        assert!(!block.holding_read_lock());
        block.has_pending_callbacks.set(false);
    });
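    // A callback scheduled with `rcu_call` sits in the waiting queue for up to `QUEUE_LENGTH`
    // grace periods before it runs, so drive that many grace periods to guarantee that every
    // previously scheduled callback has completed.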
    for _ in 0..QUEUE_LENGTH {
        rcu_grace_period();
    }
}

/// Run all callbacks that have been scheduled from this thread.
///
/// If any callbacks have been scheduled from this thread, this function will block until all
/// callbacks have been run. If no callbacks have been scheduled from this thread, this function
/// will return immediately.
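///
/// # Example
///
/// A minimal sketch; the retired string is hypothetical:
///
/// ```ignore
/// rcu_drop(String::from("retired"));
/// // Later, at a point where this thread must not leave callbacks pending:
/// rcu_run_callbacks(); // Blocks only because this thread scheduled a callback.
/// ```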
pub fn rcu_run_callbacks() {
    RCU_THREAD_BLOCK.with(|block| {
        assert!(!block.holding_read_lock());
        if block.has_pending_callbacks.get() {
            rcu_synchronize();
        }
    })
}