1use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12 fmt, ptr,
13 sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::time::{Duration, Instant};
18
/// Outcome of a timed wait on a condition variable.
///
/// Wraps a single flag recording whether the wait ended because the deadline
/// passed rather than because the waiter was woken.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait was recorded as having timed out.
    #[inline]
    pub fn timed_out(self) -> bool {
        let WaitTimeoutResult(elapsed) = self;
        elapsed
    }
}
31
/// A condition variable: lets threads block until another thread notifies
/// them, releasing an associated mutex while they sleep.
///
/// A `Condvar` may be associated with at most one mutex at any given time;
/// waiting on it with a different mutex while it still has an associated
/// mutex panics.
pub struct Condvar {
    // Address of the `RawMutex` that current waiters are using, or null when
    // no mutex is associated. Set by the first waiter and reset to null when
    // the waiters have been notified (or the last one timed out).
    state: AtomicPtr<RawMutex>,
}
92
93impl Condvar {
94 #[inline]
97 pub const fn new() -> Condvar {
98 Condvar {
99 state: AtomicPtr::new(ptr::null_mut()),
100 }
101 }
102
103 #[inline]
127 pub fn notify_one(&self) -> bool {
128 let state = self.state.load(Ordering::Relaxed);
130 if state.is_null() {
131 return false;
132 }
133
134 self.notify_one_slow(state)
135 }
136
137 #[cold]
138 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
139 unsafe {
140 let from = self as *const _ as usize;
142 let to = mutex as usize;
143 let validate = || {
144 if self.state.load(Ordering::Relaxed) != mutex {
150 return RequeueOp::Abort;
151 }
152
153 if (*mutex).mark_parked_if_locked() {
160 RequeueOp::RequeueOne
161 } else {
162 RequeueOp::UnparkOne
163 }
164 };
165 let callback = |_op, result: UnparkResult| {
166 if !result.have_more_threads {
168 self.state.store(ptr::null_mut(), Ordering::Relaxed);
169 }
170 TOKEN_NORMAL
171 };
172 let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
173
174 res.unparked_threads + res.requeued_threads != 0
175 }
176 }
177
178 #[inline]
188 pub fn notify_all(&self) -> usize {
189 let state = self.state.load(Ordering::Relaxed);
191 if state.is_null() {
192 return 0;
193 }
194
195 self.notify_all_slow(state)
196 }
197
198 #[cold]
199 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
200 unsafe {
201 let from = self as *const _ as usize;
203 let to = mutex as usize;
204 let validate = || {
205 if self.state.load(Ordering::Relaxed) != mutex {
211 return RequeueOp::Abort;
212 }
213
214 self.state.store(ptr::null_mut(), Ordering::Relaxed);
217
218 if (*mutex).mark_parked_if_locked() {
225 RequeueOp::RequeueAll
226 } else {
227 RequeueOp::UnparkOneRequeueRest
228 }
229 };
230 let callback = |op, result: UnparkResult| {
231 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
234 (*mutex).mark_parked();
235 }
236 TOKEN_NORMAL
237 };
238 let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
239
240 res.unparked_threads + res.requeued_threads
241 }
242 }
243
244 #[inline]
258 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
259 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
260 }
261
262 #[inline]
286 pub fn wait_until<T: ?Sized>(
287 &self,
288 mutex_guard: &mut MutexGuard<'_, T>,
289 timeout: Instant,
290 ) -> WaitTimeoutResult {
291 self.wait_until_internal(
292 unsafe { MutexGuard::mutex(mutex_guard).raw() },
293 Some(timeout),
294 )
295 }
296
297 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
300 unsafe {
301 let result;
302 let mut bad_mutex = false;
303 let mut requeued = false;
304 {
305 let addr = self as *const _ as usize;
306 let lock_addr = mutex as *const _ as *mut _;
307 let validate = || {
308 let state = self.state.load(Ordering::Relaxed);
312 if state.is_null() {
313 self.state.store(lock_addr, Ordering::Relaxed);
314 } else if state != lock_addr {
315 bad_mutex = true;
316 return false;
317 }
318 true
319 };
320 let before_sleep = || {
321 mutex.unlock();
323 };
324 let timed_out = |k, was_last_thread| {
325 requeued = k != addr;
329
330 if !requeued && was_last_thread {
334 self.state.store(ptr::null_mut(), Ordering::Relaxed);
335 }
336 };
337 result = parking_lot_core::park(
338 addr,
339 validate,
340 before_sleep,
341 timed_out,
342 DEFAULT_PARK_TOKEN,
343 timeout,
344 );
345 }
346
347 if bad_mutex {
351 panic!("attempted to use a condition variable with more than one mutex");
352 }
353
354 if result == ParkResult::Unparked(TOKEN_HANDOFF) {
356 deadlock::acquire_resource(mutex as *const _ as usize);
357 } else {
358 mutex.lock();
359 }
360
361 WaitTimeoutResult(!(result.is_unparked() || requeued))
362 }
363 }
364
365 #[inline]
384 pub fn wait_for<T: ?Sized>(
385 &self,
386 mutex_guard: &mut MutexGuard<'_, T>,
387 timeout: Duration,
388 ) -> WaitTimeoutResult {
389 let deadline = util::to_deadline(timeout);
390 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
391 }
392}
393
394impl Default for Condvar {
395 #[inline]
396 fn default() -> Condvar {
397 Condvar::new()
398 }
399}
400
401impl fmt::Debug for Condvar {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 f.pad("Condvar { .. }")
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use crate::{Condvar, Mutex, MutexGuard};
410 use std::sync::mpsc::channel;
411 use std::sync::Arc;
412 use std::thread;
413 use std::time::Duration;
414 use std::time::Instant;
415
416 #[test]
417 fn smoke() {
418 let c = Condvar::new();
419 c.notify_one();
420 c.notify_all();
421 }
422
423 #[test]
424 fn notify_one() {
425 let m = Arc::new(Mutex::new(()));
426 let m2 = m.clone();
427 let c = Arc::new(Condvar::new());
428 let c2 = c.clone();
429
430 let mut g = m.lock();
431 let _t = thread::spawn(move || {
432 let _g = m2.lock();
433 c2.notify_one();
434 });
435 c.wait(&mut g);
436 }
437
438 #[test]
439 fn notify_all() {
440 const N: usize = 10;
441
442 let data = Arc::new((Mutex::new(0), Condvar::new()));
443 let (tx, rx) = channel();
444 for _ in 0..N {
445 let data = data.clone();
446 let tx = tx.clone();
447 thread::spawn(move || {
448 let &(ref lock, ref cond) = &*data;
449 let mut cnt = lock.lock();
450 *cnt += 1;
451 if *cnt == N {
452 tx.send(()).unwrap();
453 }
454 while *cnt != 0 {
455 cond.wait(&mut cnt);
456 }
457 tx.send(()).unwrap();
458 });
459 }
460 drop(tx);
461
462 let &(ref lock, ref cond) = &*data;
463 rx.recv().unwrap();
464 let mut cnt = lock.lock();
465 *cnt = 0;
466 cond.notify_all();
467 drop(cnt);
468
469 for _ in 0..N {
470 rx.recv().unwrap();
471 }
472 }
473
474 #[test]
475 fn notify_one_return_true() {
476 let m = Arc::new(Mutex::new(()));
477 let m2 = m.clone();
478 let c = Arc::new(Condvar::new());
479 let c2 = c.clone();
480
481 let mut g = m.lock();
482 let _t = thread::spawn(move || {
483 let _g = m2.lock();
484 assert!(c2.notify_one());
485 });
486 c.wait(&mut g);
487 }
488
489 #[test]
490 fn notify_one_return_false() {
491 let m = Arc::new(Mutex::new(()));
492 let c = Arc::new(Condvar::new());
493
494 let _t = thread::spawn(move || {
495 let _g = m.lock();
496 assert!(!c.notify_one());
497 });
498 }
499
500 #[test]
501 fn notify_all_return() {
502 const N: usize = 10;
503
504 let data = Arc::new((Mutex::new(0), Condvar::new()));
505 let (tx, rx) = channel();
506 for _ in 0..N {
507 let data = data.clone();
508 let tx = tx.clone();
509 thread::spawn(move || {
510 let &(ref lock, ref cond) = &*data;
511 let mut cnt = lock.lock();
512 *cnt += 1;
513 if *cnt == N {
514 tx.send(()).unwrap();
515 }
516 while *cnt != 0 {
517 cond.wait(&mut cnt);
518 }
519 tx.send(()).unwrap();
520 });
521 }
522 drop(tx);
523
524 let &(ref lock, ref cond) = &*data;
525 rx.recv().unwrap();
526 let mut cnt = lock.lock();
527 *cnt = 0;
528 assert_eq!(cond.notify_all(), N);
529 drop(cnt);
530
531 for _ in 0..N {
532 rx.recv().unwrap();
533 }
534
535 assert_eq!(cond.notify_all(), 0);
536 }
537
538 #[test]
539 fn wait_for() {
540 let m = Arc::new(Mutex::new(()));
541 let m2 = m.clone();
542 let c = Arc::new(Condvar::new());
543 let c2 = c.clone();
544
545 let mut g = m.lock();
546 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
547 assert!(no_timeout.timed_out());
548
549 let _t = thread::spawn(move || {
550 let _g = m2.lock();
551 c2.notify_one();
552 });
553 let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
554 assert!(!timeout_res.timed_out());
555
556 drop(g);
557 }
558
559 #[test]
560 fn wait_until() {
561 let m = Arc::new(Mutex::new(()));
562 let m2 = m.clone();
563 let c = Arc::new(Condvar::new());
564 let c2 = c.clone();
565
566 let mut g = m.lock();
567 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
568 assert!(no_timeout.timed_out());
569 let _t = thread::spawn(move || {
570 let _g = m2.lock();
571 c2.notify_one();
572 });
573 let timeout_res = c.wait_until(
574 &mut g,
575 Instant::now() + Duration::from_millis(u32::max_value() as u64),
576 );
577 assert!(!timeout_res.timed_out());
578 drop(g);
579 }
580
581 #[test]
582 #[should_panic]
583 fn two_mutexes() {
584 let m = Arc::new(Mutex::new(()));
585 let m2 = m.clone();
586 let m3 = Arc::new(Mutex::new(()));
587 let c = Arc::new(Condvar::new());
588 let c2 = c.clone();
589
590 struct PanicGuard<'a>(&'a Condvar);
592 impl<'a> Drop for PanicGuard<'a> {
593 fn drop(&mut self) {
594 self.0.notify_one();
595 }
596 }
597
598 let (tx, rx) = channel();
599 let g = m.lock();
600 let _t = thread::spawn(move || {
601 let mut g = m2.lock();
602 tx.send(()).unwrap();
603 c2.wait(&mut g);
604 });
605 drop(g);
606 rx.recv().unwrap();
607 let _g = m.lock();
608 let _guard = PanicGuard(&*c);
609 c.wait(&mut m3.lock());
610 }
611
612 #[test]
613 fn two_mutexes_disjoint() {
614 let m = Arc::new(Mutex::new(()));
615 let m2 = m.clone();
616 let m3 = Arc::new(Mutex::new(()));
617 let c = Arc::new(Condvar::new());
618 let c2 = c.clone();
619
620 let mut g = m.lock();
621 let _t = thread::spawn(move || {
622 let _g = m2.lock();
623 c2.notify_one();
624 });
625 c.wait(&mut g);
626 drop(g);
627
628 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
629 }
630
631 #[test]
632 fn test_debug_condvar() {
633 let c = Condvar::new();
634 assert_eq!(format!("{:?}", c), "Condvar { .. }");
635 }
636
637 #[test]
638 fn test_condvar_requeue() {
639 let m = Arc::new(Mutex::new(()));
640 let m2 = m.clone();
641 let c = Arc::new(Condvar::new());
642 let c2 = c.clone();
643 let t = thread::spawn(move || {
644 let mut g = m2.lock();
645 c2.wait(&mut g);
646 });
647
648 let mut g = m.lock();
649 while !c.notify_one() {
650 MutexGuard::bump(&mut g);
652 thread::yield_now();
655 }
656 drop(g);
658 t.join().unwrap();
659 }
660
661 #[test]
662 fn test_issue_129() {
663 let locks = Arc::new((Mutex::new(()), Condvar::new()));
664
665 let (tx, rx) = channel();
666 for _ in 0..4 {
667 let locks = locks.clone();
668 let tx = tx.clone();
669 thread::spawn(move || {
670 let mut guard = locks.0.lock();
671 locks.1.wait(&mut guard);
672 locks.1.wait_for(&mut guard, Duration::from_millis(1));
673 locks.1.notify_one();
674 tx.send(()).unwrap();
675 });
676 }
677
678 thread::sleep(Duration::from_millis(100));
679 locks.1.notify_one();
680
681 for _ in 0..4 {
682 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
683 }
684 }
685}
686
687#[cfg(test)]
690mod webkit_queue_test {
691 use crate::{Condvar, Mutex, MutexGuard};
692 use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
693
    /// How long a queue-test thread waits on a condition variable per
    /// attempt.
    #[derive(Clone, Copy)]
    enum Timeout {
        /// Wait with `Condvar::wait_for` for at most the given duration.
        Bounded(Duration),
        /// Wait with `Condvar::wait`, i.e. without a timeout.
        Forever,
    }
699
    /// Which notification call the queue test uses.
    #[derive(Clone, Copy)]
    enum NotifyStyle {
        /// Always call `notify_one`.
        One,
        /// Call `notify_all`, but only when the caller says a notification
        /// is needed.
        All,
    }
705
706 struct Queue {
707 items: VecDeque<usize>,
708 should_continue: bool,
709 }
710
711 impl Queue {
712 fn new() -> Self {
713 Self {
714 items: VecDeque::new(),
715 should_continue: true,
716 }
717 }
718 }
719
720 fn wait<T: ?Sized>(
721 condition: &Condvar,
722 lock: &mut MutexGuard<'_, T>,
723 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
724 timeout: &Timeout,
725 ) {
726 while !predicate(lock) {
727 match timeout {
728 Timeout::Forever => condition.wait(lock),
729 Timeout::Bounded(bound) => {
730 condition.wait_for(lock, *bound);
731 }
732 }
733 }
734 }
735
736 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
737 match style {
738 NotifyStyle::One => {
739 condition.notify_one();
740 }
741 NotifyStyle::All => {
742 if should_notify {
743 condition.notify_all();
744 }
745 }
746 }
747 }
748
749 fn run_queue_test(
750 num_producers: usize,
751 num_consumers: usize,
752 max_queue_size: usize,
753 messages_per_producer: usize,
754 notify_style: NotifyStyle,
755 timeout: Timeout,
756 delay: Duration,
757 ) {
758 let input_queue = Arc::new(Mutex::new(Queue::new()));
759 let empty_condition = Arc::new(Condvar::new());
760 let full_condition = Arc::new(Condvar::new());
761
762 let output_vec = Arc::new(Mutex::new(vec![]));
763
764 let consumers = (0..num_consumers)
765 .map(|_| {
766 consumer_thread(
767 input_queue.clone(),
768 empty_condition.clone(),
769 full_condition.clone(),
770 timeout,
771 notify_style,
772 output_vec.clone(),
773 max_queue_size,
774 )
775 })
776 .collect::<Vec<_>>();
777 let producers = (0..num_producers)
778 .map(|_| {
779 producer_thread(
780 messages_per_producer,
781 input_queue.clone(),
782 empty_condition.clone(),
783 full_condition.clone(),
784 timeout,
785 notify_style,
786 max_queue_size,
787 )
788 })
789 .collect::<Vec<_>>();
790
791 thread::sleep(delay);
792
793 for producer in producers.into_iter() {
794 producer.join().expect("Producer thread panicked");
795 }
796
797 {
798 let mut input_queue = input_queue.lock();
799 input_queue.should_continue = false;
800 }
801 empty_condition.notify_all();
802
803 for consumer in consumers.into_iter() {
804 consumer.join().expect("Consumer thread panicked");
805 }
806
807 let mut output_vec = output_vec.lock();
808 assert_eq!(output_vec.len(), num_producers * messages_per_producer);
809 output_vec.sort();
810 for msg_idx in 0..messages_per_producer {
811 for producer_idx in 0..num_producers {
812 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
813 }
814 }
815 }
816
817 fn consumer_thread(
818 input_queue: Arc<Mutex<Queue>>,
819 empty_condition: Arc<Condvar>,
820 full_condition: Arc<Condvar>,
821 timeout: Timeout,
822 notify_style: NotifyStyle,
823 output_queue: Arc<Mutex<Vec<usize>>>,
824 max_queue_size: usize,
825 ) -> thread::JoinHandle<()> {
826 thread::spawn(move || loop {
827 let (should_notify, result) = {
828 let mut queue = input_queue.lock();
829 wait(
830 &*empty_condition,
831 &mut queue,
832 |state| -> bool { !state.items.is_empty() || !state.should_continue },
833 &timeout,
834 );
835 if queue.items.is_empty() && !queue.should_continue {
836 return;
837 }
838 let should_notify = queue.items.len() == max_queue_size;
839 let result = queue.items.pop_front();
840 std::mem::drop(queue);
841 (should_notify, result)
842 };
843 notify(notify_style, &*full_condition, should_notify);
844
845 if let Some(result) = result {
846 output_queue.lock().push(result);
847 }
848 })
849 }
850
851 fn producer_thread(
852 num_messages: usize,
853 queue: Arc<Mutex<Queue>>,
854 empty_condition: Arc<Condvar>,
855 full_condition: Arc<Condvar>,
856 timeout: Timeout,
857 notify_style: NotifyStyle,
858 max_queue_size: usize,
859 ) -> thread::JoinHandle<()> {
860 thread::spawn(move || {
861 for message in 0..num_messages {
862 let should_notify = {
863 let mut queue = queue.lock();
864 wait(
865 &*full_condition,
866 &mut queue,
867 |state| state.items.len() < max_queue_size,
868 &timeout,
869 );
870 let should_notify = queue.items.is_empty();
871 queue.items.push_back(message);
872 std::mem::drop(queue);
873 should_notify
874 };
875 notify(notify_style, &*empty_condition, should_notify);
876 }
877 })
878 }
879
880 macro_rules! run_queue_tests {
881 ( $( $name:ident(
882 num_producers: $num_producers:expr,
883 num_consumers: $num_consumers:expr,
884 max_queue_size: $max_queue_size:expr,
885 messages_per_producer: $messages_per_producer:expr,
886 notification_style: $notification_style:expr,
887 timeout: $timeout:expr,
888 delay_seconds: $delay_seconds:expr);
889 )* ) => {
890 $(#[test]
891 fn $name() {
892 let delay = Duration::from_secs($delay_seconds);
893 run_queue_test(
894 $num_producers,
895 $num_consumers,
896 $max_queue_size,
897 $messages_per_producer,
898 $notification_style,
899 $timeout,
900 delay,
901 );
902 })*
903 };
904 }
905
    // Scenario table: each entry becomes one `#[test]` function named after
    // the entry and run through `run_queue_test` with the listed settings.
    run_queue_tests! {
        // NOTE(review): this case uses a bounded (1 s) wait although the
        // `_timeout` suffix is on the next case — confirm the names are
        // intended.
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        // NOTE(review): despite the name, this case waits with
        // `Timeout::Forever` — confirm the names are intended.
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // One producer feeding five consumers through a single slot.
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // NOTE(review): despite the name, this case waits with
        // `Timeout::Forever`; it differs from the previous case only by the
        // one-second delay before the producers are joined.
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
1043}