parking_lot_core/word_lock.rs

// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
    cell::Cell,
    mem, ptr,
    sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
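    //
    // Roughly, the queue looks like this (the head pointer is stored in the
    // bits of the WordLock state word covered by QUEUE_MASK):
    //
    //   head (newest waiter) --next--> ... --next--> tail (oldest waiter)
    //                        <--prev--     <--prev--
    //
    // Newly pushed nodes only have `next` set and a null `queue_tail`; the
    // unlocking thread later fills in `prev` and caches the tail pointer in
    // the head's `queue_tail`. The unlocker wakes the tail, so threads that
    // actually parked are served roughly in FIFO order.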
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    #[inline]
    fn new() -> ThreadData {
        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    let mut thread_data_ptr = ptr::null();
    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
            thread_data_ptr = tls_thread_data;
        }
    }
    // Otherwise just create a ThreadData on the stack
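    // (this is sound because a stack node is only published to the queue from
    // `lock_slow`, which does not return until the node has been unlinked
    // again, so the queue never holds a pointer to a dead stack frame).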
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    f(unsafe { &*thread_data_ptr })
}

const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;
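// Layout of the lock state word:
//   bit 0 (LOCKED_BIT)       - set while the lock is held
//   bit 1 (QUEUE_LOCKED_BIT) - set while a thread is processing the wait queue
//   bits 2.. (QUEUE_MASK)    - pointer to the head ThreadData of the wait queue;
//                              valid because ThreadData is aligned to more than
//                              !QUEUE_MASK bytes (checked in ThreadData::new).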

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    /// Returns a new, unlocked `WordLock`.
    pub const fn new() -> Self {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub fn lock(&self) {
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Must not be called on an already unlocked `WordLock`!
    #[inline]
    pub unsafe fn unlock(&self) {
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
                // with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
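                // With a non-empty queue, our node is pushed at the front with
                // `queue_tail` left null, marking it as not yet processed; the
                // unlocking thread fills in the prev pointers later while it
                // holds QUEUE_LOCKED_BIT.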
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    return x;
                }

                // Sleep until we are woken up by an unlock
                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence_acquire(&self.state);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
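                // The thread being woken was the only one in the queue, so the
                // CAS clears both the queue head and QUEUE_LOCKED_BIT in one go.
                // `state & LOCKED_BIT` preserves just the lock bit in case
                // another thread grabbed the lock in the meantime.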
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence_acquire(&self.state);
                        continue 'outer;
                    }
                }
            } else {
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}

// Thread-Sanitizer only has partial fence support, so when running under it, we
// try to avoid false positives by using a discarded acquire load instead.
#[inline]
fn fence_acquire(a: &AtomicUsize) {
    if cfg!(tsan_enabled) {
        let _ = a.load(Ordering::Acquire);
    } else {
        fence(Ordering::Acquire);
    }
}

trait LockState {
    fn is_locked(self) -> bool;
    fn is_queue_locked(self) -> bool;
    fn queue_head(self) -> *const ThreadData;
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
    #[inline]
    fn is_locked(self) -> bool {
        self & LOCKED_BIT != 0
    }

    #[inline]
    fn is_queue_locked(self) -> bool {
        self & QUEUE_LOCKED_BIT != 0
    }

    #[inline]
    fn queue_head(self) -> *const ThreadData {
        (self & QUEUE_MASK) as *const ThreadData
    }

    #[inline]
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
        (self & !QUEUE_MASK) | thread_data as *const _ as usize
    }
}
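
// A minimal, illustrative sanity check (not exhaustive): WordLock should behave
// like an ordinary mutex, with the caveat that `unlock` is unsafe and must only
// be called by the thread that currently holds the lock. This sketch assumes
// std is available, which is already required by the `thread_local!` usage above.
#[cfg(test)]
mod tests {
    use super::WordLock;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn mutual_exclusion() {
        let lock = Arc::new(WordLock::new());
        // Counts how many threads are inside the critical section at once.
        let inside = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let lock = Arc::clone(&lock);
                let inside = Arc::clone(&inside);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        lock.lock();
                        // No other thread can be inside the critical section.
                        assert_eq!(inside.fetch_add(1, Ordering::Relaxed), 0);
                        inside.fetch_sub(1, Ordering::Relaxed);
                        // Safety: this thread currently holds the lock.
                        unsafe { lock.unlock() };
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
    }
}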