parking_lot/
condvar.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12    fmt, ptr,
13    sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::time::{Duration, Instant};
18
19/// A type indicating whether a timed wait on a condition variable returned
20/// due to a time out or not.
21#[derive(Debug, PartialEq, Eq, Copy, Clone)]
22pub struct WaitTimeoutResult(bool);
23
24impl WaitTimeoutResult {
25    /// Returns whether the wait was known to have timed out.
26    #[inline]
27    pub fn timed_out(self) -> bool {
28        self.0
29    }
30}
31
32/// A Condition Variable
33///
34/// Condition variables represent the ability to block a thread such that it
35/// consumes no CPU time while waiting for an event to occur. Condition
36/// variables are typically associated with a boolean predicate (a condition)
37/// and a mutex. The predicate is always verified inside of the mutex before
38/// determining that thread must block.
39///
40/// Note that this module places one additional restriction over the system
41/// condition variables: each condvar can be used with only one mutex at a
42/// time. Any attempt to use multiple mutexes on the same condition variable
43/// simultaneously will result in a runtime panic. However it is possible to
44/// switch to a different mutex if there are no threads currently waiting on
45/// the condition variable.
46///
47/// # Differences from the standard library `Condvar`
48///
49/// - No spurious wakeups: A wait will only return a non-timeout result if it
50///   was woken up by `notify_one` or `notify_all`.
51/// - `Condvar::notify_all` will only wake up a single thread, the rest are
52///   requeued to wait for the `Mutex` to be unlocked by the thread that was
53///   woken up.
54/// - Only requires 1 word of space, whereas the standard library boxes the
55///   `Condvar` due to platform limitations.
56/// - Can be statically constructed (requires the `const_fn` nightly feature).
57/// - Does not require any drop glue when dropped.
58/// - Inline fast path for the uncontended case.
59///
60/// # Examples
61///
62/// ```
63/// use parking_lot::{Mutex, Condvar};
64/// use std::sync::Arc;
65/// use std::thread;
66///
67/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
68/// let pair2 = pair.clone();
69///
70/// // Inside of our lock, spawn a new thread, and then wait for it to start
71/// thread::spawn(move|| {
72///     let &(ref lock, ref cvar) = &*pair2;
73///     let mut started = lock.lock();
74///     *started = true;
75///     cvar.notify_one();
76/// });
77///
78/// // wait for the thread to start up
79/// let &(ref lock, ref cvar) = &*pair;
80/// let mut started = lock.lock();
81/// if !*started {
82///     cvar.wait(&mut started);
83/// }
84/// // Note that we used an if instead of a while loop above. This is only
85/// // possible because parking_lot's Condvar will never spuriously wake up.
86/// // This means that wait() will only return after notify_one or notify_all is
87/// // called.
88/// ```
89pub struct Condvar {
90    state: AtomicPtr<RawMutex>,
91}
92
93impl Condvar {
94    /// Creates a new condition variable which is ready to be waited on and
95    /// notified.
96    #[inline]
97    pub const fn new() -> Condvar {
98        Condvar {
99            state: AtomicPtr::new(ptr::null_mut()),
100        }
101    }
102
103    /// Wakes up one blocked thread on this condvar.
104    ///
105    /// Returns whether a thread was woken up.
106    ///
107    /// If there is a blocked thread on this condition variable, then it will
108    /// be woken up from its call to `wait` or `wait_timeout`. Calls to
109    /// `notify_one` are not buffered in any way.
110    ///
111    /// To wake up all threads, see `notify_all()`.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use parking_lot::Condvar;
117    ///
118    /// let condvar = Condvar::new();
119    ///
120    /// // do something with condvar, share it with other threads
121    ///
122    /// if !condvar.notify_one() {
123    ///     println!("Nobody was listening for this.");
124    /// }
125    /// ```
126    #[inline]
127    pub fn notify_one(&self) -> bool {
128        // Nothing to do if there are no waiting threads
129        let state = self.state.load(Ordering::Relaxed);
130        if state.is_null() {
131            return false;
132        }
133
134        self.notify_one_slow(state)
135    }
136
137    #[cold]
138    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
139        unsafe {
140            // Unpark one thread and requeue the rest onto the mutex
141            let from = self as *const _ as usize;
142            let to = mutex as usize;
143            let validate = || {
144                // Make sure that our atomic state still points to the same
145                // mutex. If not then it means that all threads on the current
146                // mutex were woken up and a new waiting thread switched to a
147                // different mutex. In that case we can get away with doing
148                // nothing.
149                if self.state.load(Ordering::Relaxed) != mutex {
150                    return RequeueOp::Abort;
151                }
152
153                // Unpark one thread if the mutex is unlocked, otherwise just
154                // requeue everything to the mutex. This is safe to do here
155                // since unlocking the mutex when the parked bit is set requires
156                // locking the queue. There is the possibility of a race if the
157                // mutex gets locked after we check, but that doesn't matter in
158                // this case.
159                if (*mutex).mark_parked_if_locked() {
160                    RequeueOp::RequeueOne
161                } else {
162                    RequeueOp::UnparkOne
163                }
164            };
165            let callback = |_op, result: UnparkResult| {
166                // Clear our state if there are no more waiting threads
167                if !result.have_more_threads {
168                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
169                }
170                TOKEN_NORMAL
171            };
172            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
173
174            res.unparked_threads + res.requeued_threads != 0
175        }
176    }
177
178    /// Wakes up all blocked threads on this condvar.
179    ///
180    /// Returns the number of threads woken up.
181    ///
182    /// This method will ensure that any current waiters on the condition
183    /// variable are awoken. Calls to `notify_all()` are not buffered in any
184    /// way.
185    ///
186    /// To wake up only one thread, see `notify_one()`.
187    #[inline]
188    pub fn notify_all(&self) -> usize {
189        // Nothing to do if there are no waiting threads
190        let state = self.state.load(Ordering::Relaxed);
191        if state.is_null() {
192            return 0;
193        }
194
195        self.notify_all_slow(state)
196    }
197
198    #[cold]
199    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
200        unsafe {
201            // Unpark one thread and requeue the rest onto the mutex
202            let from = self as *const _ as usize;
203            let to = mutex as usize;
204            let validate = || {
205                // Make sure that our atomic state still points to the same
206                // mutex. If not then it means that all threads on the current
207                // mutex were woken up and a new waiting thread switched to a
208                // different mutex. In that case we can get away with doing
209                // nothing.
210                if self.state.load(Ordering::Relaxed) != mutex {
211                    return RequeueOp::Abort;
212                }
213
214                // Clear our state since we are going to unpark or requeue all
215                // threads.
216                self.state.store(ptr::null_mut(), Ordering::Relaxed);
217
218                // Unpark one thread if the mutex is unlocked, otherwise just
219                // requeue everything to the mutex. This is safe to do here
220                // since unlocking the mutex when the parked bit is set requires
221                // locking the queue. There is the possibility of a race if the
222                // mutex gets locked after we check, but that doesn't matter in
223                // this case.
224                if (*mutex).mark_parked_if_locked() {
225                    RequeueOp::RequeueAll
226                } else {
227                    RequeueOp::UnparkOneRequeueRest
228                }
229            };
230            let callback = |op, result: UnparkResult| {
231                // If we requeued threads to the mutex, mark it as having
232                // parked threads. The RequeueAll case is already handled above.
233                if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
234                    (*mutex).mark_parked();
235                }
236                TOKEN_NORMAL
237            };
238            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
239
240            res.unparked_threads + res.requeued_threads
241        }
242    }
243
244    /// Blocks the current thread until this condition variable receives a
245    /// notification.
246    ///
247    /// This function will atomically unlock the mutex specified (represented by
248    /// `mutex_guard`) and block the current thread. This means that any calls
249    /// to `notify_*()` which happen logically after the mutex is unlocked are
250    /// candidates to wake this thread up. When this function call returns, the
251    /// lock specified will have been re-acquired.
252    ///
253    /// # Panics
254    ///
255    /// This function will panic if another thread is waiting on the `Condvar`
256    /// with a different `Mutex` object.
257    #[inline]
258    pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
259        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
260    }
261
262    /// Waits on this condition variable for a notification, timing out after
263    /// the specified time instant.
264    ///
265    /// The semantics of this function are equivalent to `wait()` except that
266    /// the thread will be blocked roughly until `timeout` is reached. This
267    /// method should not be used for precise timing due to anomalies such as
268    /// preemption or platform differences that may not cause the maximum
269    /// amount of time waited to be precisely `timeout`.
270    ///
271    /// Note that the best effort is made to ensure that the time waited is
272    /// measured with a monotonic clock, and not affected by the changes made to
273    /// the system time.
274    ///
275    /// The returned `WaitTimeoutResult` value indicates if the timeout is
276    /// known to have elapsed.
277    ///
278    /// Like `wait`, the lock specified will be re-acquired when this function
279    /// returns, regardless of whether the timeout elapsed or not.
280    ///
281    /// # Panics
282    ///
283    /// This function will panic if another thread is waiting on the `Condvar`
284    /// with a different `Mutex` object.
285    #[inline]
286    pub fn wait_until<T: ?Sized>(
287        &self,
288        mutex_guard: &mut MutexGuard<'_, T>,
289        timeout: Instant,
290    ) -> WaitTimeoutResult {
291        self.wait_until_internal(
292            unsafe { MutexGuard::mutex(mutex_guard).raw() },
293            Some(timeout),
294        )
295    }
296
297    // This is a non-generic function to reduce the monomorphization cost of
298    // using `wait_until`.
299    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
300        unsafe {
301            let result;
302            let mut bad_mutex = false;
303            let mut requeued = false;
304            {
305                let addr = self as *const _ as usize;
306                let lock_addr = mutex as *const _ as *mut _;
307                let validate = || {
308                    // Ensure we don't use two different mutexes with the same
309                    // Condvar at the same time. This is done while locked to
310                    // avoid races with notify_one
311                    let state = self.state.load(Ordering::Relaxed);
312                    if state.is_null() {
313                        self.state.store(lock_addr, Ordering::Relaxed);
314                    } else if state != lock_addr {
315                        bad_mutex = true;
316                        return false;
317                    }
318                    true
319                };
320                let before_sleep = || {
321                    // Unlock the mutex before sleeping...
322                    mutex.unlock();
323                };
324                let timed_out = |k, was_last_thread| {
325                    // If we were requeued to a mutex, then we did not time out.
326                    // We'll just park ourselves on the mutex again when we try
327                    // to lock it later.
328                    requeued = k != addr;
329
330                    // If we were the last thread on the queue then we need to
331                    // clear our state. This is normally done by the
332                    // notify_{one,all} functions when not timing out.
333                    if !requeued && was_last_thread {
334                        self.state.store(ptr::null_mut(), Ordering::Relaxed);
335                    }
336                };
337                result = parking_lot_core::park(
338                    addr,
339                    validate,
340                    before_sleep,
341                    timed_out,
342                    DEFAULT_PARK_TOKEN,
343                    timeout,
344                );
345            }
346
347            // Panic if we tried to use multiple mutexes with a Condvar. Note
348            // that at this point the MutexGuard is still locked. It will be
349            // unlocked by the unwinding logic.
350            if bad_mutex {
351                panic!("attempted to use a condition variable with more than one mutex");
352            }
353
354            // ... and re-lock it once we are done sleeping
355            if result == ParkResult::Unparked(TOKEN_HANDOFF) {
356                deadlock::acquire_resource(mutex as *const _ as usize);
357            } else {
358                mutex.lock();
359            }
360
361            WaitTimeoutResult(!(result.is_unparked() || requeued))
362        }
363    }
364
365    /// Waits on this condition variable for a notification, timing out after a
366    /// specified duration.
367    ///
368    /// The semantics of this function are equivalent to `wait()` except that
369    /// the thread will be blocked for roughly no longer than `timeout`. This
370    /// method should not be used for precise timing due to anomalies such as
371    /// preemption or platform differences that may not cause the maximum
372    /// amount of time waited to be precisely `timeout`.
373    ///
374    /// Note that the best effort is made to ensure that the time waited is
375    /// measured with a monotonic clock, and not affected by the changes made to
376    /// the system time.
377    ///
378    /// The returned `WaitTimeoutResult` value indicates if the timeout is
379    /// known to have elapsed.
380    ///
381    /// Like `wait`, the lock specified will be re-acquired when this function
382    /// returns, regardless of whether the timeout elapsed or not.
383    #[inline]
384    pub fn wait_for<T: ?Sized>(
385        &self,
386        mutex_guard: &mut MutexGuard<'_, T>,
387        timeout: Duration,
388    ) -> WaitTimeoutResult {
389        let deadline = util::to_deadline(timeout);
390        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
391    }
392}
393
394impl Default for Condvar {
395    #[inline]
396    fn default() -> Condvar {
397        Condvar::new()
398    }
399}
400
401impl fmt::Debug for Condvar {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.pad("Condvar { .. }")
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    use crate::{Condvar, Mutex, MutexGuard};
410    use std::sync::mpsc::channel;
411    use std::sync::Arc;
412    use std::thread;
413    use std::time::Duration;
414    use std::time::Instant;
415
416    #[test]
417    fn smoke() {
418        let c = Condvar::new();
419        c.notify_one();
420        c.notify_all();
421    }
422
423    #[test]
424    fn notify_one() {
425        let m = Arc::new(Mutex::new(()));
426        let m2 = m.clone();
427        let c = Arc::new(Condvar::new());
428        let c2 = c.clone();
429
430        let mut g = m.lock();
431        let _t = thread::spawn(move || {
432            let _g = m2.lock();
433            c2.notify_one();
434        });
435        c.wait(&mut g);
436    }
437
438    #[test]
439    fn notify_all() {
440        const N: usize = 10;
441
442        let data = Arc::new((Mutex::new(0), Condvar::new()));
443        let (tx, rx) = channel();
444        for _ in 0..N {
445            let data = data.clone();
446            let tx = tx.clone();
447            thread::spawn(move || {
448                let &(ref lock, ref cond) = &*data;
449                let mut cnt = lock.lock();
450                *cnt += 1;
451                if *cnt == N {
452                    tx.send(()).unwrap();
453                }
454                while *cnt != 0 {
455                    cond.wait(&mut cnt);
456                }
457                tx.send(()).unwrap();
458            });
459        }
460        drop(tx);
461
462        let &(ref lock, ref cond) = &*data;
463        rx.recv().unwrap();
464        let mut cnt = lock.lock();
465        *cnt = 0;
466        cond.notify_all();
467        drop(cnt);
468
469        for _ in 0..N {
470            rx.recv().unwrap();
471        }
472    }
473
474    #[test]
475    fn notify_one_return_true() {
476        let m = Arc::new(Mutex::new(()));
477        let m2 = m.clone();
478        let c = Arc::new(Condvar::new());
479        let c2 = c.clone();
480
481        let mut g = m.lock();
482        let _t = thread::spawn(move || {
483            let _g = m2.lock();
484            assert!(c2.notify_one());
485        });
486        c.wait(&mut g);
487    }
488
489    #[test]
490    fn notify_one_return_false() {
491        let m = Arc::new(Mutex::new(()));
492        let c = Arc::new(Condvar::new());
493
494        let _t = thread::spawn(move || {
495            let _g = m.lock();
496            assert!(!c.notify_one());
497        });
498    }
499
500    #[test]
501    fn notify_all_return() {
502        const N: usize = 10;
503
504        let data = Arc::new((Mutex::new(0), Condvar::new()));
505        let (tx, rx) = channel();
506        for _ in 0..N {
507            let data = data.clone();
508            let tx = tx.clone();
509            thread::spawn(move || {
510                let &(ref lock, ref cond) = &*data;
511                let mut cnt = lock.lock();
512                *cnt += 1;
513                if *cnt == N {
514                    tx.send(()).unwrap();
515                }
516                while *cnt != 0 {
517                    cond.wait(&mut cnt);
518                }
519                tx.send(()).unwrap();
520            });
521        }
522        drop(tx);
523
524        let &(ref lock, ref cond) = &*data;
525        rx.recv().unwrap();
526        let mut cnt = lock.lock();
527        *cnt = 0;
528        assert_eq!(cond.notify_all(), N);
529        drop(cnt);
530
531        for _ in 0..N {
532            rx.recv().unwrap();
533        }
534
535        assert_eq!(cond.notify_all(), 0);
536    }
537
538    #[test]
539    fn wait_for() {
540        let m = Arc::new(Mutex::new(()));
541        let m2 = m.clone();
542        let c = Arc::new(Condvar::new());
543        let c2 = c.clone();
544
545        let mut g = m.lock();
546        let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
547        assert!(no_timeout.timed_out());
548
549        let _t = thread::spawn(move || {
550            let _g = m2.lock();
551            c2.notify_one();
552        });
553        let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
554        assert!(!timeout_res.timed_out());
555
556        drop(g);
557    }
558
559    #[test]
560    fn wait_until() {
561        let m = Arc::new(Mutex::new(()));
562        let m2 = m.clone();
563        let c = Arc::new(Condvar::new());
564        let c2 = c.clone();
565
566        let mut g = m.lock();
567        let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
568        assert!(no_timeout.timed_out());
569        let _t = thread::spawn(move || {
570            let _g = m2.lock();
571            c2.notify_one();
572        });
573        let timeout_res = c.wait_until(
574            &mut g,
575            Instant::now() + Duration::from_millis(u32::max_value() as u64),
576        );
577        assert!(!timeout_res.timed_out());
578        drop(g);
579    }
580
581    #[test]
582    #[should_panic]
583    fn two_mutexes() {
584        let m = Arc::new(Mutex::new(()));
585        let m2 = m.clone();
586        let m3 = Arc::new(Mutex::new(()));
587        let c = Arc::new(Condvar::new());
588        let c2 = c.clone();
589
590        // Make sure we don't leave the child thread dangling
591        struct PanicGuard<'a>(&'a Condvar);
592        impl<'a> Drop for PanicGuard<'a> {
593            fn drop(&mut self) {
594                self.0.notify_one();
595            }
596        }
597
598        let (tx, rx) = channel();
599        let g = m.lock();
600        let _t = thread::spawn(move || {
601            let mut g = m2.lock();
602            tx.send(()).unwrap();
603            c2.wait(&mut g);
604        });
605        drop(g);
606        rx.recv().unwrap();
607        let _g = m.lock();
608        let _guard = PanicGuard(&*c);
609        c.wait(&mut m3.lock());
610    }
611
612    #[test]
613    fn two_mutexes_disjoint() {
614        let m = Arc::new(Mutex::new(()));
615        let m2 = m.clone();
616        let m3 = Arc::new(Mutex::new(()));
617        let c = Arc::new(Condvar::new());
618        let c2 = c.clone();
619
620        let mut g = m.lock();
621        let _t = thread::spawn(move || {
622            let _g = m2.lock();
623            c2.notify_one();
624        });
625        c.wait(&mut g);
626        drop(g);
627
628        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
629    }
630
631    #[test]
632    fn test_debug_condvar() {
633        let c = Condvar::new();
634        assert_eq!(format!("{:?}", c), "Condvar { .. }");
635    }
636
637    #[test]
638    fn test_condvar_requeue() {
639        let m = Arc::new(Mutex::new(()));
640        let m2 = m.clone();
641        let c = Arc::new(Condvar::new());
642        let c2 = c.clone();
643        let t = thread::spawn(move || {
644            let mut g = m2.lock();
645            c2.wait(&mut g);
646        });
647
648        let mut g = m.lock();
649        while !c.notify_one() {
650            // Wait for the thread to get into wait()
651            MutexGuard::bump(&mut g);
652            // Yield, so the other thread gets a chance to do something.
653            // (At least Miri needs this, because it doesn't preempt threads.)
654            thread::yield_now();
655        }
656        // The thread should have been requeued to the mutex, which we wake up now.
657        drop(g);
658        t.join().unwrap();
659    }
660
661    #[test]
662    fn test_issue_129() {
663        let locks = Arc::new((Mutex::new(()), Condvar::new()));
664
665        let (tx, rx) = channel();
666        for _ in 0..4 {
667            let locks = locks.clone();
668            let tx = tx.clone();
669            thread::spawn(move || {
670                let mut guard = locks.0.lock();
671                locks.1.wait(&mut guard);
672                locks.1.wait_for(&mut guard, Duration::from_millis(1));
673                locks.1.notify_one();
674                tx.send(()).unwrap();
675            });
676        }
677
678        thread::sleep(Duration::from_millis(100));
679        locks.1.notify_one();
680
681        for _ in 0..4 {
682            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
683        }
684    }
685}
686
687/// This module contains an integration test that is heavily inspired from WebKit's own integration
688/// tests for it's own Condvar.
689#[cfg(test)]
690mod webkit_queue_test {
691    use crate::{Condvar, Mutex, MutexGuard};
692    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
693
694    #[derive(Clone, Copy)]
695    enum Timeout {
696        Bounded(Duration),
697        Forever,
698    }
699
700    #[derive(Clone, Copy)]
701    enum NotifyStyle {
702        One,
703        All,
704    }
705
706    struct Queue {
707        items: VecDeque<usize>,
708        should_continue: bool,
709    }
710
711    impl Queue {
712        fn new() -> Self {
713            Self {
714                items: VecDeque::new(),
715                should_continue: true,
716            }
717        }
718    }
719
720    fn wait<T: ?Sized>(
721        condition: &Condvar,
722        lock: &mut MutexGuard<'_, T>,
723        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
724        timeout: &Timeout,
725    ) {
726        while !predicate(lock) {
727            match timeout {
728                Timeout::Forever => condition.wait(lock),
729                Timeout::Bounded(bound) => {
730                    condition.wait_for(lock, *bound);
731                }
732            }
733        }
734    }
735
736    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
737        match style {
738            NotifyStyle::One => {
739                condition.notify_one();
740            }
741            NotifyStyle::All => {
742                if should_notify {
743                    condition.notify_all();
744                }
745            }
746        }
747    }
748
749    fn run_queue_test(
750        num_producers: usize,
751        num_consumers: usize,
752        max_queue_size: usize,
753        messages_per_producer: usize,
754        notify_style: NotifyStyle,
755        timeout: Timeout,
756        delay: Duration,
757    ) {
758        let input_queue = Arc::new(Mutex::new(Queue::new()));
759        let empty_condition = Arc::new(Condvar::new());
760        let full_condition = Arc::new(Condvar::new());
761
762        let output_vec = Arc::new(Mutex::new(vec![]));
763
764        let consumers = (0..num_consumers)
765            .map(|_| {
766                consumer_thread(
767                    input_queue.clone(),
768                    empty_condition.clone(),
769                    full_condition.clone(),
770                    timeout,
771                    notify_style,
772                    output_vec.clone(),
773                    max_queue_size,
774                )
775            })
776            .collect::<Vec<_>>();
777        let producers = (0..num_producers)
778            .map(|_| {
779                producer_thread(
780                    messages_per_producer,
781                    input_queue.clone(),
782                    empty_condition.clone(),
783                    full_condition.clone(),
784                    timeout,
785                    notify_style,
786                    max_queue_size,
787                )
788            })
789            .collect::<Vec<_>>();
790
791        thread::sleep(delay);
792
793        for producer in producers.into_iter() {
794            producer.join().expect("Producer thread panicked");
795        }
796
797        {
798            let mut input_queue = input_queue.lock();
799            input_queue.should_continue = false;
800        }
801        empty_condition.notify_all();
802
803        for consumer in consumers.into_iter() {
804            consumer.join().expect("Consumer thread panicked");
805        }
806
807        let mut output_vec = output_vec.lock();
808        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
809        output_vec.sort();
810        for msg_idx in 0..messages_per_producer {
811            for producer_idx in 0..num_producers {
812                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
813            }
814        }
815    }
816
817    fn consumer_thread(
818        input_queue: Arc<Mutex<Queue>>,
819        empty_condition: Arc<Condvar>,
820        full_condition: Arc<Condvar>,
821        timeout: Timeout,
822        notify_style: NotifyStyle,
823        output_queue: Arc<Mutex<Vec<usize>>>,
824        max_queue_size: usize,
825    ) -> thread::JoinHandle<()> {
826        thread::spawn(move || loop {
827            let (should_notify, result) = {
828                let mut queue = input_queue.lock();
829                wait(
830                    &*empty_condition,
831                    &mut queue,
832                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
833                    &timeout,
834                );
835                if queue.items.is_empty() && !queue.should_continue {
836                    return;
837                }
838                let should_notify = queue.items.len() == max_queue_size;
839                let result = queue.items.pop_front();
840                std::mem::drop(queue);
841                (should_notify, result)
842            };
843            notify(notify_style, &*full_condition, should_notify);
844
845            if let Some(result) = result {
846                output_queue.lock().push(result);
847            }
848        })
849    }
850
851    fn producer_thread(
852        num_messages: usize,
853        queue: Arc<Mutex<Queue>>,
854        empty_condition: Arc<Condvar>,
855        full_condition: Arc<Condvar>,
856        timeout: Timeout,
857        notify_style: NotifyStyle,
858        max_queue_size: usize,
859    ) -> thread::JoinHandle<()> {
860        thread::spawn(move || {
861            for message in 0..num_messages {
862                let should_notify = {
863                    let mut queue = queue.lock();
864                    wait(
865                        &*full_condition,
866                        &mut queue,
867                        |state| state.items.len() < max_queue_size,
868                        &timeout,
869                    );
870                    let should_notify = queue.items.is_empty();
871                    queue.items.push_back(message);
872                    std::mem::drop(queue);
873                    should_notify
874                };
875                notify(notify_style, &*empty_condition, should_notify);
876            }
877        })
878    }
879
880    macro_rules! run_queue_tests {
881        ( $( $name:ident(
882            num_producers: $num_producers:expr,
883            num_consumers: $num_consumers:expr,
884            max_queue_size: $max_queue_size:expr,
885            messages_per_producer: $messages_per_producer:expr,
886            notification_style: $notification_style:expr,
887            timeout: $timeout:expr,
888            delay_seconds: $delay_seconds:expr);
889        )* ) => {
890            $(#[test]
891            fn $name() {
892                let delay = Duration::from_secs($delay_seconds);
893                run_queue_test(
894                    $num_producers,
895                    $num_consumers,
896                    $max_queue_size,
897                    $messages_per_producer,
898                    $notification_style,
899                    $timeout,
900                    delay,
901                    );
902            })*
903        };
904    }
905
906    run_queue_tests! {
907        sanity_check_queue(
908            num_producers: 1,
909            num_consumers: 1,
910            max_queue_size: 1,
911            messages_per_producer: 100_000,
912            notification_style: NotifyStyle::All,
913            timeout: Timeout::Bounded(Duration::from_secs(1)),
914            delay_seconds: 0
915        );
916        sanity_check_queue_timeout(
917            num_producers: 1,
918            num_consumers: 1,
919            max_queue_size: 1,
920            messages_per_producer: 100_000,
921            notification_style: NotifyStyle::All,
922            timeout: Timeout::Forever,
923            delay_seconds: 0
924        );
925        new_test_without_timeout_5(
926            num_producers: 1,
927            num_consumers: 5,
928            max_queue_size: 1,
929            messages_per_producer: 100_000,
930            notification_style: NotifyStyle::All,
931            timeout: Timeout::Forever,
932            delay_seconds: 0
933        );
934        one_producer_one_consumer_one_slot(
935            num_producers: 1,
936            num_consumers: 1,
937            max_queue_size: 1,
938            messages_per_producer: 100_000,
939            notification_style: NotifyStyle::All,
940            timeout: Timeout::Forever,
941            delay_seconds: 0
942        );
943        one_producer_one_consumer_one_slot_timeout(
944            num_producers: 1,
945            num_consumers: 1,
946            max_queue_size: 1,
947            messages_per_producer: 100_000,
948            notification_style: NotifyStyle::All,
949            timeout: Timeout::Forever,
950            delay_seconds: 1
951        );
952        one_producer_one_consumer_hundred_slots(
953            num_producers: 1,
954            num_consumers: 1,
955            max_queue_size: 100,
956            messages_per_producer: 1_000_000,
957            notification_style: NotifyStyle::All,
958            timeout: Timeout::Forever,
959            delay_seconds: 0
960        );
961        ten_producers_one_consumer_one_slot(
962            num_producers: 10,
963            num_consumers: 1,
964            max_queue_size: 1,
965            messages_per_producer: 10000,
966            notification_style: NotifyStyle::All,
967            timeout: Timeout::Forever,
968            delay_seconds: 0
969        );
970        ten_producers_one_consumer_hundred_slots_notify_all(
971            num_producers: 10,
972            num_consumers: 1,
973            max_queue_size: 100,
974            messages_per_producer: 10000,
975            notification_style: NotifyStyle::All,
976            timeout: Timeout::Forever,
977            delay_seconds: 0
978        );
979        ten_producers_one_consumer_hundred_slots_notify_one(
980            num_producers: 10,
981            num_consumers: 1,
982            max_queue_size: 100,
983            messages_per_producer: 10000,
984            notification_style: NotifyStyle::One,
985            timeout: Timeout::Forever,
986            delay_seconds: 0
987        );
988        one_producer_ten_consumers_one_slot(
989            num_producers: 1,
990            num_consumers: 10,
991            max_queue_size: 1,
992            messages_per_producer: 10000,
993            notification_style: NotifyStyle::All,
994            timeout: Timeout::Forever,
995            delay_seconds: 0
996        );
997        one_producer_ten_consumers_hundred_slots_notify_all(
998            num_producers: 1,
999            num_consumers: 10,
1000            max_queue_size: 100,
1001            messages_per_producer: 100_000,
1002            notification_style: NotifyStyle::All,
1003            timeout: Timeout::Forever,
1004            delay_seconds: 0
1005        );
1006        one_producer_ten_consumers_hundred_slots_notify_one(
1007            num_producers: 1,
1008            num_consumers: 10,
1009            max_queue_size: 100,
1010            messages_per_producer: 100_000,
1011            notification_style: NotifyStyle::One,
1012            timeout: Timeout::Forever,
1013            delay_seconds: 0
1014        );
1015        ten_producers_ten_consumers_one_slot(
1016            num_producers: 10,
1017            num_consumers: 10,
1018            max_queue_size: 1,
1019            messages_per_producer: 50000,
1020            notification_style: NotifyStyle::All,
1021            timeout: Timeout::Forever,
1022            delay_seconds: 0
1023        );
1024        ten_producers_ten_consumers_hundred_slots_notify_all(
1025            num_producers: 10,
1026            num_consumers: 10,
1027            max_queue_size: 100,
1028            messages_per_producer: 50000,
1029            notification_style: NotifyStyle::All,
1030            timeout: Timeout::Forever,
1031            delay_seconds: 0
1032        );
1033        ten_producers_ten_consumers_hundred_slots_notify_one(
1034            num_producers: 10,
1035            num_consumers: 10,
1036            max_queue_size: 100,
1037            messages_per_producer: 50000,
1038            notification_style: NotifyStyle::One,
1039            timeout: Timeout::Forever,
1040            delay_seconds: 0
1041        );
1042    }
1043}