fuchsia_sync/rwlock.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::sync::atomic::Ordering;

pub struct RawSyncRwLock {
    /// Holds the primary state of the RwLock.
    ///
    /// See the constants declared below for the semantics of this value.
    ///
    /// Readers will zx_futex_wait on this address.
    ///
    /// Ordering rules:
    ///
    ///  * Any store operation which may release the lock must use Ordering::Release on state to
    ///    establish a happens-before relationship with the next lock acquisition.
    ///  * Any load operation which may acquire the lock must use Ordering::Acquire on state to
    ///    establish a happens-before relationship with the previous lock release.
    state: zx::Futex,

    /// The queue of writers waiting to obtain this lock.
    ///
    /// The value of this field is just a generation counter for this queue.
    ///
    /// Writers will zx_futex_wait on this address with the current generation number.
    ///
    /// Ordering rules:
    ///
    ///  * Stores to writer_queue must be preceded by a store to state and use Ordering::Release.
    ///  * Loads from writer_queue must use Ordering::Acquire and be followed by a load of state.
    writer_queue: zx::Futex,
}

const INITIAL_STATE: i32 = 0;

/// If this bit is set in `state`, then the lock is held exclusively (i.e., as a writer) by the
/// thread that set this bit.
const WRITER_BIT: i32 = 0b0001;

/// If this bit is set in `state`, then a writer wished to acquire exclusive access to this lock
/// but observed a reader or a writer holding the lock. The writer will fetch the current
/// generation number for `writer_queue`, re-check `state`, and then zx_futex_wait on the
/// `writer_queue`.
const WRITER_BLOCKED_BIT: i32 = 0b0010;

/// If this bit is set in `state`, then a reader wished to acquire shared access to this lock
/// but could not because either (a) the lock was held exclusively by a writer or (b) a writer
/// was already blocked waiting for the lock. This second condition is necessary to avoid
/// starving writers: once a writer is blocked, readers that could otherwise have acquired
/// shared access to the lock become blocked waiting for at least one writer to acquire the lock.
const READER_BLOCKED_BIT: i32 = 0b0100;

/// The amount `state` is incremented when a reader acquires the lock. The `state` tracks the
/// number of outstanding readers so that once all the readers have released their shared access,
/// the lock can be made available for exclusive access again.
///
/// We count the readers in the high bits of the state so that we can use arithmetic overflow to
/// detect when too many readers have acquired the lock for us to keep track of.
const READER_UNIT: i32 = 0b1000;

/// A mask to select only the bits that count the number of readers holding shared access to the
/// lock.
const READER_MASK: i32 = !0b0111;

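// Illustrative only, not part of the original source: a minimal sketch of the bit layout
// described by the constants above, assuming their current values. For example, three readers
// holding the lock while a writer is blocked would be encoded as
// `3 * READER_UNIT | WRITER_BLOCKED_BIT`.
#[cfg(test)]
mod state_layout_example {
    use super::*;

    #[test]
    fn reader_count_is_stored_above_the_flag_bits() {
        // Three readers hold the lock and at least one writer is blocked.
        let state = 3 * READER_UNIT | WRITER_BLOCKED_BIT;
        // The reader count occupies the bits selected by READER_MASK...
        assert_eq!(state & READER_MASK, 3 * READER_UNIT);
        // ...while the flag bits stay in the low three bits.
        assert_eq!(state & !READER_MASK, WRITER_BLOCKED_BIT);
        assert_eq!(state & WRITER_BIT, 0);
    }
}
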
/// # STATE MACHINE
///
/// The RwLock goes through the following states:
///
/// ## Initial
///
/// In the "Initial" state, the `state` is zero. No thread has access to the lock and no threads
/// are waiting.
///
/// * If a reader tries to acquire the lock => Shared access (unblocked)
/// * If a writer tries to acquire the lock => Exclusive access (unblocked)
/// * If a previously blocked writer acquires the lock => Exclusive access (writers blocked)
///
/// ## Shared access (unblocked)
///
/// In this state, `state & READER_MASK` is non-zero and other bits are unset. A non-zero
/// number of threads have shared access to the lock and no threads are waiting.
///
/// Additional readers can acquire shared access to the lock without entering the kernel.
///
/// * If a reader tries to acquire the lock => Shared access (unblocked)
/// * If a writer tries to acquire the lock => Shared access (writers blocked)
/// * If the last reader releases the lock => Initial
///
/// ## Shared access (writers blocked)
///
/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT is set, and other bits are
/// unset. A non-zero number of threads have shared access to the lock and a non-zero number of
/// writers are waiting for exclusive access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Shared access (writers blocked)
/// * If the last reader releases the lock => Exclusive access (writers blocked)
///
/// ## Shared access (readers and writers blocked)
///
/// In this state, `state & READER_MASK` is non-zero, WRITER_BLOCKED_BIT and READER_BLOCKED_BIT are
/// set, and other bits are unset. A non-zero number of threads have shared access to the lock,
/// a non-zero number of writers are waiting for exclusive access, and a non-zero number of
/// readers are waiting for shared access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Shared access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Shared access (readers and writers blocked)
/// * If the last reader releases the lock => Exclusive access (readers and writers blocked)
///
/// ## Exclusive access (unblocked)
///
/// In this state, WRITER_BIT is set and other bits are unset. Exactly one thread has exclusive
/// access to the lock and no threads are waiting.
///
/// The writer can release the lock without entering the kernel.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
/// * If a writer tries to downgrade the lock => Shared access (unblocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (writers blocked)
///
/// In this state, WRITER_BIT and WRITER_BLOCKED_BIT are set and other bits are unset. Exactly one
/// thread has exclusive access to the lock and zero or more writers are waiting for exclusive
/// access.
///
/// When the writer releases the lock, the state transitions to the "Initial" state and then the
/// lock wakes up one of the writers, if any exist. If this previously waiting writer succeeds in
/// acquiring the lock, the state machine returns to the "Exclusive access (writers blocked)" state
/// because we do not know how many writers are blocked waiting for exclusive access.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (writers blocked)
/// * If a writer tries to downgrade the lock => Shared access (writers blocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (readers blocked)
///
/// In this state, WRITER_BIT and READER_BLOCKED_BIT are set and other bits are unset. Exactly one
/// thread has exclusive access to the lock and a non-zero number of readers are waiting for
/// shared access.
///
/// When the writer releases the lock, the state transitions to the "Initial" state and then the
/// lock wakes up any blocked readers.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to downgrade the lock => Unique reader (readers blocked)
/// * If the writer releases the lock => Initial
///
/// ## Exclusive access (readers and writers blocked)
///
/// In this state, WRITER_BIT, WRITER_BLOCKED_BIT, and READER_BLOCKED_BIT are set and other bits
/// are unset. Exactly one thread has exclusive access to the lock, zero or more writers are
/// waiting for exclusive access, and a non-zero number of readers are waiting for shared access.
///
/// The lock is contended and requires kernel coordination to wake the blocked threads.
///
/// * If a reader tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers and writers blocked)
/// * If a writer tries to downgrade the lock => Unique reader (readers and writers blocked)
/// * If the writer releases the lock => Unlocked (readers blocked)
///
/// ## Unlocked (readers blocked)
///
/// In this state, READER_BLOCKED_BIT is set and other bits are unset. No thread has access to the
/// lock and a non-zero number of readers are waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by returning to the "Initial" state and waking any blocked readers.
///
/// * If a reader tries to acquire the lock => Unlocked (readers blocked)
/// * If a writer tries to acquire the lock => Exclusive access (readers blocked)
/// * Otherwise => Initial
///
/// ## Unique reader (readers blocked)
///
/// In this state, there is exactly one reader, who is running on the current thread, the
/// READER_BLOCKED_BIT is set, and other bits are unset. A non-zero number of readers are
/// waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by moving to the "Shared access (unblocked)" state and waking any blocked
/// readers.
///
/// * If a reader tries to acquire the lock => Unique reader (readers blocked)
/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
/// * Otherwise => Shared access (unblocked)
///
/// ## Unique reader (readers and writers blocked)
///
/// In this state, there is exactly one reader, who is running on the current thread, and the
/// READER_BLOCKED_BIT and the WRITER_BLOCKED_BIT are set. Zero or more writers are waiting for
/// exclusive access, and a non-zero number of readers are waiting for shared access.
///
/// This state is transitory and the state machine will leave this state without outside
/// intervention by moving to the "Shared access (writers blocked)" state and waking any blocked
/// readers.
///
/// * If a reader tries to acquire the lock => Unique reader (readers and writers blocked)
/// * If a writer tries to acquire the lock => Unique reader (readers and writers blocked)
/// * Otherwise => Shared access (writers blocked)

fn is_locked_exclusive(state: i32) -> bool {
    state & WRITER_BIT != 0
}

fn has_blocked_writer(state: i32) -> bool {
    state & WRITER_BLOCKED_BIT != 0
}

fn has_blocked_reader(state: i32) -> bool {
    state & READER_BLOCKED_BIT != 0
}

fn can_lock_shared(state: i32) -> bool {
    !is_locked_exclusive(state) && !has_blocked_writer(state) && !has_blocked_reader(state)
}

fn is_unlocked(state: i32) -> bool {
    state & (WRITER_BIT | READER_MASK) == 0
}

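// Illustrative only, not part of the original source: a small sketch showing how the predicates
// above classify a few of the states named in the state machine documentation.
#[cfg(test)]
mod state_predicate_example {
    use super::*;

    #[test]
    fn predicates_match_the_documented_states() {
        // "Initial": no readers, no writer, nobody blocked.
        assert!(is_unlocked(INITIAL_STATE));
        assert!(can_lock_shared(INITIAL_STATE));

        // "Shared access (writers blocked)": two readers plus a blocked writer.
        let shared_writers_blocked = 2 * READER_UNIT | WRITER_BLOCKED_BIT;
        assert!(!is_locked_exclusive(shared_writers_blocked));
        assert!(has_blocked_writer(shared_writers_blocked));
        // New readers must queue behind the blocked writer to avoid starving it.
        assert!(!can_lock_shared(shared_writers_blocked));

        // "Exclusive access (readers blocked)": a writer holds the lock and readers are waiting.
        let exclusive_readers_blocked = WRITER_BIT | READER_BLOCKED_BIT;
        assert!(is_locked_exclusive(exclusive_readers_blocked));
        assert!(has_blocked_reader(exclusive_readers_blocked));
        assert!(!is_unlocked(exclusive_readers_blocked));
    }
}
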
impl RawSyncRwLock {
    #[inline]
    fn try_lock_shared_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        if can_lock_shared(state) {
            if let Some(new_state) = state.checked_add(READER_UNIT) {
                return self
                    .state
                    .compare_exchange(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                    .is_ok();
            }
        }
        false
    }

    #[cold]
    fn lock_shared_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if can_lock_shared(state) {
                let new_state =
                    state.checked_add(READER_UNIT).expect("overflowed reader count in rwlock");
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Acquired shared lock.
                    Err(observed_state) => {
                        state = observed_state;
                        continue;
                    }
                }
            }

            let desired_sleep_state = state | READER_BLOCKED_BIT;

            if !has_blocked_reader(state) {
                if let Err(observed_state) = self.state.compare_exchange(
                    state,
                    desired_sleep_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = observed_state;
                    continue;
                }
            }

            // Ignore spurious wakeups, the loop will retry.
            self.state
                .wait(
                    desired_sleep_state,
                    None, // We don't integrate with priority inheritance yet.
                    zx::MonotonicInstant::INFINITE,
                )
                .ok();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        let mut other_writers_bit = 0;

        loop {
            if is_unlocked(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state | WRITER_BIT | other_writers_bit,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return, // Acquired exclusive lock.
                    Err(observed_state) => {
                        state = observed_state;
                        continue;
                    }
                }
            }

            if !has_blocked_writer(state) {
                if let Err(observed_state) = self.state.compare_exchange(
                    state,
                    state | WRITER_BLOCKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = observed_state;
                    continue;
                }
            }

            other_writers_bit = WRITER_BLOCKED_BIT;

            let generation_number = self.writer_queue.load(Ordering::Acquire);

            // Before we go to sleep on the writer_queue at the fetched generation number, we need
            // to make sure that some other thread is going to wake that generation of sleeping
            // writers. If we didn't fetch the state again, it's possible that another thread could
            // have cleared the WRITER_BLOCKED_BIT in the state and incremented the generation
            // number between the last time we observed state and the time we observed the
            // generation number.
            //
            // By observing the WRITER_BLOCKED_BIT *after* fetching the generation number, we
            // ensure that either (a) this generation has already been awoken or (b) whoever clears
            // the WRITER_BLOCKED_BIT bit will wake this generation in the future.
            state = self.state.load(Ordering::Relaxed);

            // If the lock is available or the WRITER_BLOCKED_BIT is missing, try again. No one has
            // promised to wake the observed generation number.
            if is_unlocked(state) || !has_blocked_writer(state) {
                continue;
            }

            // Ignore spurious wakeups here, the loop will retry.
            self.writer_queue
                .wait(
                    generation_number,
                    None, // We don't integrate with priority inheritance yet.
                    zx::MonotonicInstant::INFINITE,
                )
                .ok();

            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[cold]
    fn unlock_slow(&self, mut state: i32) {
        debug_assert!(is_unlocked(state));

        // There are only writers waiting.
        if state == WRITER_BLOCKED_BIT {
            match self.state.compare_exchange(
                state,
                INITIAL_STATE,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    self.wake_writer();
                    // We either made progress by waking a waiter or no one is waiting for this
                    // lock anymore.
                    return;
                }
                Err(observed_state) => {
                    state = observed_state;
                }
            }
        }

        // There are both readers and writers waiting.
        if state == READER_BLOCKED_BIT | WRITER_BLOCKED_BIT {
            // Attempt to clear the WRITER_BLOCKED_BIT.
            if self
                .state
                .compare_exchange(state, READER_BLOCKED_BIT, Ordering::Relaxed, Ordering::Relaxed)
                .is_err()
            {
                // The state changed, which means another thread made progress. We're done.
                return;
            }
            self.wake_writer();
            // We cannot be sure that we actually woke up a writer, which means we also need to
            // wake up the readers to avoid the situation where a stack of readers are waiting for
            // a non-existent writer to be done.
            state = READER_BLOCKED_BIT;
        }

        // There are only readers waiting.
        if state == READER_BLOCKED_BIT {
            if self
                .state
                .compare_exchange(state, INITIAL_STATE, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                // Wake up all the readers.
                self.wake_readers();
            }
        }
    }

    #[cold]
    fn downgrade_slow(&self, mut state: i32) {
        debug_assert!(has_blocked_reader(state));
        loop {
            if !has_blocked_reader(state) {
                // Someone else must have woken up the readers.
                return;
            }

            match self.state.compare_exchange(
                state,
                state - READER_BLOCKED_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // We cleared the READER_BLOCKED_BIT, so we need to wake the readers.
                    self.wake_readers();
                    return;
                }
                Err(observed_state) => {
                    state = observed_state;
                    continue;
                }
            }
        }
    }

    fn wake_writer(&self) {
        self.writer_queue.fetch_add(1, Ordering::Release);
        // TODO: Track which thread owns this futex for priority inheritance.
        self.writer_queue.wake(1);
    }

    fn wake_readers(&self) {
        self.state.wake_all();
    }
}

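// Illustrative only, not part of the original source: a minimal sketch of the writer_queue
// generation counter described in the field documentation above. Waking writers bumps the
// generation number, so a writer that re-checks `state` after loading a stale generation will
// not sleep through a wake-up that has already happened.
#[cfg(test)]
mod writer_queue_generation_example {
    use super::*;

    #[test]
    fn waking_writers_advances_the_generation_counter() {
        let lock = <RawSyncRwLock as lock_api::RawRwLock>::INIT;
        let before = lock.writer_queue.load(Ordering::Acquire);
        // Waking with no sleepers is harmless; it still advances the generation number.
        lock.wake_writer();
        let after = lock.writer_queue.load(Ordering::Acquire);
        assert_eq!(after, before + 1);
    }
}
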
unsafe impl lock_api::RawRwLock for RawSyncRwLock {
    const INIT: RawSyncRwLock =
        RawSyncRwLock { state: zx::Futex::new(0), writer_queue: zx::Futex::new(0) };

    // These operations do not need to happen on the same thread.
    type GuardMarker = lock_api::GuardSend;

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast() {
            self.lock_shared_slow();
        }
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        self.try_lock_shared_fast()
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        let state = self.state.fetch_sub(READER_UNIT, Ordering::Release) - READER_UNIT;

        // If we just released a reader, then we cannot have blocked readers unless we also have
        // blocked writers because, otherwise, the reader would just have acquired the lock.
        debug_assert!(!has_blocked_reader(state) || has_blocked_writer(state));

        // If we were the last reader and there are writers blocked, we need to wake up the blocked
        // writer.
        if is_unlocked(state) && has_blocked_writer(state) {
            self.unlock_slow(state);
        }
    }

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            self.lock_exclusive_slow();
        }
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        self.state
            .compare_exchange(INITIAL_STATE, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        let state = self.state.fetch_sub(WRITER_BIT, Ordering::Release) - WRITER_BIT;

        // If we just released a writer, then there must not be any readers or writers.
        debug_assert!(is_unlocked(state));

        if has_blocked_reader(state) || has_blocked_writer(state) {
            self.unlock_slow(state);
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawSyncRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self.state.fetch_add(READER_UNIT - WRITER_BIT, Ordering::Release);

        if has_blocked_reader(state) {
            self.downgrade_slow(state);
        }
    }
}

pub type RwLock<T> = lock_api::RwLock<RawSyncRwLock, T>;
pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, RawSyncRwLock, T>;
pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, RawSyncRwLock, T>;
pub type MappedRwLockReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, RawSyncRwLock, T>;
pub type MappedRwLockWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, RawSyncRwLock, T>;

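// Illustrative usage sketch, not part of the original source: the `Mapped*` aliases above come
// from `lock_api` guard mapping, which lets a caller narrow a guard down to one field of the
// locked value. The `Config` struct below is hypothetical.
#[cfg(test)]
mod mapped_guard_example {
    use super::*;

    struct Config {
        name: String,
        retries: u32,
    }

    #[test]
    fn map_guards_to_a_single_field() {
        let config = RwLock::new(Config { name: "example".to_string(), retries: 3 });

        // Borrow the whole value, then narrow the read guard to just the `name` field.
        let name_guard: MappedRwLockReadGuard<'_, str> =
            RwLockReadGuard::map(config.read(), |c| c.name.as_str());
        assert_eq!(&*name_guard, "example");
        drop(name_guard);

        // Write guards can be narrowed the same way.
        let mut retries_guard: MappedRwLockWriteGuard<'_, u32> =
            RwLockWriteGuard::map(config.write(), |c| &mut c.retries);
        *retries_guard += 1;
        drop(retries_guard);

        assert_eq!(config.read().retries, 4);
    }
}
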
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    #[test]
    fn test_write_and_read() {
        let value = RwLock::<u32>::new(5);
        let mut guard = value.write();
        assert_eq!(*guard, 5);
        *guard = 6;
        assert_eq!(*guard, 6);
        std::mem::drop(guard);

        let guard = value.read();
        assert_eq!(*guard, 6);
    }

    #[test]
    fn test_try_during_read() {
        let value = RwLock::<u32>::new(5);
        let _read_guard = value.read();
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_some());
    }

    #[test]
    fn test_try_during_write() {
        let value = RwLock::<u32>::new(5);
        let _write_guard = value.write();
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_none());
    }

    #[test]
    fn test_downgrade() {
        let value = RwLock::<u32>::new(5);
        let mut guard = value.write();
        assert_eq!(*guard, 5);
        *guard = 6;
        assert_eq!(*guard, 6);
        assert!(value.try_write().is_none());
        assert!(value.try_read().is_none());
        let guard1 = RwLockWriteGuard::downgrade(guard);
        assert_eq!(*guard1, 6);
        assert!(value.try_write().is_none());
        let guard2 = value.read();
        assert_eq!(*guard2, 6);
    }

    struct State {
        value: RwLock<u32>,
        gate: zx::Futex,
        writer_count: AtomicUsize,
        reader_count: AtomicUsize,
    }

    impl Default for State {
        fn default() -> Self {
            Self {
                value: Default::default(),
                gate: zx::Futex::new(0),
                writer_count: Default::default(),
                reader_count: Default::default(),
            }
        }
    }

    impl State {
        fn wait_for_gate(&self) {
            while self.gate.load(Ordering::Acquire) == 0 {
                // Ignore failures, we'll retry anyways.
                self.gate.wait(0, None, zx::MonotonicInstant::INFINITE).ok();
            }
        }

        fn open_gate(&self) {
            self.gate.fetch_add(1, Ordering::Release);
            self.gate.wake_all();
        }

        fn spawn_writer(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
            std::thread::spawn(move || {
                state.wait_for_gate();
                for _ in 0..count {
                    let mut guard = state.value.write();
                    *guard = *guard + 1;
                    let writer_count = state.writer_count.fetch_add(1, Ordering::Acquire) + 1;
                    let reader_count = state.reader_count.load(Ordering::Acquire);
                    state.writer_count.fetch_sub(1, Ordering::Release);
                    std::mem::drop(guard);
                    assert_eq!(writer_count, 1, "More than one writer held the RwLock at once.");
                    assert_eq!(
                        reader_count, 0,
                        "A reader and writer held the RwLock at the same time."
                    );
                }
            })
        }

        fn spawn_reader(state: Arc<Self>, count: usize) -> std::thread::JoinHandle<()> {
            std::thread::spawn(move || {
                state.wait_for_gate();
                for _ in 0..count {
                    let guard = state.value.read();
                    let observed_value = *guard;
                    let reader_count = state.reader_count.fetch_add(1, Ordering::Acquire) + 1;
                    let writer_count = state.writer_count.load(Ordering::Acquire);
                    state.reader_count.fetch_sub(1, Ordering::Release);
                    std::mem::drop(guard);
                    assert!(observed_value < u32::MAX, "The value inside the RwLock underflowed.");
                    assert_eq!(
                        writer_count, 0,
                        "A reader and writer held the RwLock at the same time."
                    );
                    assert!(reader_count > 0, "A reader held the RwLock without being counted.");
                }
            })
        }
    }

    #[test]
    fn test_thundering_writes() {
        let state = Arc::new(State::default());
        let mut threads = vec![];
        for _ in 0..10 {
            threads.push(State::spawn_writer(Arc::clone(&state), 100));
        }

        // Try to align the thundering herd to stress the RwLock as much as possible.
        std::thread::sleep(std::time::Duration::from_millis(100));
        state.open_gate();

        while let Some(thread) = threads.pop() {
            thread.join().expect("failed to join thread");
        }
        let guard = state.value.read();
        assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
    }

    #[test]
    fn test_thundering_reads_and_writes() {
        let state = Arc::new(State::default());
        let mut threads = vec![];
        for _ in 0..10 {
            let state = Arc::clone(&state);
            threads.push(State::spawn_writer(Arc::clone(&state), 100));
            threads.push(State::spawn_reader(Arc::clone(&state), 100));
        }

        // Try to align the thundering herd to stress the RwLock as much as possible.
        std::thread::sleep(std::time::Duration::from_millis(100));
        state.open_gate();

        while let Some(thread) = threads.pop() {
            thread.join().expect("failed to join thread");
        }
        let guard = state.value.read();
        assert_eq!(1000, *guard, "The RwLock held the wrong value at the end.");
696    }
697}