parking_lot/
raw_mutex.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::{deadlock, util};
9use core::{
10    sync::atomic::{AtomicU8, Ordering},
11    time::Duration,
12};
13use lock_api::RawMutex as RawMutex_;
14use parking_lot_core::{self, ParkResult, SpinWait, UnparkResult, UnparkToken, DEFAULT_PARK_TOKEN};
15use std::time::Instant;
16
17// UnparkToken used to indicate that that the target thread should attempt to
18// lock the mutex again as soon as it is unparked.
19pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
20
21// UnparkToken used to indicate that the mutex is being handed off to the target
22// thread directly without unlocking it.
23pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
24
/// Bit of a `RawMutex`'s `state` that is set while the mutex is locked by some thread.
const LOCKED_BIT: u8 = 0x01;
/// Bit of a `RawMutex`'s `state` that is set just before a thread is parked. A thread parks
/// when it wants to lock the mutex while another thread is currently holding it.
const PARKED_BIT: u8 = 0x02;
30
/// Raw mutex type backed by the parking lot.
pub struct RawMutex {
    /// Current state of this mutex. Only the two lowest bits are used; `LOCKED_BIT` and
    /// `PARKED_BIT` are the masks for them.
    ///
    /// # State table
    ///
    /// ```text
    /// PARKED_BIT | LOCKED_BIT | Description
    /// -----------+------------+------------------------------------------------------------------
    ///     0      |     0      | The mutex is not locked, and no thread is waiting for it.
    /// -----------+------------+------------------------------------------------------------------
    ///     0      |     1      | The mutex is locked by exactly one thread. No other thread is
    ///            |            | waiting for it.
    /// -----------+------------+------------------------------------------------------------------
    ///     1      |     0      | The mutex is not locked. One or more threads are parked or about
    ///            |            | to park. At least one of the parked threads is just about to be
    ///            |            | unparked, or a thread heading for parking might abort the park.
    /// -----------+------------+------------------------------------------------------------------
    ///     1      |     1      | The mutex is locked by exactly one thread. One or more threads
    ///            |            | are parked or about to park, waiting for the lock to become
    ///            |            | available. In this state, PARKED_BIT is only ever cleared while
    ///            |            | a bucket lock is held (i.e. in a parking_lot_core callback).
    ///            |            | This ensures that we never end up with parked threads while
    ///            |            | PARKED_BIT is not set (which could leave those threads never
    ///            |            | getting woken up).
    /// ```
    state: AtomicU8,
}
57
58unsafe impl lock_api::RawMutex for RawMutex {
59    const INIT: RawMutex = RawMutex {
60        state: AtomicU8::new(0),
61    };
62
63    type GuardMarker = crate::GuardMarker;
64
65    #[inline]
66    fn lock(&self) {
67        if self
68            .state
69            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
70            .is_err()
71        {
72            self.lock_slow(None);
73        }
74        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
75    }
76
77    #[inline]
78    fn try_lock(&self) -> bool {
79        let mut state = self.state.load(Ordering::Relaxed);
80        loop {
81            if state & LOCKED_BIT != 0 {
82                return false;
83            }
84            match self.state.compare_exchange_weak(
85                state,
86                state | LOCKED_BIT,
87                Ordering::Acquire,
88                Ordering::Relaxed,
89            ) {
90                Ok(_) => {
91                    unsafe { deadlock::acquire_resource(self as *const _ as usize) };
92                    return true;
93                }
94                Err(x) => state = x,
95            }
96        }
97    }
98
99    #[inline]
100    unsafe fn unlock(&self) {
101        deadlock::release_resource(self as *const _ as usize);
102        if self
103            .state
104            .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
105            .is_ok()
106        {
107            return;
108        }
109        self.unlock_slow(false);
110    }
111
112    #[inline]
113    fn is_locked(&self) -> bool {
114        let state = self.state.load(Ordering::Relaxed);
115        state & LOCKED_BIT != 0
116    }
117}
118
119unsafe impl lock_api::RawMutexFair for RawMutex {
120    #[inline]
121    unsafe fn unlock_fair(&self) {
122        deadlock::release_resource(self as *const _ as usize);
123        if self
124            .state
125            .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
126            .is_ok()
127        {
128            return;
129        }
130        self.unlock_slow(true);
131    }
132
133    #[inline]
134    unsafe fn bump(&self) {
135        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
136            self.bump_slow();
137        }
138    }
139}
140
141unsafe impl lock_api::RawMutexTimed for RawMutex {
142    type Duration = Duration;
143    type Instant = Instant;
144
145    #[inline]
146    fn try_lock_until(&self, timeout: Instant) -> bool {
147        let result = if self
148            .state
149            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
150            .is_ok()
151        {
152            true
153        } else {
154            self.lock_slow(Some(timeout))
155        };
156        if result {
157            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
158        }
159        result
160    }
161
162    #[inline]
163    fn try_lock_for(&self, timeout: Duration) -> bool {
164        let result = if self
165            .state
166            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
167            .is_ok()
168        {
169            true
170        } else {
171            self.lock_slow(util::to_deadline(timeout))
172        };
173        if result {
174            unsafe { deadlock::acquire_resource(self as *const _ as usize) };
175        }
176        result
177    }
178}
179
180impl RawMutex {
181    // Used by Condvar when requeuing threads to us, must be called while
182    // holding the queue lock.
183    #[inline]
184    pub(crate) fn mark_parked_if_locked(&self) -> bool {
185        let mut state = self.state.load(Ordering::Relaxed);
186        loop {
187            if state & LOCKED_BIT == 0 {
188                return false;
189            }
190            match self.state.compare_exchange_weak(
191                state,
192                state | PARKED_BIT,
193                Ordering::Relaxed,
194                Ordering::Relaxed,
195            ) {
196                Ok(_) => return true,
197                Err(x) => state = x,
198            }
199        }
200    }
201
202    // Used by Condvar when requeuing threads to us, must be called while
203    // holding the queue lock.
204    #[inline]
205    pub(crate) fn mark_parked(&self) {
206        self.state.fetch_or(PARKED_BIT, Ordering::Relaxed);
207    }
208
209    #[cold]
210    fn lock_slow(&self, timeout: Option<Instant>) -> bool {
211        let mut spinwait = SpinWait::new();
212        let mut state = self.state.load(Ordering::Relaxed);
213        loop {
214            // Grab the lock if it isn't locked, even if there is a queue on it
215            if state & LOCKED_BIT == 0 {
216                match self.state.compare_exchange_weak(
217                    state,
218                    state | LOCKED_BIT,
219                    Ordering::Acquire,
220                    Ordering::Relaxed,
221                ) {
222                    Ok(_) => return true,
223                    Err(x) => state = x,
224                }
225                continue;
226            }
227
228            // If there is no queue, try spinning a few times
229            if state & PARKED_BIT == 0 && spinwait.spin() {
230                state = self.state.load(Ordering::Relaxed);
231                continue;
232            }
233
234            // Set the parked bit
235            if state & PARKED_BIT == 0 {
236                if let Err(x) = self.state.compare_exchange_weak(
237                    state,
238                    state | PARKED_BIT,
239                    Ordering::Relaxed,
240                    Ordering::Relaxed,
241                ) {
242                    state = x;
243                    continue;
244                }
245            }
246
247            // Park our thread until we are woken up by an unlock
248            let addr = self as *const _ as usize;
249            let validate = || self.state.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT;
250            let before_sleep = || {};
251            let timed_out = |_, was_last_thread| {
252                // Clear the parked bit if we were the last parked thread
253                if was_last_thread {
254                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
255                }
256            };
257            // SAFETY:
258            //   * `addr` is an address we control.
259            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
260            //   * `before_sleep` does not call `park`, nor does it panic.
261            match unsafe {
262                parking_lot_core::park(
263                    addr,
264                    validate,
265                    before_sleep,
266                    timed_out,
267                    DEFAULT_PARK_TOKEN,
268                    timeout,
269                )
270            } {
271                // The thread that unparked us passed the lock on to us
272                // directly without unlocking it.
273                ParkResult::Unparked(TOKEN_HANDOFF) => return true,
274
275                // We were unparked normally, try acquiring the lock again
276                ParkResult::Unparked(_) => (),
277
278                // The validation function failed, try locking again
279                ParkResult::Invalid => (),
280
281                // Timeout expired
282                ParkResult::TimedOut => return false,
283            }
284
285            // Loop back and try locking again
286            spinwait.reset();
287            state = self.state.load(Ordering::Relaxed);
288        }
289    }
290
291    #[cold]
292    fn unlock_slow(&self, force_fair: bool) {
293        // Unpark one thread and leave the parked bit set if there might
294        // still be parked threads on this address.
295        let addr = self as *const _ as usize;
296        let callback = |result: UnparkResult| {
297            // If we are using a fair unlock then we should keep the
298            // mutex locked and hand it off to the unparked thread.
299            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
300                // Clear the parked bit if there are no more parked
301                // threads.
302                if !result.have_more_threads {
303                    self.state.store(LOCKED_BIT, Ordering::Relaxed);
304                }
305                return TOKEN_HANDOFF;
306            }
307
308            // Clear the locked bit, and the parked bit as well if there
309            // are no more parked threads.
310            if result.have_more_threads {
311                self.state.store(PARKED_BIT, Ordering::Release);
312            } else {
313                self.state.store(0, Ordering::Release);
314            }
315            TOKEN_NORMAL
316        };
317        // SAFETY:
318        //   * `addr` is an address we control.
319        //   * `callback` does not panic or call into any function of `parking_lot`.
320        unsafe {
321            parking_lot_core::unpark_one(addr, callback);
322        }
323    }
324
325    #[cold]
326    fn bump_slow(&self) {
327        unsafe { deadlock::release_resource(self as *const _ as usize) };
328        self.unlock_slow(true);
329        self.lock();
330    }
331}