fuchsia_sync/rwlock.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::sync::atomic::Ordering;
6
7pub struct RawSyncRwLock {
8 /// Holds the primary state of the RwLock.
9 ///
10 /// See the constants declared below for the semantics of this value.
11 ///
12 /// Readers will zx_futex_wait on this address.
13 ///
14 /// Ordering rules:
15 ///
16 /// * Any store operation which may release the lock must use Ordering::Release on state to
17 /// establish a happens-before relationship with the next lock acquisition.
18 /// * Any load operation which may acquire the lock must use Ordering::Acquire on state to
19 /// establish a happens-before relationship with the previous lock release.
20 state: zx::Futex,
21
22 /// The queue of writers waiting to obtain this lock.
23 ///
24 /// The value of this field is just a generation counter for this queue.
25 ///
26 /// Writers will zx_futex_wait on this address with the current generation number.
27 ///
28 /// Ordering rules:
29 ///
30 /// * Stores to writer_queue must be preceded by a store to state and use Ordering::Release.
31 /// * Loads from writer_queue must use Ordering::Acquire and be followed by a load of state.
32 writer_queue: zx::Futex,
33}
34
35const INITIAL_STATE: i32 = 0;
36
37/// If this bit is set in `state`, then the lock is held exclusively (i.e., as a writer) by the
38/// thread that set this bit.
39const WRITER_BIT: i32 = 0b0001;
40
41/// If this bit is set in `state`, then a writer wished to acquire exclusive access to this lock
42/// but observed a reader or a writer holding the lock. The writer will fetch the currentgeneration
43/// number for `writer_queue`, re-check `state`, and then zx_futex_wait on the `writer_queue`.
44const WRITER_BLOCKED_BIT: i32 = 0b0010;
45
46/// If this bit is set in `state`, then a reader wished to acquire shared access to this lock
47/// but could not because either (a) the lock was held exclusively by a writer or (b) a writer
48/// was already blocked waiting for the lock. This second condition is necessary to avoid
49/// starving writers: once a writer is blocked, readers that could otherwise have acquired
50/// shared access to the lock become blocked waiting for at least one writer to acquire the lock.
51const READER_BLOCKED_BIT: i32 = 0b0100;
52
53/// The amount `state` is incremented when a reader acquires the lock. The `state` tracks the
54/// number of outstanding readers so that once all the readers have released their shared access,
55/// the lock can be made available for exclusive access again.
56///
57/// We count the readers in the high bits of the state so that we can use arithmetic overflow to
58/// detect when too many readers have acquired the lock for us to keep track of.
59const READER_UNIT: i32 = 0b1000;
60
61/// A mask to select only the bits that count the number of readers holding shared access to the
62/// lock.
63const READER_MASK: i32 = !0b0111;
64
65/// # STATE MACHINE
66///
67/// The RwLock goes through the following states:
68///
69/// ## Initial
70///
71/// In the "Initial" state, the `state` is zero. No thread has access to the lock and no threads
72/// are waiting.
73///
74/// * If a reader tries to acquire the lock => Shared access (unblocked)
75/// * If a writer tries to acquire the lock => Exclusive access (unblocked)
76/// * If a previously blocked writer acquires the lock => Exclusive access (writers blocked)
77///
78/// ## Shared access (unblocked)
79///
80/// In this state, `state & READER_MASK` is non-zero and other bits are unset. A non-zero
81/// number of threads have shared access to the lock and no threads are waiting.
82///
83/// Additional readers can acquire shared access to the lock without entering the kernel.
84///
85/// * If a reader tries to acquire the lock => Shared access (unblocked)
86/// * If a writer tries to acquire the lock => Shared access (writers blocked)
87/// * If the last reader releases the lock => Initial
88///
89/// ## Shared access (writers blocked)
90///
91/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT is set, and other bits are
92/// unset. A non-zero number of threads have shared access to the lock and a non-zero number of
93/// writers are waiting for exclusive access.
94///
95/// The lock is contended and requires kernel coordination to wake the blocked threads.
96///
97/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
98/// * If a writer tries to acquire the lock => Shared access (writers blocked)
99/// * If the last reader releases the lock => Exclusive access (writers blocked)
100///
101/// ## Shared access (readers and writers blocked)
102///
103/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT and READER_BLOCKED_BIT are
104/// set, and other bits are unset. A non-zero number of threads have shared access to the lock,
105/// a non-zero number of writers are waiting for exclusive access, and a non-zero number of writers
106/// are waiting for shared access.
107///
108/// The lock is contended and requires kernel coordination to wake the blocked threads.
109///
110/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
111/// * If a writer tries to acquire the lock => Shared access (readers and writers blocked)
112/// * If the last reader releases the lock => Exclusive access (readers and writers blocked)
113///
114/// ## Exclusive access (unblocked)
115///
116/// In this state, WRITER_BIT is set and other bits are unset. Exactly one thread has exclusive
117/// access to the lock and no threads are waiting.
118///
119/// The writer can release the lock without entering the kernel.
120///
121/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
122/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
123/// * If a writer tries to downgrade the lock => Shared access (unblocked)
124/// * If the writer releases the lock => Initial
125///
126/// ## Exclusive access (writers blocked)
127///
128/// In this state, WRITER_BIT and WRITER_BLOCKED_BIT are set and other bits are unset. Exactly one
129/// thread has exclusive access to the lock and zero or more writers are waiting for exclusive
130/// access.
131///
132/// When the writer release the lock, the state transitions to the "Initial state" and then the
133/// lock wakes up one of the writers, if any exist. If this previously waiting writer succeeds in
134/// acquiring the lock, the state machine returns to the "Exclusive access (writers blocked)" state
135/// because we do not know how many writers are blocked waiting for exclusive access.
136///
137/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
138/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
139/// * If a writer tries to downgrade the lock => Shared access (writers blocked)
140/// * If the writer releases the lock => Initial
141///
142/// ## Exclusive access (readers blocked)
143///
144/// In this state, WRITER_BIT and READER_BLOCKED_BIT are set and other bits are unset. Exactly one
145/// thread has exclusive access to the lock and zero or more writers are waiting for shared
146/// access.
147///
148/// When the writer release the lock, the state transitions to the initial state and then the lock
149/// wakes up any blocked readers.
150///
151/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
152/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
153/// * If a writer tries to downgrade the lock => Unique reader (readers blocked)
154/// * If the writer releases the lock => Initial
155///
156/// ## Exclusive access (readers and writers blocked)
157///
158/// In this state, WRITER_BIT, WRITER_BLOCKED_BIT, and READER_BLOCKED_BIT are set and other bits
159/// are unset. Exactly one thread has exclusive access to the lock and zero or more writers are
160/// waiting for exclusive access, and a non-zero number of readers are waiting for shared access.
161///
162/// The lock is contended and requires kernel coordination to wake the blocked threads.
163///
164/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
165/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
166/// * If a writer tries to downgrade the lock => Unique reader (readers and writers blocked)
167/// * If the writer releases the lock => Unlocked (readers blocked)
168///
169/// ## Unlocked (readers blocked)
170///
171/// In this state, READER_BLOCKED_BIT is set and other bits are unset. No thread has access to the
172/// lock and a non-zero number of readers are waiting for shared access.
173///
174/// This state is transitory and the state machine will leave this state without outside
175/// intervention by returning to the "Initial" state and waking any blocked readers.
176///
177/// * If a reader tries to acquire the lock => Unlocked (readers blocked)
178/// * If a writer tries to acquire the lock => Exclusive access (readers blocked)
179/// * Otherwise => Initial
180///
181/// ## Unique reader (readers blocked)
182///
183/// In this state, there is exactly one reader, who is running on the current thread, the
184/// READER_BLOCKED_BIT is set and and other bits are unset. A non-zero number of readers are
185/// waiting for shared access.
186///
187/// This state is transitory and the state machine will leave this state without outside
188/// intervention by moving to the "Shared access (unblocked)" state and waking any blocked
189/// readers.
190///
191/// * If a reader tries to acquire the lock => Unique reader (readers blocked)
192/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
193/// * Otherwise => Shared access (unblocked)
194///
195/// ## Unique reader (readers and writers blocked)
196///
197/// In this state, there is exactly one reader, who is running on the current thread, and the
198/// READER_BLOCKED_BIT and the WRITER_BLOCKED_BIT are set. Zero or more writers are waiting for
199/// exclusive access, and a non-zero number of readers are waiting for shared access.
200///
201/// This state is transitory and the state machine will leave this state without outside
202/// intervention by moving to the "Shared access (writers blocked)" state and waking any blocked
203/// readers.
204///
205///
206/// * If a reader tries to acquire the lock => Unique reader (readers and writers blocked)
207/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
208/// * Otherwise => Shared access (writers blocked)
209
210fn is_locked_exclusive(state: i32) -> bool {
211 state & WRITER_BIT != 0
212}
213
214fn has_blocked_writer(state: i32) -> bool {
215 state & WRITER_BLOCKED_BIT != 0
216}
217
218fn has_blocked_reader(state: i32) -> bool {
219 state & READER_BLOCKED_BIT != 0
220}
221
222fn can_lock_shared(state: i32) -> bool {
223 !is_locked_exclusive(state) && !has_blocked_writer(state) && !has_blocked_reader(state)
224}
225
226fn is_unlocked(state: i32) -> bool {
227 state & (WRITER_BIT | READER_MASK) == 0
228}
229
230impl RawSyncRwLock {
231 #[inline]
232 fn try_lock_shared_fast(&self) -> bool {
233 let state = self.state.load(Ordering::Relaxed);
234 if can_lock_shared(state) {
235 if let Some(new_state) = state.checked_add(READER_UNIT) {
236 return self
237 .state
238 .compare_exchange(state, new_state, Ordering::Acquire, Ordering::Relaxed)
239 .is_ok();
240 }
241 }
242 false
243 }
244
245 #[cold]
246 fn lock_shared_slow(&self) {
247 let mut state = self.state.load(Ordering::Relaxed);
248 loop {
249 if can_lock_shared(state) {
250 let new_state =
251 state.checked_add(READER_UNIT).expect("overflowed reader count in rwlock");
252 match self.state.compare_exchange_weak(
253 state,
254 new_state,
255 Ordering::Acquire,
256 Ordering::Relaxed,
257 ) {
258 Ok(_) => return, // Acquired shared lock.
259 Err(observed_state) => {
260 state = observed_state;
261 continue;
262 }
263 }
264 }
265
266 let desired_sleep_state = state | READER_BLOCKED_BIT;
267
268 if !has_blocked_reader(state) {
269 if let Err(observed_state) = self.state.compare_exchange(
270 state,
271 desired_sleep_state,
272 Ordering::Relaxed,
273 Ordering::Relaxed,
274 ) {
275 state = observed_state;
276 continue;
277 }
278 }
279
280 // Ignore spurious wakeups, the loop will retry.
281 self.state
282 .wait(
283 desired_sleep_state,
284 None, // We don't integrate with priority inheritance yet.
285 zx::MonotonicInstant::INFINITE,
286 )
287 .ok();
288 state = self.state.load(Ordering::Relaxed);
289 }
290 }
291
292 #[cold]
293 fn lock_exclusive_slow(&self) {
294 let mut state = self.state.load(Ordering::Relaxed);
295 let mut other_writers_bit = 0;
296
297 loop {
298 if is_unlocked(state) {
299 match self.state.compare_exchange_weak(
300 state,
301 state | WRITER_BIT | other_writers_bit,
302 Ordering::Acquire,
303 Ordering::Relaxed,
304 ) {
305 Ok(_) => return, // Acquired exclusive lock.
306 Err(observed_state) => {
307 state = observed_state;
308 continue;
309 }
310 }
311 }
312
313 if !has_blocked_writer(state) {
314 if let Err(observed_state) = self.state.compare_exchange(
315 state,
316 state | WRITER_BLOCKED_BIT,
317 Ordering::Relaxed,
318 Ordering::Relaxed,
319 ) {
320 state = observed_state;
321 continue;
322 }
323 }
324
325 other_writers_bit = WRITER_BLOCKED_BIT;
326
327 let generation_number = self.writer_queue.load(Ordering::Acquire);
328
329 // Before we go to sleep on the writer_queue at the fetched generation number, we need
330 // to make sure that some other thread is going to wake that generation of sleeping
331 // writers. If we didn't fetch the state again, it's possible that another thread could
332 // have cleared the WRITER_BLOCKED_BIT in the state and incremented the generation
333 // number between the last time we observed state and the time we observed the
334 // generation number.
335 //
336 // By observing the WRITER_BLOCKED_BIT *after* fetching the generation number, we
337 // ensure that either (a) this generation has already been awoken or (b) whoever clears
338 // the WRITER_BLOCKED_BIT bit will wake this generation in the future.
339 state = self.state.load(Ordering::Relaxed);
340
341 // If the lock is available or the WRITER_BLOCKED_BIT is missing, try again. No one has
342 // promised to wake the observed generation number.
343 if is_unlocked(state) || !has_blocked_writer(state) {
344 continue;
345 }
346
347 // Ignore spurious wakeups here, the loop will retry.
348 self.writer_queue
349 .wait(
350 generation_number,
351 None, // We don't integrate with priority inheritance yet.
352 zx::MonotonicInstant::INFINITE,
353 )
354 .ok();
355
356 state = self.state.load(Ordering::Relaxed);
357 }
358 }
359
360 #[cold]
361 fn unlock_slow(&self, mut state: i32) {
362 debug_assert!(is_unlocked(state));
363
364 // There are only writers waiting.
365 if state == WRITER_BLOCKED_BIT {
366 match self.state.compare_exchange(
367 state,
368 INITIAL_STATE,
369 Ordering::Relaxed,
370 Ordering::Relaxed,
371 ) {
372 Ok(_) => {
373 self.wake_writer();
374 // We either made progress by waking a waiter or no one is waiting for this
375 // lock anymore.
376 return;
377 }
378 Err(observed_state) => {
379 state = observed_state;
380 }
381 }
382 }
383
384 // There are both readers and writers waiting.
385 if state == READER_BLOCKED_BIT | WRITER_BLOCKED_BIT {
386 // Attempt to clear the WRITER_BLOCKED_BIT.
387 if self
388 .state
389 .compare_exchange(state, READER_BLOCKED_BIT, Ordering::Relaxed, Ordering::Relaxed)
390 .is_err()
391 {
392 // The state changed, which means another thread made progress. We're done.
393 return;
394 }
395 self.wake_writer();
396 // We cannot be sure that we actually work up a writer, which means we also need to
397 // wake up the readers to avoid the situation where a stack of readers are waiting for
398 // a non-existent writer to be done.
399 state = READER_BLOCKED_BIT;
400 }
401
402 // There are only readers waiting.
403 if state == READER_BLOCKED_BIT {
404 if self
405 .state
406 .compare_exchange(state, INITIAL_STATE, Ordering::Relaxed, Ordering::Relaxed)
407 .is_ok()
408 {
409 // Wake up all the readers.
410 self.wake_readers();
411 }
412 }
413 }
414
415 #[cold]
416 fn downgrade_slow(&self, mut state: i32) {
417 debug_assert!(has_blocked_reader(state));
418 loop {
419 if !has_blocked_reader(state) {
420 // Someone else must have woken up the readers.
421 return;
422 }
423
424 match self.state.compare_exchange(
425 state,
426 state - READER_BLOCKED_BIT,
427 Ordering::Relaxed,
428 Ordering::Relaxed,
429 ) {
430 Ok(_) => {
431 // We cleared the READER_BLOCKED_BIT, so we need to wake the readers.
432 self.wake_readers();
433 return;
434 }
435 Err(observed_state) => {
436 state = observed_state;
437 continue;
438 }
439 }
440 }
441 }
442
443 fn wake_writer(&self) {
444 self.writer_queue.fetch_add(1, Ordering::Release);
445 // TODO: Track which thread owns this futex for priority inheritance.
446 self.writer_queue.wake(1);
447 }
448
449 fn wake_readers(&self) {
450 self.state.wake_all();
451 }
452}
453
454unsafe impl lock_api::RawRwLock for RawSyncRwLock {
455 const INIT: RawSyncRwLock =
456 RawSyncRwLock { state: zx::Futex::new(0), writer_queue: zx::Futex::new(0) };
457
458 // These operations do not need to happen on the same thread.
459 type GuardMarker = lock_api::GuardSend;
460
461 #[inline]
462 fn lock_shared(&self) {
463 if !self.try_lock_shared_fast() {
464 self.lock_shared_slow();
465 }
466 }
467
468 #[inline]
469 fn try_lock_shared(&self) -> bool {
470 self.try_lock_shared_fast()
471 }
472
473 #[inline]
474 unsafe fn unlock_shared(&self) {
475 let state = self.state.fetch_sub(READER_UNIT, Ordering::Release) - READER_UNIT;
476
477 // If we just released a reader, then we cannot have blocked readers unless we also have
478 // blocked writers because, otherwise, the reader would just have acquired the lock.
479 debug_assert!(!has_blocked_reader(state) || has_blocked_writer(state));
480
481 // If we were the last reader and there are writers blocked, we need to wake up the blocked
482 // writer.
483 if is_unlocked(state) && has_blocked_writer(state) {
484 self.unlock_slow(state);
485 }
486 }
487
488 #[inline]
489 fn lock_exclusive(&self) {
490 if self
491 .state
492 .compare_exchange_weak(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
493 .is_err()
494 {
495 self.lock_exclusive_slow();
496 }
497 }
498
499 #[inline]
500 fn try_lock_exclusive(&self) -> bool {
501 self.state
502 .compare_exchange(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
503 .is_ok()
504 }
505
506 #[inline]
507 unsafe fn unlock_exclusive(&self) {
508 let state = self.state.fetch_sub(WRITER_BIT, Ordering::Release) - WRITER_BIT;
509
510 // If we just released a writer, then there must not be any readers or writers.
511 debug_assert!(is_unlocked(state));
512
513 if has_blocked_reader(state) || has_blocked_writer(state) {
514 self.unlock_slow(state);
515 }
516 }
517}
518
519unsafe impl lock_api::RawRwLockDowngrade for RawSyncRwLock {
520 #[inline]
521 unsafe fn downgrade(&self) {
522 let state = self.state.fetch_add(READER_UNIT - WRITER_BIT, Ordering::Release);
523
524 if has_blocked_reader(state) {
525 self.downgrade_slow(state);
526 }
527 }
528}
529
530pub type RwLock<T> = lock_api::RwLock<RawSyncRwLock, T>;
531pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawSyncRwLock, T>;
532pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawSyncRwLock, T>;
533pub type MappedRwLockReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, RawSyncRwLock, T>;
534pub type MappedRwLockWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, RawSyncRwLock, T>;
535
536#[cfg(test)]
537mod test {
538 use super::*;
539 use std::sync::atomic::AtomicUsize;
540 use std::sync::Arc;
541
542 #[test]
543 fn test_write_and_read() {
544 let value = RwLock::<u32>::new(5);
545 let mut guard = value.write();
546 assert_eq!(*guard, 5);
547 *guard = 6;
548 assert_eq!(*guard, 6);
549 std::mem::drop(guard);
550
551 let guard = value.read();
552 assert_eq!(*guard, 6);
553 }
554
555 #[test]
556 fn test_try_during_read() {
557 let value = RwLock::<u32>::new(5);
558 let _read_guard = value.read();
559 assert!(value.try_write().is_none());
560 assert!(value.try_read().is_some());
561 }
562
563 #[test]
564 fn test_try_during_write() {
565 let value = RwLock::<u32>::new(5);
566 let _write_guard = value.write();
567 assert!(value.try_write().is_none());
568 assert!(value.try_read().is_none());
569 }
570
571 #[test]
572 fn test_downgrade() {
573 let value = RwLock::<u32>::new(5);
574 let mut guard = value.write();
575 assert_eq!(*guard, 5);
576 *guard = 6;
577 assert_eq!(*guard, 6);
578 assert!(value.try_write().is_none());
579 assert!(value.try_read().is_none());
580 let guard1 = RwLockWriteGuard::downgrade(guard);
581 assert_eq!(*guard1, 6);
582 assert!(value.try_write().is_none());
583 let guard2 = value.read();
584 assert_eq!(*guard2, 6);
585 }
586
587 struct State {
588 value: RwLock<u32>,
589 gate: zx::Futex,
590 writer_count: AtomicUsize,
591 reader_count: AtomicUsize,
592 }
593
594 impl Default for State {
595 fn default() -> Self {
596 Self {
597 value: Default::default(),
598 gate: zx::Futex::new(0),
599 writer_count: Default::default(),
600 reader_count: Default::default(),
601 }
602 }
603 }
604
605 impl State {
606 fn wait_for_gate(&self) {
607 while self.gate.load(Ordering::Acquire) == 0 {
608 // Ignore failures, we'll retry anyways.
609 self.gate.wait(0, None, zx::MonotonicInstant::INFINITE).ok();
610 }
611 }
612
613 fn open_gate(&self) {
614 self.gate.fetch_add(1, Ordering::Release);
615 self.gate.wake_all();
616 }
617
618 fn spawn_writer(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
619 std::thread::spawn(move || {
620 state.wait_for_gate();
621 for _ in 0..count {
622 let mut guard = state.value.write();
623 *guard = *guard + 1;
624 let writer_count = state.writer_count.fetch_add(1, Ordering::Acquire) + 1;
625 let reader_count = state.reader_count.load(Ordering::Acquire);
626 state.writer_count.fetch_sub(1, Ordering::Release);
627 std::mem::drop(guard);
628 assert_eq!(writer_count, 1, "More than one writer held the RwLock at once.");
629 assert_eq!(
630 reader_count, 0,
631 "A reader and writer held the RwLock at the same time."
632 );
633 }
634 })
635 }
636
637 fn spawn_reader(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
638 std::thread::spawn(move || {
639 state.wait_for_gate();
640 for _ in 0..count {
641 let guard = state.value.read();
642 let observed_value = *guard;
643 let reader_count = state.reader_count.fetch_add(1, Ordering::Acquire) + 1;
644 let writer_count = state.writer_count.load(Ordering::Acquire);
645 state.reader_count.fetch_sub(1, Ordering::Release);
646 std::mem::drop(guard);
647 assert!(observed_value < u32::MAX, "The value inside the RwLock underflowed.");
648 assert_eq!(
649 writer_count, 0,
650 "A reader and writer held the RwLock at the same time."
651 );
652 assert!(reader_count > 0, "A reader held the RwLock without being counted.");
653 }
654 })
655 }
656 }
657
658 #[test]
659 fn test_thundering_writes() {
660 let state = Arc::new(State::default());
661 let mut threads = vec![];
662 for _ in 0..10 {
663 threads.push(State::spawn_writer(Arc::clone(&state), 100));
664 }
665
666 // Try to align the thundering herd to stress the RwLock as much as possible.
667 std::thread::sleep(std::time::Duration::from_millis(100));
668 state.open_gate();
669
670 while let Some(thread) = threads.pop() {
671 thread.join().expect("failed to join thread");
672 }
673 let guard = state.value.read();
674 assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
675 }
676
677 #[test]
678 fn test_thundering_reads_and_writes() {
679 let state = Arc::new(State::default());
680 let mut threads = vec![];
681 for _ in 0..10 {
682 let state = Arc::clone(&state);
683 threads.push(State::spawn_writer(Arc::clone(&state), 100));
684 threads.push(State::spawn_reader(Arc::clone(&state), 100));
685 }
686
687 // Try to align the thundering herd to stress the RwLock as much as possible.
688 std::thread::sleep(std::time::Duration::from_millis(100));
689 state.open_gate();
690
691 while let Some(thread) = threads.pop() {
692 thread.join().expect("failed to join thread");
693 }
694 let guard = state.value.read();
695 assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
696 }
697}