use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero
// and WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive
// lock. Otherwise: a writer is waiting for the remaining readers to exit.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;

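// Illustrative example of how a state word decodes given the bits above:
// `3 * ONE_READER | UPGRADABLE_BIT | PARKED_BIT` describes three readers, one
// of which holds the upgradable lock, with at least one thread parked in the
// main queue waiting for the lock to become available.
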
// Token indicating what type of lock a queued thread is trying to acquire.
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        // Fast path: no threads are parked, so we can simply clear WRITER_BIT.
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        // Otherwise wake up parked threads in the slow path.
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        // If we were the last reader and a writer is parked waiting for the
        // readers to exit, wake it up.
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

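// Illustrative usage through the `lock_api::RwLock` wrapper; this sketch
// assumes the crate exposes `type RwLock<T> = lock_api::RwLock<RawRwLock, T>`
// in the usual way:
//
//     let lock = lock_api::RwLock::<RawRwLock, _>::new(5);
//     {
//         let r1 = lock.read();
//         let r2 = lock.read(); // shared guards may be held concurrently
//         assert_eq!(*r1 + *r2, 10);
//     }
//     *lock.write() += 1; // exclusive access once all readers are gone
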
unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

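// Illustrative sketch of `downgrade` via the guard helper that forwards to
// this impl (assuming the usual `lock_api` wrapper types):
//
//     let lock = lock_api::RwLock::<RawRwLock, _>::new(vec![1, 2, 3]);
//     let mut w = lock.write();
//     w.push(4);
//     let r = lock_api::RwLockWriteGuard::downgrade(w); // now a shared guard
//     assert_eq!(r.len(), 4);
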
unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

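// Illustrative sketch of the timed methods via the wrapper type (the method
// names below are the `lock_api` wrappers that forward to this impl):
//
//     let lock = lock_api::RwLock::<RawRwLock, _>::new(0u32);
//     if let Some(mut w) = lock.try_write_for(Duration::from_millis(10)) {
//         *w += 1;
//     }
//     if let Some(r) = lock.try_read_until(Instant::now() + Duration::from_millis(10)) {
//         assert!(*r <= 1);
//     }
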
unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
        // Trade our upgradable read lock for a pending exclusive lock, then
        // wait for any remaining readers to exit.
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

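// Illustrative sketch of an upgradable read via the wrapper type (assuming the
// usual `lock_api` guard helpers that forward to this impl):
//
//     let lock = lock_api::RwLock::<RawRwLock, _>::new(Vec::<u32>::new());
//     let u = lock.upgradable_read();          // other readers may coexist
//     if u.is_empty() {
//         let mut w = lock_api::RwLockUpgradableReadGuard::upgrade(u);
//         w.push(1);                           // now exclusive
//     }
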
unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

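// Illustrative sketch of a timed upgrade via the guard helper (assuming the
// `lock_api::RwLockUpgradableReadGuard::try_upgrade_for` wrapper forwards here):
//
//     let lock = lock_api::RwLock::<RawRwLock, _>::new(0u32);
//     let u = lock.upgradable_read();
//     match lock_api::RwLockUpgradableReadGuard::try_upgrade_for(u, Duration::from_millis(5)) {
//         Ok(mut w) => *w += 1, // upgraded within the timeout
//         Err(_u) => {}         // timed out; still holds the upgradable lock
//     }
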
impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab a shared lock while a writer holds or is waiting for
        // the lock, since that could starve writers. The only exception is a
        // recursive read lock taken while other readers are still active.
        if state & WRITER_BIT != 0 {
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // another upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT.
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the rwlock
            // locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when
                // multiple readers try to acquire the lock. We only do this if
                // the lock is completely empty since elision handles conflicts
                // poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer, which is
        // parked on the 2nd key at addr + 1.
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear WRITER_PARKED_BIT here since there can only be one parked
            // writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the rwlock
            // locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to a normal unpark on overflow. Panicking is not
                // allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing `WRITER_BIT`
    /// or `UPGRADABLE_BIT`.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`: no panics and no calls into any
    /// function of `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // `new_state` accumulates the state the unparked threads will add once
        // they acquire the lock; it is threaded through the filter via a Cell.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake all readers but at most one upgradable reader
            // or writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `filter` does not panic or call into any function of `parking_lot`.
        //   * `callback` safety responsibility is on the caller.
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for
        // the remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock, using the
            // 2nd key at addr + 1.
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` do not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we were unparked or
                // the validation failed, since another reader may have snuck
                // in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // Release WRITER_BIT and WRITER_PARKED_BIT and revert back
                    // to our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock.
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` do not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}