parking_lot_core/
parking_lot.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
8use crate::util::UncheckedOptionExt;
9use crate::word_lock::WordLock;
10use core::{
11    cell::{Cell, UnsafeCell},
12    ptr,
13    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
14};
15use smallvec::SmallVec;
16use std::time::{Duration, Instant};
17
18// Don't use Instant on wasm32-unknown-unknown, it just panics.
19cfg_if::cfg_if! {
20    if #[cfg(all(
21        target_family = "wasm",
22        target_os = "unknown",
23        target_vendor = "unknown"
24    ))] {
25        #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
26        struct TimeoutInstant;
27        impl TimeoutInstant {
28            fn now() -> TimeoutInstant {
29                TimeoutInstant
30            }
31        }
32        impl core::ops::Add<Duration> for TimeoutInstant {
33            type Output = Self;
34            fn add(self, _rhs: Duration) -> Self::Output {
35                TimeoutInstant
36            }
37        }
38    } else {
39        use std::time::Instant as TimeoutInstant;
40    }
41}
42
43static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
44
45/// Holds the pointer to the currently active `HashTable`.
46///
47/// # Safety
48///
49/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
50/// Any `HashTable` this global static has ever pointed to must never be freed.
51static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
52
53// Even with 3x more buckets than threads, the memory overhead per thread is
54// still only a few hundred bytes per thread.
55const LOAD_FACTOR: usize = 3;
56
57struct HashTable {
58    // Hash buckets for the table
59    entries: Box<[Bucket]>,
60
61    // Number of bits used for the hash function
62    hash_bits: u32,
63
64    // Previous table. This is only kept to keep leak detectors happy.
65    _prev: *const HashTable,
66}
67
68impl HashTable {
69    #[inline]
70    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
71        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
72        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
73
74        let now = TimeoutInstant::now();
75        let mut entries = Vec::with_capacity(new_size);
76        for i in 0..new_size {
77            // We must ensure the seed is not zero
78            entries.push(Bucket::new(now, i as u32 + 1));
79        }
80
81        Box::new(HashTable {
82            entries: entries.into_boxed_slice(),
83            hash_bits,
84            _prev: prev,
85        })
86    }
87}
88
89#[repr(align(64))]
90struct Bucket {
91    // Lock protecting the queue
92    mutex: WordLock,
93
94    // Linked list of threads waiting on this bucket
95    queue_head: Cell<*const ThreadData>,
96    queue_tail: Cell<*const ThreadData>,
97
98    // Next time at which point be_fair should be set
99    fair_timeout: UnsafeCell<FairTimeout>,
100}
101
102impl Bucket {
103    #[inline]
104    pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
105        Self {
106            mutex: WordLock::new(),
107            queue_head: Cell::new(ptr::null()),
108            queue_tail: Cell::new(ptr::null()),
109            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
110        }
111    }
112}
113
114struct FairTimeout {
115    // Next time at which point be_fair should be set
116    timeout: TimeoutInstant,
117
118    // the PRNG state for calculating the next timeout
119    seed: u32,
120}
121
122impl FairTimeout {
123    #[inline]
124    fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
125        FairTimeout { timeout, seed }
126    }
127
128    // Determine whether we should force a fair unlock, and update the timeout
129    #[inline]
130    fn should_timeout(&mut self) -> bool {
131        let now = TimeoutInstant::now();
132        if now > self.timeout {
133            // Time between 0 and 1ms.
134            let nanos = self.gen_u32() % 1_000_000;
135            self.timeout = now + Duration::new(0, nanos);
136            true
137        } else {
138            false
139        }
140    }
141
142    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
143    fn gen_u32(&mut self) -> u32 {
144        self.seed ^= self.seed << 13;
145        self.seed ^= self.seed >> 17;
146        self.seed ^= self.seed << 5;
147        self.seed
148    }
149}
150
151struct ThreadData {
152    parker: ThreadParker,
153
154    // Key that this thread is sleeping on. This may change if the thread is
155    // requeued to a different key.
156    key: AtomicUsize,
157
158    // Linked list of parked threads in a bucket
159    next_in_queue: Cell<*const ThreadData>,
160
161    // UnparkToken passed to this thread when it is unparked
162    unpark_token: Cell<UnparkToken>,
163
164    // ParkToken value set by the thread when it was parked
165    park_token: Cell<ParkToken>,
166
167    // Is the thread parked with a timeout?
168    parked_with_timeout: Cell<bool>,
169
170    // Extra data for deadlock detection
171    #[cfg(feature = "deadlock_detection")]
172    deadlock_data: deadlock::DeadlockData,
173}
174
175impl ThreadData {
176    fn new() -> ThreadData {
177        // Keep track of the total number of live ThreadData objects and resize
178        // the hash table accordingly.
179        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
180        grow_hashtable(num_threads);
181
182        ThreadData {
183            parker: ThreadParker::new(),
184            key: AtomicUsize::new(0),
185            next_in_queue: Cell::new(ptr::null()),
186            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
187            park_token: Cell::new(DEFAULT_PARK_TOKEN),
188            parked_with_timeout: Cell::new(false),
189            #[cfg(feature = "deadlock_detection")]
190            deadlock_data: deadlock::DeadlockData::new(),
191        }
192    }
193}
194
195// Invokes the given closure with a reference to the current thread `ThreadData`.
196#[inline(always)]
197fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
198    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
199    // to construct. Try to use a thread-local version if possible. Otherwise just
200    // create a ThreadData on the stack
201    let mut thread_data_storage = None;
202    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
203    let thread_data_ptr = THREAD_DATA
204        .try_with(|x| x as *const ThreadData)
205        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));
206
207    f(unsafe { &*thread_data_ptr })
208}
209
210impl Drop for ThreadData {
211    fn drop(&mut self) {
212        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
213    }
214}
215
216/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
217/// The reference is valid forever. However, the `HashTable` it references might become stale
218/// at any point. Meaning it still exists, but it is not the instance in active use.
219#[inline]
220fn get_hashtable() -> &'static HashTable {
221    let table = HASHTABLE.load(Ordering::Acquire);
222
223    // If there is no table, create one
224    if table.is_null() {
225        create_hashtable()
226    } else {
227        // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
228        unsafe { &*table }
229    }
230}
231
232/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
233/// The reference is valid forever. However, the `HashTable` it references might become stale
234/// at any point. Meaning it still exists, but it is not the instance in active use.
235#[cold]
236fn create_hashtable() -> &'static HashTable {
237    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
238
239    // If this fails then it means some other thread created the hash table first.
240    let table = match HASHTABLE.compare_exchange(
241        ptr::null_mut(),
242        new_table,
243        Ordering::AcqRel,
244        Ordering::Acquire,
245    ) {
246        Ok(_) => new_table,
247        Err(old_table) => {
248            // Free the table we created
249            // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
250            unsafe {
251                let _ = Box::from_raw(new_table);
252            }
253            old_table
254        }
255    };
256    // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
257    // created here, or it is one loaded from `HASHTABLE`.
258    unsafe { &*table }
259}
260
261// Grow the hash table so that it is big enough for the given number of threads.
262// This isn't performance-critical since it is only done when a ThreadData is
263// created, which only happens once per thread.
264fn grow_hashtable(num_threads: usize) {
265    // Lock all buckets in the existing table and get a reference to it
266    let old_table = loop {
267        let table = get_hashtable();
268
269        // Check if we need to resize the existing table
270        if table.entries.len() >= LOAD_FACTOR * num_threads {
271            return;
272        }
273
274        // Lock all buckets in the old table
275        for bucket in &table.entries[..] {
276            bucket.mutex.lock();
277        }
278
279        // Now check if our table is still the latest one. Another thread could
280        // have grown the hash table between us reading HASHTABLE and locking
281        // the buckets.
282        if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
283            break table;
284        }
285
286        // Unlock buckets and try again
287        for bucket in &table.entries[..] {
288            // SAFETY: We hold the lock here, as required
289            unsafe { bucket.mutex.unlock() };
290        }
291    };
292
293    // Create the new table
294    let mut new_table = HashTable::new(num_threads, old_table);
295
296    // Move the entries from the old table to the new one
297    for bucket in &old_table.entries[..] {
298        // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
299        // lists. All `ThreadData` instances in these lists will remain valid as long as they are
300        // present in the lists, meaning as long as their threads are parked.
301        unsafe { rehash_bucket_into(bucket, &mut new_table) };
302    }
303
304    // Publish the new table. No races are possible at this point because
305    // any other thread trying to grow the hash table is blocked on the bucket
306    // locks in the old table.
307    HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
308
309    // Unlock all buckets in the old table
310    for bucket in &old_table.entries[..] {
311        // SAFETY: We hold the lock here, as required
312        unsafe { bucket.mutex.unlock() };
313    }
314}
315
316/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
317/// in the bucket their key correspond to for this table.
318///
319/// # Safety
320///
321/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
322/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
323///
324/// The given `table` must only contain buckets with correctly constructed linked lists.
325unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
326    let mut current: *const ThreadData = bucket.queue_head.get();
327    while !current.is_null() {
328        let next = (*current).next_in_queue.get();
329        let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
330        if table.entries[hash].queue_tail.get().is_null() {
331            table.entries[hash].queue_head.set(current);
332        } else {
333            (*table.entries[hash].queue_tail.get())
334                .next_in_queue
335                .set(current);
336        }
337        table.entries[hash].queue_tail.set(current);
338        (*current).next_in_queue.set(ptr::null());
339        current = next;
340    }
341}
342
343// Hash function for addresses
344#[cfg(target_pointer_width = "32")]
345#[inline]
346fn hash(key: usize, bits: u32) -> usize {
347    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
348}
349#[cfg(target_pointer_width = "64")]
350#[inline]
351fn hash(key: usize, bits: u32) -> usize {
352    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
353}
354
355/// Locks the bucket for the given key and returns a reference to it.
356/// The returned bucket must be unlocked again in order to not cause deadlocks.
357#[inline]
358fn lock_bucket(key: usize) -> &'static Bucket {
359    loop {
360        let hashtable = get_hashtable();
361
362        let hash = hash(key, hashtable.hash_bits);
363        let bucket = &hashtable.entries[hash];
364
365        // Lock the bucket
366        bucket.mutex.lock();
367
368        // If no other thread has rehashed the table before we grabbed the lock
369        // then we are good to go! The lock we grabbed prevents any rehashes.
370        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
371            return bucket;
372        }
373
374        // Unlock the bucket and try again
375        // SAFETY: We hold the lock here, as required
376        unsafe { bucket.mutex.unlock() };
377    }
378}
379
380/// Locks the bucket for the given key and returns a reference to it. But checks that the key
381/// hasn't been changed in the meantime due to a requeue.
382/// The returned bucket must be unlocked again in order to not cause deadlocks.
383#[inline]
384fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
385    loop {
386        let hashtable = get_hashtable();
387        let current_key = key.load(Ordering::Relaxed);
388
389        let hash = hash(current_key, hashtable.hash_bits);
390        let bucket = &hashtable.entries[hash];
391
392        // Lock the bucket
393        bucket.mutex.lock();
394
395        // Check that both the hash table and key are correct while the bucket
396        // is locked. Note that the key can't change once we locked the proper
397        // bucket for it, so we just keep trying until we have the correct key.
398        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
399            && key.load(Ordering::Relaxed) == current_key
400        {
401            return (current_key, bucket);
402        }
403
404        // Unlock the bucket and try again
405        // SAFETY: We hold the lock here, as required
406        unsafe { bucket.mutex.unlock() };
407    }
408}
409
410/// Locks the two buckets for the given pair of keys and returns references to them.
411/// The returned buckets must be unlocked again in order to not cause deadlocks.
412///
413/// If both keys hash to the same value, both returned references will be to the same bucket. Be
414/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
415#[inline]
416fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
417    loop {
418        let hashtable = get_hashtable();
419
420        let hash1 = hash(key1, hashtable.hash_bits);
421        let hash2 = hash(key2, hashtable.hash_bits);
422
423        // Get the bucket at the lowest hash/index first
424        let bucket1 = if hash1 <= hash2 {
425            &hashtable.entries[hash1]
426        } else {
427            &hashtable.entries[hash2]
428        };
429
430        // Lock the first bucket
431        bucket1.mutex.lock();
432
433        // If no other thread has rehashed the table before we grabbed the lock
434        // then we are good to go! The lock we grabbed prevents any rehashes.
435        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
436            // Now lock the second bucket and return the two buckets
437            if hash1 == hash2 {
438                return (bucket1, bucket1);
439            } else if hash1 < hash2 {
440                let bucket2 = &hashtable.entries[hash2];
441                bucket2.mutex.lock();
442                return (bucket1, bucket2);
443            } else {
444                let bucket2 = &hashtable.entries[hash1];
445                bucket2.mutex.lock();
446                return (bucket2, bucket1);
447            }
448        }
449
450        // Unlock the bucket and try again
451        // SAFETY: We hold the lock here, as required
452        unsafe { bucket1.mutex.unlock() };
453    }
454}
455
456/// Unlock a pair of buckets
457///
458/// # Safety
459///
460/// Both buckets must be locked
461#[inline]
462unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
463    bucket1.mutex.unlock();
464    if !ptr::eq(bucket1, bucket2) {
465        bucket2.mutex.unlock();
466    }
467}
468
469/// Result of a park operation.
470#[derive(Copy, Clone, Eq, PartialEq, Debug)]
471pub enum ParkResult {
472    /// We were unparked by another thread with the given token.
473    Unparked(UnparkToken),
474
475    /// The validation callback returned false.
476    Invalid,
477
478    /// The timeout expired.
479    TimedOut,
480}
481
482impl ParkResult {
483    /// Returns true if we were unparked by another thread.
484    #[inline]
485    pub fn is_unparked(self) -> bool {
486        if let ParkResult::Unparked(_) = self {
487            true
488        } else {
489            false
490        }
491    }
492}
493
494/// Result of an unpark operation.
495#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
496pub struct UnparkResult {
497    /// The number of threads that were unparked.
498    pub unparked_threads: usize,
499
500    /// The number of threads that were requeued.
501    pub requeued_threads: usize,
502
503    /// Whether there are any threads remaining in the queue. This only returns
504    /// true if a thread was unparked.
505    pub have_more_threads: bool,
506
507    /// This is set to true on average once every 0.5ms for any given key. It
508    /// should be used to switch to a fair unlocking mechanism for a particular
509    /// unlock.
510    pub be_fair: bool,
511
512    /// Private field so new fields can be added without breakage.
513    _sealed: (),
514}
515
516/// Operation that `unpark_requeue` should perform.
517#[derive(Copy, Clone, Eq, PartialEq, Debug)]
518pub enum RequeueOp {
519    /// Abort the operation without doing anything.
520    Abort,
521
522    /// Unpark one thread and requeue the rest onto the target queue.
523    UnparkOneRequeueRest,
524
525    /// Requeue all threads onto the target queue.
526    RequeueAll,
527
528    /// Unpark one thread and leave the rest parked. No requeuing is done.
529    UnparkOne,
530
531    /// Requeue one thread and leave the rest parked on the original queue.
532    RequeueOne,
533}
534
535/// Operation that `unpark_filter` should perform for each thread.
536#[derive(Copy, Clone, Eq, PartialEq, Debug)]
537pub enum FilterOp {
538    /// Unpark the thread and continue scanning the list of parked threads.
539    Unpark,
540
541    /// Don't unpark the thread and continue scanning the list of parked threads.
542    Skip,
543
544    /// Don't unpark the thread and stop scanning the list of parked threads.
545    Stop,
546}
547
548/// A value which is passed from an unparker to a parked thread.
549#[derive(Copy, Clone, Eq, PartialEq, Debug)]
550pub struct UnparkToken(pub usize);
551
552/// A value associated with a parked thread which can be used by `unpark_filter`.
553#[derive(Copy, Clone, Eq, PartialEq, Debug)]
554pub struct ParkToken(pub usize);
555
556/// A default unpark token to use.
557pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
558
559/// A default park token to use.
560pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
561
562/// Parks the current thread in the queue associated with the given key.
563///
564/// The `validate` function is called while the queue is locked and can abort
565/// the operation by returning false. If `validate` returns true then the
566/// current thread is appended to the queue and the queue is unlocked.
567///
568/// The `before_sleep` function is called after the queue is unlocked but before
569/// the thread is put to sleep. The thread will then sleep until it is unparked
570/// or the given timeout is reached.
571///
572/// The `timed_out` function is also called while the queue is locked, but only
573/// if the timeout was reached. It is passed the key of the queue it was in when
574/// it timed out, which may be different from the original key if
575/// `unpark_requeue` was called. It is also passed a bool which indicates
576/// whether it was the last thread in the queue.
577///
578/// # Safety
579///
580/// You should only call this function with an address that you control, since
581/// you could otherwise interfere with the operation of other synchronization
582/// primitives.
583///
584/// The `validate` and `timed_out` functions are called while the queue is
585/// locked and must not panic or call into any function in `parking_lot`.
586///
587/// The `before_sleep` function is called outside the queue lock and is allowed
588/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
589/// it is not allowed to call `park` or panic.
590#[inline]
591pub unsafe fn park(
592    key: usize,
593    validate: impl FnOnce() -> bool,
594    before_sleep: impl FnOnce(),
595    timed_out: impl FnOnce(usize, bool),
596    park_token: ParkToken,
597    timeout: Option<Instant>,
598) -> ParkResult {
599    // Grab our thread data, this also ensures that the hash table exists
600    with_thread_data(|thread_data| {
601        // Lock the bucket for the given key
602        let bucket = lock_bucket(key);
603
604        // If the validation function fails, just return
605        if !validate() {
606            // SAFETY: We hold the lock here, as required
607            bucket.mutex.unlock();
608            return ParkResult::Invalid;
609        }
610
611        // Append our thread data to the queue and unlock the bucket
612        thread_data.parked_with_timeout.set(timeout.is_some());
613        thread_data.next_in_queue.set(ptr::null());
614        thread_data.key.store(key, Ordering::Relaxed);
615        thread_data.park_token.set(park_token);
616        thread_data.parker.prepare_park();
617        if !bucket.queue_head.get().is_null() {
618            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
619        } else {
620            bucket.queue_head.set(thread_data);
621        }
622        bucket.queue_tail.set(thread_data);
623        // SAFETY: We hold the lock here, as required
624        bucket.mutex.unlock();
625
626        // Invoke the pre-sleep callback
627        before_sleep();
628
629        // Park our thread and determine whether we were woken up by an unpark
630        // or by our timeout. Note that this isn't precise: we can still be
631        // unparked since we are still in the queue.
632        let unparked = match timeout {
633            Some(timeout) => thread_data.parker.park_until(timeout),
634            None => {
635                thread_data.parker.park();
636                // call deadlock detection on_unpark hook
637                deadlock::on_unpark(thread_data);
638                true
639            }
640        };
641
642        // If we were unparked, return now
643        if unparked {
644            return ParkResult::Unparked(thread_data.unpark_token.get());
645        }
646
647        // Lock our bucket again. Note that the hashtable may have been rehashed in
648        // the meantime. Our key may also have changed if we were requeued.
649        let (key, bucket) = lock_bucket_checked(&thread_data.key);
650
651        // Now we need to check again if we were unparked or timed out. Unlike the
652        // last check this is precise because we hold the bucket lock.
653        if !thread_data.parker.timed_out() {
654            // SAFETY: We hold the lock here, as required
655            bucket.mutex.unlock();
656            return ParkResult::Unparked(thread_data.unpark_token.get());
657        }
658
659        // We timed out, so we now need to remove our thread from the queue
660        let mut link = &bucket.queue_head;
661        let mut current = bucket.queue_head.get();
662        let mut previous = ptr::null();
663        let mut was_last_thread = true;
664        while !current.is_null() {
665            if current == thread_data {
666                let next = (*current).next_in_queue.get();
667                link.set(next);
668                if bucket.queue_tail.get() == current {
669                    bucket.queue_tail.set(previous);
670                } else {
671                    // Scan the rest of the queue to see if there are any other
672                    // entries with the given key.
673                    let mut scan = next;
674                    while !scan.is_null() {
675                        if (*scan).key.load(Ordering::Relaxed) == key {
676                            was_last_thread = false;
677                            break;
678                        }
679                        scan = (*scan).next_in_queue.get();
680                    }
681                }
682
683                // Callback to indicate that we timed out, and whether we were the
684                // last thread on the queue.
685                timed_out(key, was_last_thread);
686                break;
687            } else {
688                if (*current).key.load(Ordering::Relaxed) == key {
689                    was_last_thread = false;
690                }
691                link = &(*current).next_in_queue;
692                previous = current;
693                current = link.get();
694            }
695        }
696
697        // There should be no way for our thread to have been removed from the queue
698        // if we timed out.
699        debug_assert!(!current.is_null());
700
701        // Unlock the bucket, we are done
702        // SAFETY: We hold the lock here, as required
703        bucket.mutex.unlock();
704        ParkResult::TimedOut
705    })
706}
707
708/// Unparks one thread from the queue associated with the given key.
709///
710/// The `callback` function is called while the queue is locked and before the
711/// target thread is woken up. The `UnparkResult` argument to the function
712/// indicates whether a thread was found in the queue and whether this was the
713/// last thread in the queue. This value is also returned by `unpark_one`.
714///
715/// The `callback` function should return an `UnparkToken` value which will be
716/// passed to the thread that is unparked. If no thread is unparked then the
717/// returned value is ignored.
718///
719/// # Safety
720///
721/// You should only call this function with an address that you control, since
722/// you could otherwise interfere with the operation of other synchronization
723/// primitives.
724///
725/// The `callback` function is called while the queue is locked and must not
726/// panic or call into any function in `parking_lot`.
727///
728/// The `parking_lot` functions are not re-entrant and calling this method
729/// from the context of an asynchronous signal handler may result in undefined
730/// behavior, including corruption of internal state and/or deadlocks.
731#[inline]
732pub unsafe fn unpark_one(
733    key: usize,
734    callback: impl FnOnce(UnparkResult) -> UnparkToken,
735) -> UnparkResult {
736    // Lock the bucket for the given key
737    let bucket = lock_bucket(key);
738
739    // Find a thread with a matching key and remove it from the queue
740    let mut link = &bucket.queue_head;
741    let mut current = bucket.queue_head.get();
742    let mut previous = ptr::null();
743    let mut result = UnparkResult::default();
744    while !current.is_null() {
745        if (*current).key.load(Ordering::Relaxed) == key {
746            // Remove the thread from the queue
747            let next = (*current).next_in_queue.get();
748            link.set(next);
749            if bucket.queue_tail.get() == current {
750                bucket.queue_tail.set(previous);
751            } else {
752                // Scan the rest of the queue to see if there are any other
753                // entries with the given key.
754                let mut scan = next;
755                while !scan.is_null() {
756                    if (*scan).key.load(Ordering::Relaxed) == key {
757                        result.have_more_threads = true;
758                        break;
759                    }
760                    scan = (*scan).next_in_queue.get();
761                }
762            }
763
764            // Invoke the callback before waking up the thread
765            result.unparked_threads = 1;
766            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
767            let token = callback(result);
768
769            // Set the token for the target thread
770            (*current).unpark_token.set(token);
771
772            // This is a bit tricky: we first lock the ThreadParker to prevent
773            // the thread from exiting and freeing its ThreadData if its wait
774            // times out. Then we unlock the queue since we don't want to keep
775            // the queue locked while we perform a system call. Finally we wake
776            // up the parked thread.
777            let handle = (*current).parker.unpark_lock();
778            // SAFETY: We hold the lock here, as required
779            bucket.mutex.unlock();
780            handle.unpark();
781
782            return result;
783        } else {
784            link = &(*current).next_in_queue;
785            previous = current;
786            current = link.get();
787        }
788    }
789
790    // No threads with a matching key were found in the bucket
791    callback(result);
792    // SAFETY: We hold the lock here, as required
793    bucket.mutex.unlock();
794    result
795}
796
797/// Unparks all threads in the queue associated with the given key.
798///
799/// The given `UnparkToken` is passed to all unparked threads.
800///
801/// This function returns the number of threads that were unparked.
802///
803/// # Safety
804///
805/// You should only call this function with an address that you control, since
806/// you could otherwise interfere with the operation of other synchronization
807/// primitives.
808///
809/// The `parking_lot` functions are not re-entrant and calling this method
810/// from the context of an asynchronous signal handler may result in undefined
811/// behavior, including corruption of internal state and/or deadlocks.
812#[inline]
813pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
814    // Lock the bucket for the given key
815    let bucket = lock_bucket(key);
816
817    // Remove all threads with the given key in the bucket
818    let mut link = &bucket.queue_head;
819    let mut current = bucket.queue_head.get();
820    let mut previous = ptr::null();
821    let mut threads = SmallVec::<[_; 8]>::new();
822    while !current.is_null() {
823        if (*current).key.load(Ordering::Relaxed) == key {
824            // Remove the thread from the queue
825            let next = (*current).next_in_queue.get();
826            link.set(next);
827            if bucket.queue_tail.get() == current {
828                bucket.queue_tail.set(previous);
829            }
830
831            // Set the token for the target thread
832            (*current).unpark_token.set(unpark_token);
833
834            // Don't wake up threads while holding the queue lock. See comment
835            // in unpark_one. For now just record which threads we need to wake
836            // up.
837            threads.push((*current).parker.unpark_lock());
838            current = next;
839        } else {
840            link = &(*current).next_in_queue;
841            previous = current;
842            current = link.get();
843        }
844    }
845
846    // Unlock the bucket
847    // SAFETY: We hold the lock here, as required
848    bucket.mutex.unlock();
849
850    // Now that we are outside the lock, wake up all the threads that we removed
851    // from the queue.
852    let num_threads = threads.len();
853    for handle in threads.into_iter() {
854        handle.unpark();
855    }
856
857    num_threads
858}
859
860/// Removes all threads from the queue associated with `key_from`, optionally
861/// unparks the first one and requeues the rest onto the queue associated with
862/// `key_to`.
863///
864/// The `validate` function is called while both queues are locked. Its return
865/// value will determine which operation is performed, or whether the operation
866/// should be aborted. See `RequeueOp` for details about the different possible
867/// return values.
868///
869/// The `callback` function is also called while both queues are locked. It is
870/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
871/// indicating whether a thread was unparked and whether there are threads still
872/// parked in the new queue. This `UnparkResult` value is also returned by
873/// `unpark_requeue`.
874///
875/// The `callback` function should return an `UnparkToken` value which will be
876/// passed to the thread that is unparked. If no thread is unparked then the
877/// returned value is ignored.
878///
879/// # Safety
880///
881/// You should only call this function with an address that you control, since
882/// you could otherwise interfere with the operation of other synchronization
883/// primitives.
884///
885/// The `validate` and `callback` functions are called while the queue is locked
886/// and must not panic or call into any function in `parking_lot`.
887#[inline]
888pub unsafe fn unpark_requeue(
889    key_from: usize,
890    key_to: usize,
891    validate: impl FnOnce() -> RequeueOp,
892    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
893) -> UnparkResult {
894    // Lock the two buckets for the given key
895    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
896
897    // If the validation function fails, just return
898    let mut result = UnparkResult::default();
899    let op = validate();
900    if op == RequeueOp::Abort {
901        // SAFETY: Both buckets are locked, as required.
902        unlock_bucket_pair(bucket_from, bucket_to);
903        return result;
904    }
905
906    // Remove all threads with the given key in the source bucket
907    let mut link = &bucket_from.queue_head;
908    let mut current = bucket_from.queue_head.get();
909    let mut previous = ptr::null();
910    let mut requeue_threads: *const ThreadData = ptr::null();
911    let mut requeue_threads_tail: *const ThreadData = ptr::null();
912    let mut wakeup_thread = None;
913    while !current.is_null() {
914        if (*current).key.load(Ordering::Relaxed) == key_from {
915            // Remove the thread from the queue
916            let next = (*current).next_in_queue.get();
917            link.set(next);
918            if bucket_from.queue_tail.get() == current {
919                bucket_from.queue_tail.set(previous);
920            }
921
922            // Prepare the first thread for wakeup and requeue the rest.
923            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
924                && wakeup_thread.is_none()
925            {
926                wakeup_thread = Some(current);
927                result.unparked_threads = 1;
928            } else {
929                if !requeue_threads.is_null() {
930                    (*requeue_threads_tail).next_in_queue.set(current);
931                } else {
932                    requeue_threads = current;
933                }
934                requeue_threads_tail = current;
935                (*current).key.store(key_to, Ordering::Relaxed);
936                result.requeued_threads += 1;
937            }
938            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
939                // Scan the rest of the queue to see if there are any other
940                // entries with the given key.
941                let mut scan = next;
942                while !scan.is_null() {
943                    if (*scan).key.load(Ordering::Relaxed) == key_from {
944                        result.have_more_threads = true;
945                        break;
946                    }
947                    scan = (*scan).next_in_queue.get();
948                }
949                break;
950            }
951            current = next;
952        } else {
953            link = &(*current).next_in_queue;
954            previous = current;
955            current = link.get();
956        }
957    }
958
959    // Add the requeued threads to the destination bucket
960    if !requeue_threads.is_null() {
961        (*requeue_threads_tail).next_in_queue.set(ptr::null());
962        if !bucket_to.queue_head.get().is_null() {
963            (*bucket_to.queue_tail.get())
964                .next_in_queue
965                .set(requeue_threads);
966        } else {
967            bucket_to.queue_head.set(requeue_threads);
968        }
969        bucket_to.queue_tail.set(requeue_threads_tail);
970    }
971
972    // Invoke the callback before waking up the thread
973    if result.unparked_threads != 0 {
974        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
975    }
976    let token = callback(op, result);
977
978    // See comment in unpark_one for why we mess with the locking
979    if let Some(wakeup_thread) = wakeup_thread {
980        (*wakeup_thread).unpark_token.set(token);
981        let handle = (*wakeup_thread).parker.unpark_lock();
982        // SAFETY: Both buckets are locked, as required.
983        unlock_bucket_pair(bucket_from, bucket_to);
984        handle.unpark();
985    } else {
986        // SAFETY: Both buckets are locked, as required.
987        unlock_bucket_pair(bucket_from, bucket_to);
988    }
989
990    result
991}
992
993/// Unparks a number of threads from the front of the queue associated with
994/// `key` depending on the results of a filter function which inspects the
995/// `ParkToken` associated with each thread.
996///
997/// The `filter` function is called for each thread in the queue or until
998/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
999/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
1000/// is returned.
1001///
1002/// The `callback` function is also called while both queues are locked. It is
1003/// passed an `UnparkResult` indicating the number of threads that were unparked
1004/// and whether there are still parked threads in the queue. This `UnparkResult`
1005/// value is also returned by `unpark_filter`.
1006///
1007/// The `callback` function should return an `UnparkToken` value which will be
1008/// passed to all threads that are unparked. If no thread is unparked then the
1009/// returned value is ignored.
1010///
1011/// # Safety
1012///
1013/// You should only call this function with an address that you control, since
1014/// you could otherwise interfere with the operation of other synchronization
1015/// primitives.
1016///
1017/// The `filter` and `callback` functions are called while the queue is locked
1018/// and must not panic or call into any function in `parking_lot`.
1019#[inline]
1020pub unsafe fn unpark_filter(
1021    key: usize,
1022    mut filter: impl FnMut(ParkToken) -> FilterOp,
1023    callback: impl FnOnce(UnparkResult) -> UnparkToken,
1024) -> UnparkResult {
1025    // Lock the bucket for the given key
1026    let bucket = lock_bucket(key);
1027
1028    // Go through the queue looking for threads with a matching key
1029    let mut link = &bucket.queue_head;
1030    let mut current = bucket.queue_head.get();
1031    let mut previous = ptr::null();
1032    let mut threads = SmallVec::<[_; 8]>::new();
1033    let mut result = UnparkResult::default();
1034    while !current.is_null() {
1035        if (*current).key.load(Ordering::Relaxed) == key {
1036            // Call the filter function with the thread's ParkToken
1037            let next = (*current).next_in_queue.get();
1038            match filter((*current).park_token.get()) {
1039                FilterOp::Unpark => {
1040                    // Remove the thread from the queue
1041                    link.set(next);
1042                    if bucket.queue_tail.get() == current {
1043                        bucket.queue_tail.set(previous);
1044                    }
1045
1046                    // Add the thread to our list of threads to unpark
1047                    threads.push((current, None));
1048
1049                    current = next;
1050                }
1051                FilterOp::Skip => {
1052                    result.have_more_threads = true;
1053                    link = &(*current).next_in_queue;
1054                    previous = current;
1055                    current = link.get();
1056                }
1057                FilterOp::Stop => {
1058                    result.have_more_threads = true;
1059                    break;
1060                }
1061            }
1062        } else {
1063            link = &(*current).next_in_queue;
1064            previous = current;
1065            current = link.get();
1066        }
1067    }
1068
1069    // Invoke the callback before waking up the threads
1070    result.unparked_threads = threads.len();
1071    if result.unparked_threads != 0 {
1072        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1073    }
1074    let token = callback(result);
1075
1076    // Pass the token to all threads that are going to be unparked and prepare
1077    // them for unparking.
1078    for t in threads.iter_mut() {
1079        (*t.0).unpark_token.set(token);
1080        t.1 = Some((*t.0).parker.unpark_lock());
1081    }
1082
1083    // SAFETY: We hold the lock here, as required
1084    bucket.mutex.unlock();
1085
1086    // Now that we are outside the lock, wake up all the threads that we removed
1087    // from the queue.
1088    for (_, handle) in threads.into_iter() {
1089        handle.unchecked_unwrap().unpark();
1090    }
1091
1092    result
1093}
1094
1095/// \[Experimental\] Deadlock detection
1096///
1097/// Enabled via the `deadlock_detection` feature flag.
1098pub mod deadlock {
1099    #[cfg(feature = "deadlock_detection")]
1100    use super::deadlock_impl;
1101
1102    #[cfg(feature = "deadlock_detection")]
1103    pub(super) use super::deadlock_impl::DeadlockData;
1104
1105    /// Acquire a resource identified by key in the deadlock detector
1106    /// Noop if deadlock_detection feature isn't enabled.
1107    ///
1108    /// # Safety
1109    ///
1110    /// Call after the resource is acquired
1111    #[inline]
1112    pub unsafe fn acquire_resource(_key: usize) {
1113        #[cfg(feature = "deadlock_detection")]
1114        deadlock_impl::acquire_resource(_key);
1115    }
1116
1117    /// Release a resource identified by key in the deadlock detector.
1118    /// Noop if deadlock_detection feature isn't enabled.
1119    ///
1120    /// # Panics
1121    ///
1122    /// Panics if the resource was already released or wasn't acquired in this thread.
1123    ///
1124    /// # Safety
1125    ///
1126    /// Call before the resource is released
1127    #[inline]
1128    pub unsafe fn release_resource(_key: usize) {
1129        #[cfg(feature = "deadlock_detection")]
1130        deadlock_impl::release_resource(_key);
1131    }
1132
1133    /// Returns all deadlocks detected *since* the last call.
1134    /// Each cycle consist of a vector of `DeadlockedThread`.
1135    #[cfg(feature = "deadlock_detection")]
1136    #[inline]
1137    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1138        deadlock_impl::check_deadlock()
1139    }
1140
1141    #[inline]
1142    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1143        #[cfg(feature = "deadlock_detection")]
1144        deadlock_impl::on_unpark(_td);
1145    }
1146}
1147
1148#[cfg(feature = "deadlock_detection")]
1149mod deadlock_impl {
1150    use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
1151    use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
1152    use crate::word_lock::WordLock;
1153    use backtrace::Backtrace;
1154    use petgraph;
1155    use petgraph::graphmap::DiGraphMap;
1156    use std::cell::{Cell, UnsafeCell};
1157    use std::collections::HashSet;
1158    use std::sync::atomic::Ordering;
1159    use std::sync::mpsc;
1160    use thread_id;
1161
1162    /// Representation of a deadlocked thread
1163    pub struct DeadlockedThread {
1164        thread_id: usize,
1165        backtrace: Backtrace,
1166    }
1167
1168    impl DeadlockedThread {
1169        /// The system thread id
1170        pub fn thread_id(&self) -> usize {
1171            self.thread_id
1172        }
1173
1174        /// The thread backtrace
1175        pub fn backtrace(&self) -> &Backtrace {
1176            &self.backtrace
1177        }
1178    }
1179
1180    pub struct DeadlockData {
1181        // Currently owned resources (keys)
1182        resources: UnsafeCell<Vec<usize>>,
1183
1184        // Set when there's a pending callstack request
1185        deadlocked: Cell<bool>,
1186
1187        // Sender used to report the backtrace
1188        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
1189
1190        // System thread id
1191        thread_id: usize,
1192    }
1193
1194    impl DeadlockData {
1195        pub fn new() -> Self {
1196            DeadlockData {
1197                resources: UnsafeCell::new(Vec::new()),
1198                deadlocked: Cell::new(false),
1199                backtrace_sender: UnsafeCell::new(None),
1200                thread_id: thread_id::get(),
1201            }
1202        }
1203    }
1204
1205    pub(super) unsafe fn on_unpark(td: &ThreadData) {
1206        if td.deadlock_data.deadlocked.get() {
1207            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1208            sender
1209                .send(DeadlockedThread {
1210                    thread_id: td.deadlock_data.thread_id,
1211                    backtrace: Backtrace::new(),
1212                })
1213                .unwrap();
1214            // make sure to close this sender
1215            drop(sender);
1216
1217            // park until the end of the time
1218            td.parker.prepare_park();
1219            td.parker.park();
1220            unreachable!("unparked deadlocked thread!");
1221        }
1222    }
1223
1224    pub unsafe fn acquire_resource(key: usize) {
1225        with_thread_data(|thread_data| {
1226            (*thread_data.deadlock_data.resources.get()).push(key);
1227        });
1228    }
1229
1230    pub unsafe fn release_resource(key: usize) {
1231        with_thread_data(|thread_data| {
1232            let resources = &mut (*thread_data.deadlock_data.resources.get());
1233
1234            // There is only one situation where we can fail to find the
1235            // resource: we are currently running TLS destructors and our
1236            // ThreadData has already been freed. There isn't much we can do
1237            // about it at this point, so just ignore it.
1238            if let Some(p) = resources.iter().rposition(|x| *x == key) {
1239                resources.swap_remove(p);
1240            }
1241        });
1242    }
1243
1244    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1245        unsafe {
1246            // fast pass
1247            if check_wait_graph_fast() {
1248                // double check
1249                check_wait_graph_slow()
1250            } else {
1251                Vec::new()
1252            }
1253        }
1254    }
1255
1256    // Simple algorithm that builds a wait graph f the threads and the resources,
1257    // then checks for the presence of cycles (deadlocks).
1258    // This variant isn't precise as it doesn't lock the entire table before checking
1259    unsafe fn check_wait_graph_fast() -> bool {
1260        let table = get_hashtable();
1261        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1262        let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1263
1264        for b in &(*table).entries[..] {
1265            b.mutex.lock();
1266            let mut current = b.queue_head.get();
1267            while !current.is_null() {
1268                if !(*current).parked_with_timeout.get()
1269                    && !(*current).deadlock_data.deadlocked.get()
1270                {
1271                    // .resources are waiting for their owner
1272                    for &resource in &(*(*current).deadlock_data.resources.get()) {
1273                        graph.add_edge(resource, current as usize, ());
1274                    }
1275                    // owner waits for resource .key
1276                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1277                }
1278                current = (*current).next_in_queue.get();
1279            }
1280            // SAFETY: We hold the lock here, as required
1281            b.mutex.unlock();
1282        }
1283
1284        petgraph::algo::is_cyclic_directed(&graph)
1285    }
1286
1287    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1288    enum WaitGraphNode {
1289        Thread(*const ThreadData),
1290        Resource(usize),
1291    }
1292
1293    use self::WaitGraphNode::*;
1294
1295    // Contrary to the _fast variant this locks the entries table before looking for cycles.
1296    // Returns all detected thread wait cycles.
1297    // Note that once a cycle is reported it's never reported again.
1298    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1299        static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
1300        DEADLOCK_DETECTION_LOCK.lock();
1301
1302        let mut table = get_hashtable();
1303        loop {
1304            // Lock all buckets in the old table
1305            for b in &table.entries[..] {
1306                b.mutex.lock();
1307            }
1308
1309            // Now check if our table is still the latest one. Another thread could
1310            // have grown the hash table between us getting and locking the hash table.
1311            let new_table = get_hashtable();
1312            if new_table as *const _ == table as *const _ {
1313                break;
1314            }
1315
1316            // Unlock buckets and try again
1317            for b in &table.entries[..] {
1318                // SAFETY: We hold the lock here, as required
1319                b.mutex.unlock();
1320            }
1321
1322            table = new_table;
1323        }
1324
1325        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1326        let mut graph =
1327            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
1328
1329        for b in &table.entries[..] {
1330            let mut current = b.queue_head.get();
1331            while !current.is_null() {
1332                if !(*current).parked_with_timeout.get()
1333                    && !(*current).deadlock_data.deadlocked.get()
1334                {
1335                    // .resources are waiting for their owner
1336                    for &resource in &(*(*current).deadlock_data.resources.get()) {
1337                        graph.add_edge(Resource(resource), Thread(current), ());
1338                    }
1339                    // owner waits for resource .key
1340                    graph.add_edge(
1341                        Thread(current),
1342                        Resource((*current).key.load(Ordering::Relaxed)),
1343                        (),
1344                    );
1345                }
1346                current = (*current).next_in_queue.get();
1347            }
1348        }
1349
1350        for b in &table.entries[..] {
1351            // SAFETY: We hold the lock here, as required
1352            b.mutex.unlock();
1353        }
1354
1355        // find cycles
1356        let cycles = graph_cycles(&graph);
1357
1358        let mut results = Vec::with_capacity(cycles.len());
1359
1360        for cycle in cycles {
1361            let (sender, receiver) = mpsc::channel();
1362            for td in cycle {
1363                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1364                (*td).deadlock_data.deadlocked.set(true);
1365                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1366                let handle = (*td).parker.unpark_lock();
1367                // SAFETY: We hold the lock here, as required
1368                bucket.mutex.unlock();
1369                // unpark the deadlocked thread!
1370                // on unpark it'll notice the deadlocked flag and report back
1371                handle.unpark();
1372            }
1373            // make sure to drop our sender before collecting results
1374            drop(sender);
1375            results.push(receiver.iter().collect());
1376        }
1377
1378        DEADLOCK_DETECTION_LOCK.unlock();
1379
1380        results
1381    }
1382
1383    // normalize a cycle to start with the "smallest" node
1384    fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1385        let min_pos = input
1386            .iter()
1387            .enumerate()
1388            .min_by_key(|&(_, &t)| t)
1389            .map(|(p, _)| p)
1390            .unwrap_or(0);
1391        input
1392            .iter()
1393            .cycle()
1394            .skip(min_pos)
1395            .take(input.len())
1396            .cloned()
1397            .collect()
1398    }
1399
1400    // returns all thread cycles in the wait graph
1401    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1402        use petgraph::visit::depth_first_search;
1403        use petgraph::visit::DfsEvent;
1404        use petgraph::visit::NodeIndexable;
1405
1406        let mut cycles = HashSet::new();
1407        let mut path = Vec::with_capacity(g.node_bound());
1408        // start from threads to get the correct threads cycle
1409        let threads = g
1410            .nodes()
1411            .filter(|n| if let &Thread(_) = n { true } else { false });
1412
1413        depth_first_search(g, threads, |e| match e {
1414            DfsEvent::Discover(Thread(n), _) => path.push(n),
1415            DfsEvent::Finish(Thread(_), _) => {
1416                path.pop();
1417            }
1418            DfsEvent::BackEdge(_, Thread(n)) => {
1419                let from = path.iter().rposition(|&i| i == n).unwrap();
1420                cycles.insert(normalize_cycle(&path[from..]));
1421            }
1422            _ => (),
1423        });
1424
1425        cycles.iter().cloned().collect()
1426    }
1427}
1428
1429#[cfg(test)]
1430mod tests {
1431    use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1432    use std::{
1433        ptr,
1434        sync::{
1435            atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1436            Arc,
1437        },
1438        thread,
1439        time::Duration,
1440    };
1441
1442    /// Calls a closure for every `ThreadData` currently parked on a given key
1443    fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1444        let bucket = super::lock_bucket(key);
1445
1446        let mut current: *const ThreadData = bucket.queue_head.get();
1447        while !current.is_null() {
1448            let current_ref = unsafe { &*current };
1449            if current_ref.key.load(Ordering::Relaxed) == key {
1450                f(current_ref);
1451            }
1452            current = current_ref.next_in_queue.get();
1453        }
1454
1455        // SAFETY: We hold the lock here, as required
1456        unsafe { bucket.mutex.unlock() };
1457    }
1458
1459    macro_rules! test {
1460        ( $( $name:ident(
1461            repeats: $repeats:expr,
1462            latches: $latches:expr,
1463            delay: $delay:expr,
1464            threads: $threads:expr,
1465            single_unparks: $single_unparks:expr);
1466        )* ) => {
1467            $(#[test]
1468            fn $name() {
1469                let delay = Duration::from_micros($delay);
1470                for _ in 0..$repeats {
1471                    run_parking_test($latches, delay, $threads, $single_unparks);
1472                }
1473            })*
1474        };
1475    }
1476
1477    test! {
1478        unpark_all_one_fast(
1479            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1480        );
1481        unpark_all_hundred_fast(
1482            repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1483        );
1484        unpark_one_one_fast(
1485            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1486        );
1487        unpark_one_hundred_fast(
1488            repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1489        );
1490        unpark_one_fifty_then_fifty_all_fast(
1491            repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1492        );
1493        unpark_all_one(
1494            repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1495        );
1496        unpark_all_hundred(
1497            repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1498        );
1499        unpark_one_one(
1500            repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1501        );
1502        unpark_one_fifty(
1503            repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1504        );
1505        unpark_one_fifty_then_fifty_all(
1506            repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1507        );
1508        hundred_unpark_all_one_fast(
1509            repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1510        );
1511        hundred_unpark_all_one(
1512            repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1513        );
1514    }
1515
1516    fn run_parking_test(
1517        num_latches: usize,
1518        delay: Duration,
1519        num_threads: usize,
1520        num_single_unparks: usize,
1521    ) {
1522        let mut tests = Vec::with_capacity(num_latches);
1523
1524        for _ in 0..num_latches {
1525            let test = Arc::new(SingleLatchTest::new(num_threads));
1526            let mut threads = Vec::with_capacity(num_threads);
1527            for _ in 0..num_threads {
1528                let test = test.clone();
1529                threads.push(thread::spawn(move || test.run()));
1530            }
1531            tests.push((test, threads));
1532        }
1533
1534        for unpark_index in 0..num_single_unparks {
1535            thread::sleep(delay);
1536            for (test, _) in &tests {
1537                test.unpark_one(unpark_index);
1538            }
1539        }
1540
1541        for (test, threads) in tests {
1542            test.finish(num_single_unparks);
1543            for thread in threads {
1544                thread.join().expect("Test thread panic");
1545            }
1546        }
1547    }
1548
1549    struct SingleLatchTest {
1550        semaphore: AtomicIsize,
1551        num_awake: AtomicUsize,
1552        /// Holds the pointer to the last *unprocessed* woken up thread.
1553        last_awoken: AtomicPtr<ThreadData>,
1554        /// Total number of threads participating in this test.
1555        num_threads: usize,
1556    }
1557
1558    impl SingleLatchTest {
1559        pub fn new(num_threads: usize) -> Self {
1560            Self {
1561                // This implements a fair (FIFO) semaphore, and it starts out unavailable.
1562                semaphore: AtomicIsize::new(0),
1563                num_awake: AtomicUsize::new(0),
1564                last_awoken: AtomicPtr::new(ptr::null_mut()),
1565                num_threads,
1566            }
1567        }
1568
1569        pub fn run(&self) {
1570            // Get one slot from the semaphore
1571            self.down();
1572
1573            // Report back to the test verification code that this thread woke up
1574            let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1575            self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1576            self.num_awake.fetch_add(1, Ordering::SeqCst);
1577        }
1578
1579        pub fn unpark_one(&self, single_unpark_index: usize) {
1580            // last_awoken should be null at all times except between self.up() and at the bottom
1581            // of this method where it's reset to null again
1582            assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
1583
1584            let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1585            for_each(self.semaphore_addr(), |thread_data| {
1586                queue.push(thread_data as *const _ as *mut _);
1587            });
1588            assert!(queue.len() <= self.num_threads - single_unpark_index);
1589
1590            let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
1591
1592            self.up();
1593
1594            // Wait for a parked thread to wake up and update num_awake + last_awoken.
1595            while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1596                thread::yield_now();
1597            }
1598
1599            // At this point the other thread should have set last_awoken inside the run() method
1600            let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1601            assert!(!last_awoken.is_null());
1602            if !queue.is_empty() && queue[0] != last_awoken {
1603                panic!(
1604                    "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1605                    queue, last_awoken
1606                );
1607            }
1608            self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1609        }
1610
1611        pub fn finish(&self, num_single_unparks: usize) {
1612            // The amount of threads not unparked via unpark_one
1613            let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
1614
1615            // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
1616            // still be threads that has not yet parked.
1617            while num_threads_left > 0 {
1618                let mut num_waiting_on_address = 0;
1619                for_each(self.semaphore_addr(), |_thread_data| {
1620                    num_waiting_on_address += 1;
1621                });
1622                assert!(num_waiting_on_address <= num_threads_left);
1623
1624                let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
1625
1626                let num_unparked =
1627                    unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1628                assert!(num_unparked >= num_waiting_on_address);
1629                assert!(num_unparked <= num_threads_left);
1630
1631                // Wait for all unparked threads to wake up and update num_awake + last_awoken.
1632                while self.num_awake.load(Ordering::SeqCst)
1633                    != num_awake_before_unpark + num_unparked
1634                {
1635                    thread::yield_now()
1636                }
1637
1638                num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1639            }
1640            // By now, all threads should have been woken up
1641            assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
1642
1643            // Make sure no thread is parked on our semaphore address
1644            let mut num_waiting_on_address = 0;
1645            for_each(self.semaphore_addr(), |_thread_data| {
1646                num_waiting_on_address += 1;
1647            });
1648            assert_eq!(num_waiting_on_address, 0);
1649        }
1650
1651        pub fn down(&self) {
1652            let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
1653
1654            if old_semaphore_value > 0 {
1655                // We acquired the semaphore. Done.
1656                return;
1657            }
1658
1659            // We need to wait.
1660            let validate = || true;
1661            let before_sleep = || {};
1662            let timed_out = |_, _| {};
1663            unsafe {
1664                super::park(
1665                    self.semaphore_addr(),
1666                    validate,
1667                    before_sleep,
1668                    timed_out,
1669                    DEFAULT_PARK_TOKEN,
1670                    None,
1671                );
1672            }
1673        }
1674
1675        pub fn up(&self) {
1676            let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
1677
1678            // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1679            if old_semaphore_value < 0 {
1680                // We need to continue until we have actually unparked someone. It might be that
1681                // the thread we want to pass ownership to has decremented the semaphore counter,
1682                // but not yet parked.
1683                loop {
1684                    match unsafe {
1685                        super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1686                            .unparked_threads
1687                    } {
1688                        1 => break,
1689                        0 => (),
1690                        i => panic!("Should not wake up {} threads", i),
1691                    }
1692                }
1693            }
1694        }
1695
1696        fn semaphore_addr(&self) -> usize {
1697            &self.semaphore as *const _ as usize
1698        }
1699    }
1700}