async_lock/
mutex.rs

1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::ops::{Deref, DerefMut};
5use std::process;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8
9// Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54.
10#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
11use std::time::{Duration, Instant};
12
13use std::usize;
14
15use event_listener::Event;
16
/// An async mutex.
///
/// The locking mechanism uses eventual fairness to ensure locking will be fair on average without
/// sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
/// starved for longer than 0.5 milliseconds.
///
/// On `wasm32` and `wasm64` targets the elapsed-time check is compiled out, so the time-based
/// switch to the fair strategy does not happen there.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_lock::Mutex;
///
/// let m = Mutex::new(1);
///
/// let mut guard = m.lock().await;
/// *guard = 2;
///
/// assert!(m.try_lock().is_none());
/// drop(guard);
/// assert_eq!(*m.try_lock().unwrap(), 2);
/// # })
/// ```
pub struct Mutex<T: ?Sized> {
    /// Current state of the mutex.
    ///
    /// The least significant bit is set to 1 if the mutex is locked.
    /// The other bits hold the number of starved lock operations, so every starved operation
    /// contributes 2 to the raw value (0 = unlocked and nobody starved, 1 = locked and nobody
    /// starved).
    state: AtomicUsize,

    /// Lock operations waiting for the mutex to be released.
    ///
    /// Each dropped guard notifies one listener registered on this event.
    lock_ops: Event,

    /// The value inside the mutex.
    ///
    /// Reached only through a guard, through `get_mut` (which needs `&mut self`), or through
    /// `into_inner` (which consumes the mutex).
    data: UnsafeCell<T>,
}
52
// SAFETY (review note): the mutex owns its `T`, so moving the mutex to another thread moves the
// `T` with it; hence the `T: Send` bound.
unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
// SAFETY (review note): a thread holding `&Mutex<T>` can lock it and obtain `&mut T` through the
// guard, which effectively hands the value to that thread, so `T: Send` is required. `T: Sync`
// is presumably unnecessary because the lock bit in `state` serializes all access to the data.
unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
55
56impl<T> Mutex<T> {
57    /// Creates a new async mutex.
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use async_lock::Mutex;
63    ///
64    /// let mutex = Mutex::new(0);
65    /// ```
66    pub const fn new(data: T) -> Mutex<T> {
67        Mutex {
68            state: AtomicUsize::new(0),
69            lock_ops: Event::new(),
70            data: UnsafeCell::new(data),
71        }
72    }
73
74    /// Consumes the mutex, returning the underlying data.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use async_lock::Mutex;
80    ///
81    /// let mutex = Mutex::new(10);
82    /// assert_eq!(mutex.into_inner(), 10);
83    /// ```
84    pub fn into_inner(self) -> T {
85        self.data.into_inner()
86    }
87}
88
89impl<T: ?Sized> Mutex<T> {
90    /// Acquires the mutex.
91    ///
92    /// Returns a guard that releases the mutex when dropped.
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// # futures_lite::future::block_on(async {
98    /// use async_lock::Mutex;
99    ///
100    /// let mutex = Mutex::new(10);
101    /// let guard = mutex.lock().await;
102    /// assert_eq!(*guard, 10);
103    /// # })
104    /// ```
105    #[inline]
106    pub async fn lock(&self) -> MutexGuard<'_, T> {
107        if let Some(guard) = self.try_lock() {
108            return guard;
109        }
110        self.acquire_slow().await;
111        MutexGuard(self)
112    }
113
    /// Slow path for acquiring the mutex.
    ///
    /// Returns only after this call has set the lock bit in `state`; callers wrap the mutex in a
    /// guard immediately afterwards.
    ///
    /// The function works in two phases:
    ///
    /// 1. An optimistic phase that only takes the lock when `state == 0` (unlocked and nobody
    ///    starved) and otherwise waits on `lock_ops`.
    /// 2. A starved phase, entered when another lock operation is already starved or, on
    ///    non-wasm targets, when more than 500 microseconds have elapsed since this call
    ///    started (checked after each wake-up). While in this phase the call is counted in the
    ///    starved counter, i.e. it adds 2 to `state` and subtracts it again on exit.
    #[cold]
    async fn acquire_slow(&self) {
        // Get the current time. Not available on wasm targets, so the timeout is skipped there.
        #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
        let start = Instant::now();

        loop {
            // Start listening for events.
            // NOTE(review): the listener is registered before the state is inspected, presumably
            // so that an unlock happening right after the check below still wakes this task.
            let listener = self.lock_ops.listen();

            // Try locking if nobody is being starved.
            // `unwrap_or_else(|x| x)` collapses `Ok(prev)`/`Err(current)` into the observed value.
            match self
                .state
                .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
                .unwrap_or_else(|x| x)
            {
                // Lock acquired!
                0 => return,

                // Lock is held and nobody is starved.
                1 => {}

                // Somebody is starved: leave the optimistic phase and queue up fairly.
                _ => break,
            }

            // Wait for a notification.
            listener.await;

            // Try locking if nobody is being starved.
            match self
                .state
                .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
                .unwrap_or_else(|x| x)
            {
                // Lock acquired!
                0 => return,

                // Lock is held and nobody is starved.
                1 => {}

                // Somebody is starved.
                _ => {
                    // Notify the first listener in line because we probably received a
                    // notification that was meant for a starved task.
                    self.lock_ops.notify(1);
                    break;
                }
            }

            // If waiting for too long, fall back to a fairer locking strategy that will prevent
            // newer lock operations from starving us forever.
            #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
            if start.elapsed() > Duration::from_micros(500) {
                break;
            }
        }

        // Increment the number of starved lock operations. The counter sits above the lock bit,
        // hence the step of 2. `fetch_add` returns the value from before the addition.
        if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
            // In case of potential overflow, abort.
            process::abort();
        }

        // Decrement the counter when exiting this function. Because this is a drop guard, it
        // runs on every `return` below and also when the future is dropped while suspended.
        let _call = CallOnDrop(|| {
            self.state.fetch_sub(2, Ordering::Release);
        });

        loop {
            // Start listening for events.
            let listener = self.lock_ops.listen();

            // Try locking if nobody else is being starved.
            // A state of exactly 2 means: unlocked, and this call is the only starved operation.
            match self
                .state
                .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
                .unwrap_or_else(|x| x)
            {
                // Lock acquired!
                2 => return,

                // Lock is held by someone.
                s if s % 2 == 1 => {}

                // Lock is available, but other lock operations are starved as well.
                _ => {
                    // Be fair: notify the first listener and then go wait in line.
                    self.lock_ops.notify(1);
                }
            }

            // Wait for a notification.
            listener.await;

            // Try acquiring the lock without waiting for others.
            // `fetch_or` sets the lock bit unconditionally; an even previous value means the
            // mutex was unlocked and this call is the one that locked it.
            if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
                return;
            }
        }
    }
216
217    /// Attempts to acquire the mutex.
218    ///
219    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
220    /// guard is returned that releases the mutex when dropped.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use async_lock::Mutex;
226    ///
227    /// let mutex = Mutex::new(10);
228    /// if let Some(guard) = mutex.try_lock() {
229    ///     assert_eq!(*guard, 10);
230    /// }
231    /// # ;
232    /// ```
233    #[inline]
234    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
235        if self
236            .state
237            .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
238            .is_ok()
239        {
240            Some(MutexGuard(self))
241        } else {
242            None
243        }
244    }
245
246    /// Returns a mutable reference to the underlying data.
247    ///
248    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
249    /// borrow statically guarantees the mutex is not already acquired.
250    ///
251    /// # Examples
252    ///
253    /// ```
254    /// # futures_lite::future::block_on(async {
255    /// use async_lock::Mutex;
256    ///
257    /// let mut mutex = Mutex::new(0);
258    /// *mutex.get_mut() = 10;
259    /// assert_eq!(*mutex.lock().await, 10);
260    /// # })
261    /// ```
262    pub fn get_mut(&mut self) -> &mut T {
263        unsafe { &mut *self.data.get() }
264    }
265}
266
267impl<T: ?Sized> Mutex<T> {
268    async fn lock_arc_impl(self: Arc<Self>) -> MutexGuardArc<T> {
269        if let Some(guard) = self.try_lock_arc() {
270            return guard;
271        }
272        self.acquire_slow().await;
273        MutexGuardArc(self)
274    }
275
276    /// Acquires the mutex and clones a reference to it.
277    ///
278    /// Returns an owned guard that releases the mutex when dropped.
279    ///
280    /// # Examples
281    ///
282    /// ```
283    /// # futures_lite::future::block_on(async {
284    /// use async_lock::Mutex;
285    /// use std::sync::Arc;
286    ///
287    /// let mutex = Arc::new(Mutex::new(10));
288    /// let guard = mutex.lock_arc().await;
289    /// assert_eq!(*guard, 10);
290    /// # })
291    /// ```
292    #[inline]
293    pub fn lock_arc(self: &Arc<Self>) -> impl Future<Output = MutexGuardArc<T>> {
294        self.clone().lock_arc_impl()
295    }
296
297    /// Attempts to acquire the mutex and clone a reference to it.
298    ///
299    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
300    /// owned guard is returned that releases the mutex when dropped.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use async_lock::Mutex;
306    /// use std::sync::Arc;
307    ///
308    /// let mutex = Arc::new(Mutex::new(10));
309    /// if let Some(guard) = mutex.try_lock() {
310    ///     assert_eq!(*guard, 10);
311    /// }
312    /// # ;
313    /// ```
314    #[inline]
315    pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
316        if self
317            .state
318            .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
319            .is_ok()
320        {
321            Some(MutexGuardArc(self.clone()))
322        } else {
323            None
324        }
325    }
326}
327
328impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        struct Locked;
331        impl fmt::Debug for Locked {
332            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333                f.write_str("<locked>")
334            }
335        }
336
337        match self.try_lock() {
338            None => f.debug_struct("Mutex").field("data", &Locked).finish(),
339            Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
340        }
341    }
342}
343
344impl<T> From<T> for Mutex<T> {
345    fn from(val: T) -> Mutex<T> {
346        Mutex::new(val)
347    }
348}
349
350impl<T: Default + ?Sized> Default for Mutex<T> {
351    fn default() -> Mutex<T> {
352        Mutex::new(Default::default())
353    }
354}
355
/// A guard that releases the mutex when dropped.
///
/// The guard borrows the [`Mutex`] it came from and dereferences to the protected value.
pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
358
// SAFETY (review note): a guard sent to another thread gives that thread `&mut T` through
// `DerefMut`, so `T: Send` is required.
unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
// SAFETY (review note): a shared `&MutexGuard` exposes `&T` through `Deref`, so sharing the
// guard between threads requires `T: Sync`.
unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
361
362impl<'a, T: ?Sized> MutexGuard<'a, T> {
363    /// Returns a reference to the mutex a guard came from.
364    ///
365    /// # Examples
366    ///
367    /// ```
368    /// # futures_lite::future::block_on(async {
369    /// use async_lock::{Mutex, MutexGuard};
370    ///
371    /// let mutex = Mutex::new(10i32);
372    /// let guard = mutex.lock().await;
373    /// dbg!(MutexGuard::source(&guard));
374    /// # })
375    /// ```
376    pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
377        guard.0
378    }
379}
380
381impl<T: ?Sized> Drop for MutexGuard<'_, T> {
382    fn drop(&mut self) {
383        // Remove the last bit and notify a waiting lock operation.
384        self.0.state.fetch_sub(1, Ordering::Release);
385        self.0.lock_ops.notify(1);
386    }
387}
388
389impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        fmt::Debug::fmt(&**self, f)
392    }
393}
394
395impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
396    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397        (**self).fmt(f)
398    }
399}
400
401impl<T: ?Sized> Deref for MutexGuard<'_, T> {
402    type Target = T;
403
404    fn deref(&self) -> &T {
405        unsafe { &*self.0.data.get() }
406    }
407}
408
409impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
410    fn deref_mut(&mut self) -> &mut T {
411        unsafe { &mut *self.0.data.get() }
412    }
413}
414
/// An owned guard that releases the mutex when dropped.
///
/// The guard keeps its own [`Arc`] to the [`Mutex`], so it does not borrow from the caller, and
/// it dereferences to the protected value.
pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
417
// SAFETY (review note): a guard sent to another thread gives that thread `&mut T` through
// `DerefMut`, so `T: Send` is required.
unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
// SAFETY (review note): a shared `&MutexGuardArc` exposes `&T` through `Deref`, so sharing the
// guard between threads requires `T: Sync`.
unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
420
421impl<T: ?Sized> MutexGuardArc<T> {
422    /// Returns a reference to the mutex a guard came from.
423    ///
424    /// # Examples
425    ///
426    /// ```
427    /// # futures_lite::future::block_on(async {
428    /// use async_lock::{Mutex, MutexGuardArc};
429    /// use std::sync::Arc;
430    ///
431    /// let mutex = Arc::new(Mutex::new(10i32));
432    /// let guard = mutex.lock_arc().await;
433    /// dbg!(MutexGuardArc::source(&guard));
434    /// # })
435    /// ```
436    pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
437        &guard.0
438    }
439}
440
441impl<T: ?Sized> Drop for MutexGuardArc<T> {
442    fn drop(&mut self) {
443        // Remove the last bit and notify a waiting lock operation.
444        self.0.state.fetch_sub(1, Ordering::Release);
445        self.0.lock_ops.notify(1);
446    }
447}
448
449impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
450    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451        fmt::Debug::fmt(&**self, f)
452    }
453}
454
455impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
456    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457        (**self).fmt(f)
458    }
459}
460
461impl<T: ?Sized> Deref for MutexGuardArc<T> {
462    type Target = T;
463
464    fn deref(&self) -> &T {
465        unsafe { &*self.0.data.get() }
466    }
467}
468
469impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
470    fn deref_mut(&mut self) -> &mut T {
471        unsafe { &mut *self.0.data.get() }
472    }
473}
474
/// Runs the wrapped closure when the value goes out of scope.
///
/// The closure is invoked through a shared reference (`Fn`), once per drop of the wrapper.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}