parking_lot/
raw_rwlock.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::elision::{have_elision, AtomicElisionExt};
9use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::util;
11use core::{
12    cell::Cell,
13    sync::atomic::{AtomicUsize, Ordering},
14};
15use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
16use parking_lot_core::{
17    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
18};
19use std::time::{Duration, Instant};
20
21// This reader-writer lock implementation is based on Boost's upgrade_mutex:
22// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
23//
24// This implementation uses 2 wait queues, one at key [addr] and one at key
25// [addr + 1]. The primary queue is used for all new waiting threads, and the
26// secondary queue is used by the thread which has acquired WRITER_BIT but is
27// waiting for the remaining readers to exit the lock.
28//
29// This implementation is fair between readers and writers since it uses the
30// order in which threads first started queuing to alternate between read phases
// and write phases. In particular, it is not vulnerable to writer starvation
// since readers will block if there is a pending writer.
33
// Layout of the lock state word: the four low bits are flags, and every bit
// above them belongs to the reader count.

// Set while at least one thread is parked in the main queue.
const PARKED_BIT: usize = 1 << 0;
// Set while the thread holding WRITER_BIT is itself parked. Only meaningful
// together with WRITER_BIT.
const WRITER_PARKED_BIT: usize = 1 << 1;
// Set while a reader holds an upgradable lock. Requires a non-zero reader
// count and is never set at the same time as WRITER_BIT.
const UPGRADABLE_BIT: usize = 1 << 2;
// With a reader count of zero: a writer holds the exclusive lock.
// With a non-zero reader count: a writer is waiting for those readers to leave.
const WRITER_BIT: usize = 1 << 3;
// One unit of the reader count (the lowest bit above the flag bits).
const ONE_READER: usize = 1 << 4;
// Selects the reader-count bits, i.e. everything above the four flag bits.
const READERS_MASK: usize = !(ONE_READER - 1);
48
49// Token indicating what type of lock a queued thread is trying to acquire
50const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
51const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
52const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
53
/// Raw reader-writer lock type backed by the parking lot.
///
/// The whole lock lives in a single atomic word: the low bits are the
/// `*_BIT` flags defined above and the remaining bits count readers in units
/// of `ONE_READER`.
pub struct RawRwLock {
    // Flag bits plus reader count; zero means the lock is completely free.
    state: AtomicUsize,
}
58
59unsafe impl lock_api::RawRwLock for RawRwLock {
60    const INIT: RawRwLock = RawRwLock {
61        state: AtomicUsize::new(0),
62    };
63
64    type GuardMarker = crate::GuardMarker;
65
66    #[inline]
67    fn lock_exclusive(&self) {
68        if self
69            .state
70            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
71            .is_err()
72        {
73            let result = self.lock_exclusive_slow(None);
74            debug_assert!(result);
75        }
76        self.deadlock_acquire();
77    }
78
79    #[inline]
80    fn try_lock_exclusive(&self) -> bool {
81        if self
82            .state
83            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
84            .is_ok()
85        {
86            self.deadlock_acquire();
87            true
88        } else {
89            false
90        }
91    }
92
93    #[inline]
94    unsafe fn unlock_exclusive(&self) {
95        self.deadlock_release();
96        if self
97            .state
98            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99            .is_ok()
100        {
101            return;
102        }
103        self.unlock_exclusive_slow(false);
104    }
105
106    #[inline]
107    fn lock_shared(&self) {
108        if !self.try_lock_shared_fast(false) {
109            let result = self.lock_shared_slow(false, None);
110            debug_assert!(result);
111        }
112        self.deadlock_acquire();
113    }
114
115    #[inline]
116    fn try_lock_shared(&self) -> bool {
117        let result = if self.try_lock_shared_fast(false) {
118            true
119        } else {
120            self.try_lock_shared_slow(false)
121        };
122        if result {
123            self.deadlock_acquire();
124        }
125        result
126    }
127
128    #[inline]
129    unsafe fn unlock_shared(&self) {
130        self.deadlock_release();
131        let state = if have_elision() {
132            self.state.elision_fetch_sub_release(ONE_READER)
133        } else {
134            self.state.fetch_sub(ONE_READER, Ordering::Release)
135        };
136        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
137            self.unlock_shared_slow();
138        }
139    }
140
141    #[inline]
142    fn is_locked(&self) -> bool {
143        let state = self.state.load(Ordering::Relaxed);
144        state & (WRITER_BIT | READERS_MASK) != 0
145    }
146
147    #[inline]
148    fn is_locked_exclusive(&self) -> bool {
149        let state = self.state.load(Ordering::Relaxed);
150        state & (WRITER_BIT) != 0
151    }
152}
153
154unsafe impl lock_api::RawRwLockFair for RawRwLock {
155    #[inline]
156    unsafe fn unlock_shared_fair(&self) {
157        // Shared unlocking is always fair in this implementation.
158        self.unlock_shared();
159    }
160
161    #[inline]
162    unsafe fn unlock_exclusive_fair(&self) {
163        self.deadlock_release();
164        if self
165            .state
166            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
167            .is_ok()
168        {
169            return;
170        }
171        self.unlock_exclusive_slow(true);
172    }
173
174    #[inline]
175    unsafe fn bump_shared(&self) {
176        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
177            == ONE_READER | WRITER_BIT
178        {
179            self.bump_shared_slow();
180        }
181    }
182
183    #[inline]
184    unsafe fn bump_exclusive(&self) {
185        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
186            self.bump_exclusive_slow();
187        }
188    }
189}
190
191unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
192    #[inline]
193    unsafe fn downgrade(&self) {
194        let state = self
195            .state
196            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);
197
198        // Wake up parked shared and upgradable threads if there are any
199        if state & PARKED_BIT != 0 {
200            self.downgrade_slow();
201        }
202    }
203}
204
205unsafe impl lock_api::RawRwLockTimed for RawRwLock {
206    type Duration = Duration;
207    type Instant = Instant;
208
209    #[inline]
210    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
211        let result = if self.try_lock_shared_fast(false) {
212            true
213        } else {
214            self.lock_shared_slow(false, util::to_deadline(timeout))
215        };
216        if result {
217            self.deadlock_acquire();
218        }
219        result
220    }
221
222    #[inline]
223    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
224        let result = if self.try_lock_shared_fast(false) {
225            true
226        } else {
227            self.lock_shared_slow(false, Some(timeout))
228        };
229        if result {
230            self.deadlock_acquire();
231        }
232        result
233    }
234
235    #[inline]
236    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
237        let result = if self
238            .state
239            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
240            .is_ok()
241        {
242            true
243        } else {
244            self.lock_exclusive_slow(util::to_deadline(timeout))
245        };
246        if result {
247            self.deadlock_acquire();
248        }
249        result
250    }
251
252    #[inline]
253    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
254        let result = if self
255            .state
256            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
257            .is_ok()
258        {
259            true
260        } else {
261            self.lock_exclusive_slow(Some(timeout))
262        };
263        if result {
264            self.deadlock_acquire();
265        }
266        result
267    }
268}
269
270unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
271    #[inline]
272    fn lock_shared_recursive(&self) {
273        if !self.try_lock_shared_fast(true) {
274            let result = self.lock_shared_slow(true, None);
275            debug_assert!(result);
276        }
277        self.deadlock_acquire();
278    }
279
280    #[inline]
281    fn try_lock_shared_recursive(&self) -> bool {
282        let result = if self.try_lock_shared_fast(true) {
283            true
284        } else {
285            self.try_lock_shared_slow(true)
286        };
287        if result {
288            self.deadlock_acquire();
289        }
290        result
291    }
292}
293
294unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
295    #[inline]
296    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
297        let result = if self.try_lock_shared_fast(true) {
298            true
299        } else {
300            self.lock_shared_slow(true, util::to_deadline(timeout))
301        };
302        if result {
303            self.deadlock_acquire();
304        }
305        result
306    }
307
308    #[inline]
309    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
310        let result = if self.try_lock_shared_fast(true) {
311            true
312        } else {
313            self.lock_shared_slow(true, Some(timeout))
314        };
315        if result {
316            self.deadlock_acquire();
317        }
318        result
319    }
320}
321
322unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
323    #[inline]
324    fn lock_upgradable(&self) {
325        if !self.try_lock_upgradable_fast() {
326            let result = self.lock_upgradable_slow(None);
327            debug_assert!(result);
328        }
329        self.deadlock_acquire();
330    }
331
332    #[inline]
333    fn try_lock_upgradable(&self) -> bool {
334        let result = if self.try_lock_upgradable_fast() {
335            true
336        } else {
337            self.try_lock_upgradable_slow()
338        };
339        if result {
340            self.deadlock_acquire();
341        }
342        result
343    }
344
345    #[inline]
346    unsafe fn unlock_upgradable(&self) {
347        self.deadlock_release();
348        let state = self.state.load(Ordering::Relaxed);
349        if state & PARKED_BIT == 0 {
350            if self
351                .state
352                .compare_exchange_weak(
353                    state,
354                    state - (ONE_READER | UPGRADABLE_BIT),
355                    Ordering::Release,
356                    Ordering::Relaxed,
357                )
358                .is_ok()
359            {
360                return;
361            }
362        }
363        self.unlock_upgradable_slow(false);
364    }
365
366    #[inline]
367    unsafe fn upgrade(&self) {
368        let state = self.state.fetch_sub(
369            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
370            Ordering::Acquire,
371        );
372        if state & READERS_MASK != ONE_READER {
373            let result = self.upgrade_slow(None);
374            debug_assert!(result);
375        }
376    }
377
378    #[inline]
379    unsafe fn try_upgrade(&self) -> bool {
380        if self
381            .state
382            .compare_exchange_weak(
383                ONE_READER | UPGRADABLE_BIT,
384                WRITER_BIT,
385                Ordering::Acquire,
386                Ordering::Relaxed,
387            )
388            .is_ok()
389        {
390            true
391        } else {
392            self.try_upgrade_slow()
393        }
394    }
395}
396
397unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
398    #[inline]
399    unsafe fn unlock_upgradable_fair(&self) {
400        self.deadlock_release();
401        let state = self.state.load(Ordering::Relaxed);
402        if state & PARKED_BIT == 0 {
403            if self
404                .state
405                .compare_exchange_weak(
406                    state,
407                    state - (ONE_READER | UPGRADABLE_BIT),
408                    Ordering::Release,
409                    Ordering::Relaxed,
410                )
411                .is_ok()
412            {
413                return;
414            }
415        }
416        self.unlock_upgradable_slow(false);
417    }
418
419    #[inline]
420    unsafe fn bump_upgradable(&self) {
421        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
422            self.bump_upgradable_slow();
423        }
424    }
425}
426
427unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
428    #[inline]
429    unsafe fn downgrade_upgradable(&self) {
430        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);
431
432        // Wake up parked upgradable threads if there are any
433        if state & PARKED_BIT != 0 {
434            self.downgrade_slow();
435        }
436    }
437
438    #[inline]
439    unsafe fn downgrade_to_upgradable(&self) {
440        let state = self.state.fetch_add(
441            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
442            Ordering::Release,
443        );
444
445        // Wake up parked shared threads if there are any
446        if state & PARKED_BIT != 0 {
447            self.downgrade_to_upgradable_slow();
448        }
449    }
450}
451
452unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
453    #[inline]
454    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
455        let result = if self.try_lock_upgradable_fast() {
456            true
457        } else {
458            self.lock_upgradable_slow(Some(timeout))
459        };
460        if result {
461            self.deadlock_acquire();
462        }
463        result
464    }
465
466    #[inline]
467    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
468        let result = if self.try_lock_upgradable_fast() {
469            true
470        } else {
471            self.lock_upgradable_slow(util::to_deadline(timeout))
472        };
473        if result {
474            self.deadlock_acquire();
475        }
476        result
477    }
478
479    #[inline]
480    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
481        let state = self.state.fetch_sub(
482            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
483            Ordering::Relaxed,
484        );
485        if state & READERS_MASK == ONE_READER {
486            true
487        } else {
488            self.upgrade_slow(Some(timeout))
489        }
490    }
491
492    #[inline]
493    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
494        let state = self.state.fetch_sub(
495            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
496            Ordering::Relaxed,
497        );
498        if state & READERS_MASK == ONE_READER {
499            true
500        } else {
501            self.upgrade_slow(util::to_deadline(timeout))
502        }
503    }
504}
505
506impl RawRwLock {
507    #[inline(always)]
508    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
509        let state = self.state.load(Ordering::Relaxed);
510
511        // We can't allow grabbing a shared lock if there is a writer, even if
512        // the writer is still waiting for the remaining readers to exit.
513        if state & WRITER_BIT != 0 {
514            // To allow recursive locks, we make an exception and allow readers
515            // to skip ahead of a pending writer to avoid deadlocking, at the
516            // cost of breaking the fairness guarantees.
517            if !recursive || state & READERS_MASK == 0 {
518                return false;
519            }
520        }
521
522        // Use hardware lock elision to avoid cache conflicts when multiple
523        // readers try to acquire the lock. We only do this if the lock is
524        // completely empty since elision handles conflicts poorly.
525        if have_elision() && state == 0 {
526            self.state
527                .elision_compare_exchange_acquire(0, ONE_READER)
528                .is_ok()
529        } else if let Some(new_state) = state.checked_add(ONE_READER) {
530            self.state
531                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
532                .is_ok()
533        } else {
534            false
535        }
536    }
537
538    #[cold]
539    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
540        let mut state = self.state.load(Ordering::Relaxed);
541        loop {
542            // This mirrors the condition in try_lock_shared_fast
543            if state & WRITER_BIT != 0 {
544                if !recursive || state & READERS_MASK == 0 {
545                    return false;
546                }
547            }
548            if have_elision() && state == 0 {
549                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
550                    Ok(_) => return true,
551                    Err(x) => state = x,
552                }
553            } else {
554                match self.state.compare_exchange_weak(
555                    state,
556                    state
557                        .checked_add(ONE_READER)
558                        .expect("RwLock reader count overflow"),
559                    Ordering::Acquire,
560                    Ordering::Relaxed,
561                ) {
562                    Ok(_) => return true,
563                    Err(x) => state = x,
564                }
565            }
566        }
567    }
568
569    #[inline(always)]
570    fn try_lock_upgradable_fast(&self) -> bool {
571        let state = self.state.load(Ordering::Relaxed);
572
573        // We can't grab an upgradable lock if there is already a writer or
574        // upgradable reader.
575        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
576            return false;
577        }
578
579        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
580            self.state
581                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
582                .is_ok()
583        } else {
584            false
585        }
586    }
587
588    #[cold]
589    fn try_lock_upgradable_slow(&self) -> bool {
590        let mut state = self.state.load(Ordering::Relaxed);
591        loop {
592            // This mirrors the condition in try_lock_upgradable_fast
593            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
594                return false;
595            }
596
597            match self.state.compare_exchange_weak(
598                state,
599                state
600                    .checked_add(ONE_READER | UPGRADABLE_BIT)
601                    .expect("RwLock reader count overflow"),
602                Ordering::Acquire,
603                Ordering::Relaxed,
604            ) {
605                Ok(_) => return true,
606                Err(x) => state = x,
607            }
608        }
609    }
610
611    #[cold]
612    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
613        let try_lock = |state: &mut usize| {
614            loop {
615                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
616                    return false;
617                }
618
619                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
620                match self.state.compare_exchange_weak(
621                    *state,
622                    *state | WRITER_BIT,
623                    Ordering::Acquire,
624                    Ordering::Relaxed,
625                ) {
626                    Ok(_) => return true,
627                    Err(x) => *state = x,
628                }
629            }
630        };
631
632        // Step 1: grab exclusive ownership of WRITER_BIT
633        let timed_out = !self.lock_common(
634            timeout,
635            TOKEN_EXCLUSIVE,
636            try_lock,
637            WRITER_BIT | UPGRADABLE_BIT,
638        );
639        if timed_out {
640            return false;
641        }
642
643        // Step 2: wait for all remaining readers to exit the lock.
644        self.wait_for_readers(timeout, 0)
645    }
646
647    #[cold]
648    fn unlock_exclusive_slow(&self, force_fair: bool) {
649        // There are threads to unpark. Try to unpark as many as we can.
650        let callback = |mut new_state, result: UnparkResult| {
651            // If we are using a fair unlock then we should keep the
652            // rwlock locked and hand it off to the unparked threads.
653            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
654                if result.have_more_threads {
655                    new_state |= PARKED_BIT;
656                }
657                self.state.store(new_state, Ordering::Release);
658                TOKEN_HANDOFF
659            } else {
660                // Clear the parked bit if there are no more parked threads.
661                if result.have_more_threads {
662                    self.state.store(PARKED_BIT, Ordering::Release);
663                } else {
664                    self.state.store(0, Ordering::Release);
665                }
666                TOKEN_NORMAL
667            }
668        };
669        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
670        unsafe {
671            self.wake_parked_threads(0, callback);
672        }
673    }
674
675    #[cold]
676    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
677        let try_lock = |state: &mut usize| {
678            let mut spinwait_shared = SpinWait::new();
679            loop {
680                // Use hardware lock elision to avoid cache conflicts when multiple
681                // readers try to acquire the lock. We only do this if the lock is
682                // completely empty since elision handles conflicts poorly.
683                if have_elision() && *state == 0 {
684                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
685                        Ok(_) => return true,
686                        Err(x) => *state = x,
687                    }
688                }
689
690                // This is the same condition as try_lock_shared_fast
691                if *state & WRITER_BIT != 0 {
692                    if !recursive || *state & READERS_MASK == 0 {
693                        return false;
694                    }
695                }
696
697                if self
698                    .state
699                    .compare_exchange_weak(
700                        *state,
701                        state
702                            .checked_add(ONE_READER)
703                            .expect("RwLock reader count overflow"),
704                        Ordering::Acquire,
705                        Ordering::Relaxed,
706                    )
707                    .is_ok()
708                {
709                    return true;
710                }
711
712                // If there is high contention on the reader count then we want
713                // to leave some time between attempts to acquire the lock to
714                // let other threads make progress.
715                spinwait_shared.spin_no_yield();
716                *state = self.state.load(Ordering::Relaxed);
717            }
718        };
719        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
720    }
721
722    #[cold]
723    fn unlock_shared_slow(&self) {
724        // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We
725        // just need to wake up a potentially sleeping pending writer.
726        // Using the 2nd key at addr + 1
727        let addr = self as *const _ as usize + 1;
728        let callback = |_result: UnparkResult| {
729            // Clear the WRITER_PARKED_BIT here since there can only be one
730            // parked writer thread.
731            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
732            TOKEN_NORMAL
733        };
734        // SAFETY:
735        //   * `addr` is an address we control.
736        //   * `callback` does not panic or call into any function of `parking_lot`.
737        unsafe {
738            parking_lot_core::unpark_one(addr, callback);
739        }
740    }
741
742    #[cold]
743    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
744        let try_lock = |state: &mut usize| {
745            let mut spinwait_shared = SpinWait::new();
746            loop {
747                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
748                    return false;
749                }
750
751                if self
752                    .state
753                    .compare_exchange_weak(
754                        *state,
755                        state
756                            .checked_add(ONE_READER | UPGRADABLE_BIT)
757                            .expect("RwLock reader count overflow"),
758                        Ordering::Acquire,
759                        Ordering::Relaxed,
760                    )
761                    .is_ok()
762                {
763                    return true;
764                }
765
766                // If there is high contention on the reader count then we want
767                // to leave some time between attempts to acquire the lock to
768                // let other threads make progress.
769                spinwait_shared.spin_no_yield();
770                *state = self.state.load(Ordering::Relaxed);
771            }
772        };
773        self.lock_common(
774            timeout,
775            TOKEN_UPGRADABLE,
776            try_lock,
777            WRITER_BIT | UPGRADABLE_BIT,
778        )
779    }
780
781    #[cold]
782    fn unlock_upgradable_slow(&self, force_fair: bool) {
783        // Just release the lock if there are no parked threads.
784        let mut state = self.state.load(Ordering::Relaxed);
785        while state & PARKED_BIT == 0 {
786            match self.state.compare_exchange_weak(
787                state,
788                state - (ONE_READER | UPGRADABLE_BIT),
789                Ordering::Release,
790                Ordering::Relaxed,
791            ) {
792                Ok(_) => return,
793                Err(x) => state = x,
794            }
795        }
796
797        // There are threads to unpark. Try to unpark as many as we can.
798        let callback = |new_state, result: UnparkResult| {
799            // If we are using a fair unlock then we should keep the
800            // rwlock locked and hand it off to the unparked threads.
801            let mut state = self.state.load(Ordering::Relaxed);
802            if force_fair || result.be_fair {
803                // Fall back to normal unpark on overflow. Panicking is
804                // not allowed in parking_lot callbacks.
805                while let Some(mut new_state) =
806                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
807                {
808                    if result.have_more_threads {
809                        new_state |= PARKED_BIT;
810                    } else {
811                        new_state &= !PARKED_BIT;
812                    }
813                    match self.state.compare_exchange_weak(
814                        state,
815                        new_state,
816                        Ordering::Relaxed,
817                        Ordering::Relaxed,
818                    ) {
819                        Ok(_) => return TOKEN_HANDOFF,
820                        Err(x) => state = x,
821                    }
822                }
823            }
824
825            // Otherwise just release the upgradable lock and update PARKED_BIT.
826            loop {
827                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
828                if result.have_more_threads {
829                    new_state |= PARKED_BIT;
830                } else {
831                    new_state &= !PARKED_BIT;
832                }
833                match self.state.compare_exchange_weak(
834                    state,
835                    new_state,
836                    Ordering::Relaxed,
837                    Ordering::Relaxed,
838                ) {
839                    Ok(_) => return TOKEN_NORMAL,
840                    Err(x) => state = x,
841                }
842            }
843        };
844        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
845        unsafe {
846            self.wake_parked_threads(0, callback);
847        }
848    }
849
850    #[cold]
851    fn try_upgrade_slow(&self) -> bool {
852        let mut state = self.state.load(Ordering::Relaxed);
853        loop {
854            if state & READERS_MASK != ONE_READER {
855                return false;
856            }
857            match self.state.compare_exchange_weak(
858                state,
859                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
860                Ordering::Relaxed,
861                Ordering::Relaxed,
862            ) {
863                Ok(_) => return true,
864                Err(x) => state = x,
865            }
866        }
867    }
868
869    #[cold]
870    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
871        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
872    }
873
874    #[cold]
875    fn downgrade_slow(&self) {
876        // We only reach this point if PARKED_BIT is set.
877        let callback = |_, result: UnparkResult| {
878            // Clear the parked bit if there no more parked threads
879            if !result.have_more_threads {
880                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
881            }
882            TOKEN_NORMAL
883        };
884        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
885        unsafe {
886            self.wake_parked_threads(ONE_READER, callback);
887        }
888    }
889
890    #[cold]
891    fn downgrade_to_upgradable_slow(&self) {
892        // We only reach this point if PARKED_BIT is set.
893        let callback = |_, result: UnparkResult| {
894            // Clear the parked bit if there no more parked threads
895            if !result.have_more_threads {
896                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
897            }
898            TOKEN_NORMAL
899        };
900        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
901        unsafe {
902            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
903        }
904    }
905
906    #[cold]
907    unsafe fn bump_shared_slow(&self) {
908        self.unlock_shared();
909        self.lock_shared();
910    }
911
912    #[cold]
913    fn bump_exclusive_slow(&self) {
914        self.deadlock_release();
915        self.unlock_exclusive_slow(true);
916        self.lock_exclusive();
917    }
918
919    #[cold]
920    fn bump_upgradable_slow(&self) {
921        self.deadlock_release();
922        self.unlock_upgradable_slow(true);
923        self.lock_upgradable();
924    }
925
926    /// Common code for waking up parked threads after releasing WRITER_BIT or
927    /// UPGRADABLE_BIT.
928    ///
929    /// # Safety
930    ///
931    /// `callback` must uphold the requirements of the `callback` parameter to
932    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
933    /// `parking_lot`.
934    #[inline]
935    unsafe fn wake_parked_threads(
936        &self,
937        new_state: usize,
938        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
939    ) {
940        // We must wake up at least one upgrader or writer if there is one,
941        // otherwise they may end up parked indefinitely since unlock_shared
942        // does not call wake_parked_threads.
943        let new_state = Cell::new(new_state);
944        let addr = self as *const _ as usize;
945        let filter = |ParkToken(token)| {
946            let s = new_state.get();
947
948            // If we are waking up a writer, don't wake anything else.
949            if s & WRITER_BIT != 0 {
950                return FilterOp::Stop;
951            }
952
953            // Otherwise wake *all* readers and one upgrader/writer.
954            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
955                // Skip writers and upgradable readers if we already have
956                // a writer/upgradable reader.
957                FilterOp::Skip
958            } else {
959                new_state.set(s + token);
960                FilterOp::Unpark
961            }
962        };
963        let callback = |result| callback(new_state.get(), result);
964        // SAFETY:
965        // * `addr` is an address we control.
966        // * `filter` does not panic or call into any function of `parking_lot`.
967        // * `callback` safety responsibility is on caller
968        parking_lot_core::unpark_filter(addr, filter, callback);
969    }
970
971    // Common code for waiting for readers to exit the lock after acquiring
972    // WRITER_BIT.
973    #[inline]
974    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
975        // At this point WRITER_BIT is already set, we just need to wait for the
976        // remaining readers to exit the lock.
977        let mut spinwait = SpinWait::new();
978        let mut state = self.state.load(Ordering::Acquire);
979        while state & READERS_MASK != 0 {
980            // Spin a few times to wait for readers to exit
981            if spinwait.spin() {
982                state = self.state.load(Ordering::Acquire);
983                continue;
984            }
985
986            // Set the parked bit
987            if state & WRITER_PARKED_BIT == 0 {
988                if let Err(x) = self.state.compare_exchange_weak(
989                    state,
990                    state | WRITER_PARKED_BIT,
991                    Ordering::Relaxed,
992                    Ordering::Relaxed,
993                ) {
994                    state = x;
995                    continue;
996                }
997            }
998
999            // Park our thread until we are woken up by an unlock
1000            // Using the 2nd key at addr + 1
1001            let addr = self as *const _ as usize + 1;
1002            let validate = || {
1003                let state = self.state.load(Ordering::Relaxed);
1004                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
1005            };
1006            let before_sleep = || {};
1007            let timed_out = |_, _| {};
1008            // SAFETY:
1009            //   * `addr` is an address we control.
1010            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1011            //   * `before_sleep` does not call `park`, nor does it panic.
1012            let park_result = unsafe {
1013                parking_lot_core::park(
1014                    addr,
1015                    validate,
1016                    before_sleep,
1017                    timed_out,
1018                    TOKEN_EXCLUSIVE,
1019                    timeout,
1020                )
1021            };
1022            match park_result {
1023                // We still need to re-check the state if we are unparked
1024                // since a previous writer timing-out could have allowed
1025                // another reader to sneak in before we parked.
1026                ParkResult::Unparked(_) | ParkResult::Invalid => {
1027                    state = self.state.load(Ordering::Acquire);
1028                    continue;
1029                }
1030
1031                // Timeout expired
1032                ParkResult::TimedOut => {
1033                    // We need to release WRITER_BIT and revert back to
1034                    // our previous value. We also wake up any threads that
1035                    // might be waiting on WRITER_BIT.
1036                    let state = self.state.fetch_add(
1037                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
1038                        Ordering::Relaxed,
1039                    );
1040                    if state & PARKED_BIT != 0 {
1041                        let callback = |_, result: UnparkResult| {
1042                            // Clear the parked bit if there no more parked threads
1043                            if !result.have_more_threads {
1044                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1045                            }
1046                            TOKEN_NORMAL
1047                        };
1048                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
1049                        unsafe {
1050                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
1051                        }
1052                    }
1053                    return false;
1054                }
1055            }
1056        }
1057        true
1058    }
1059
1060    /// Common code for acquiring a lock
1061    #[inline]
1062    fn lock_common(
1063        &self,
1064        timeout: Option<Instant>,
1065        token: ParkToken,
1066        mut try_lock: impl FnMut(&mut usize) -> bool,
1067        validate_flags: usize,
1068    ) -> bool {
1069        let mut spinwait = SpinWait::new();
1070        let mut state = self.state.load(Ordering::Relaxed);
1071        loop {
1072            // Attempt to grab the lock
1073            if try_lock(&mut state) {
1074                return true;
1075            }
1076
1077            // If there are no parked threads, try spinning a few times.
1078            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
1079                state = self.state.load(Ordering::Relaxed);
1080                continue;
1081            }
1082
1083            // Set the parked bit
1084            if state & PARKED_BIT == 0 {
1085                if let Err(x) = self.state.compare_exchange_weak(
1086                    state,
1087                    state | PARKED_BIT,
1088                    Ordering::Relaxed,
1089                    Ordering::Relaxed,
1090                ) {
1091                    state = x;
1092                    continue;
1093                }
1094            }
1095
1096            // Park our thread until we are woken up by an unlock
1097            let addr = self as *const _ as usize;
1098            let validate = || {
1099                let state = self.state.load(Ordering::Relaxed);
1100                state & PARKED_BIT != 0 && (state & validate_flags != 0)
1101            };
1102            let before_sleep = || {};
1103            let timed_out = |_, was_last_thread| {
1104                // Clear the parked bit if we were the last parked thread
1105                if was_last_thread {
1106                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
1107                }
1108            };
1109
1110            // SAFETY:
1111            // * `addr` is an address we control.
1112            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
1113            // * `before_sleep` does not call `park`, nor does it panic.
1114            let park_result = unsafe {
1115                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
1116            };
1117            match park_result {
1118                // The thread that unparked us passed the lock on to us
1119                // directly without unlocking it.
1120                ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1121
1122                // We were unparked normally, try acquiring the lock again
1123                ParkResult::Unparked(_) => (),
1124
1125                // The validation function failed, try locking again
1126                ParkResult::Invalid => (),
1127
1128                // Timeout expired
1129                ParkResult::TimedOut => return false,
1130            }
1131
1132            // Loop back and try locking again
1133            spinwait.reset();
1134            state = self.state.load(Ordering::Relaxed);
1135        }
1136    }
1137
1138    #[inline]
1139    fn deadlock_acquire(&self) {
1140        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
1141        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
1142    }
1143
1144    #[inline]
1145    fn deadlock_release(&self) {
1146        unsafe { deadlock::release_resource(self as *const _ as usize) };
1147        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
1148    }
1149}