futures_util/lock/
mutex.rs

1use std::cell::UnsafeCell;
2use std::marker::PhantomData;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex as StdMutex};
7use std::{fmt, mem};
8
9use slab::Slab;
10
11use futures_core::future::{FusedFuture, Future};
12use futures_core::task::{Context, Poll, Waker};
13
14/// A futures-aware mutex.
15///
16/// # Fairness
17///
18/// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
19/// in the order that they requested the lock, and it's possible for a single task
20/// which repeatedly takes the lock to starve other tasks, which may be left waiting
21/// indefinitely.
22pub struct Mutex<T: ?Sized> {
23    state: AtomicUsize,
24    waiters: StdMutex<Slab<Waiter>>,
25    value: UnsafeCell<T>,
26}
27
28impl<T: ?Sized> fmt::Debug for Mutex<T> {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        let state = self.state.load(Ordering::SeqCst);
31        f.debug_struct("Mutex")
32            .field("is_locked", &((state & IS_LOCKED) != 0))
33            .field("has_waiters", &((state & HAS_WAITERS) != 0))
34            .finish()
35    }
36}
37
38impl<T> From<T> for Mutex<T> {
39    fn from(t: T) -> Self {
40        Self::new(t)
41    }
42}
43
44impl<T: Default> Default for Mutex<T> {
45    fn default() -> Self {
46        Self::new(Default::default())
47    }
48}
49
50enum Waiter {
51    Waiting(Waker),
52    Woken,
53}
54
55impl Waiter {
56    fn register(&mut self, waker: &Waker) {
57        match self {
58            Self::Waiting(w) if waker.will_wake(w) => {}
59            _ => *self = Self::Waiting(waker.clone()),
60        }
61    }
62
63    fn wake(&mut self) {
64        match mem::replace(self, Self::Woken) {
65            Self::Waiting(waker) => waker.wake(),
66            Self::Woken => {}
67        }
68    }
69}
70
71const IS_LOCKED: usize = 1 << 0;
72const HAS_WAITERS: usize = 1 << 1;
73
74impl<T> Mutex<T> {
75    /// Creates a new futures-aware mutex.
76    pub fn new(t: T) -> Self {
77        Self {
78            state: AtomicUsize::new(0),
79            waiters: StdMutex::new(Slab::new()),
80            value: UnsafeCell::new(t),
81        }
82    }
83
84    /// Consumes this mutex, returning the underlying data.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use futures::lock::Mutex;
90    ///
91    /// let mutex = Mutex::new(0);
92    /// assert_eq!(mutex.into_inner(), 0);
93    /// ```
94    pub fn into_inner(self) -> T {
95        self.value.into_inner()
96    }
97}
98
99impl<T: ?Sized> Mutex<T> {
100    /// Attempt to acquire the lock immediately.
101    ///
102    /// If the lock is currently held, this will return `None`.
103    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
104        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
105        if (old_state & IS_LOCKED) == 0 {
106            Some(MutexGuard { mutex: self })
107        } else {
108            None
109        }
110    }
111
112    /// Attempt to acquire the lock immediately.
113    ///
114    /// If the lock is currently held, this will return `None`.
115    pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
116        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
117        if (old_state & IS_LOCKED) == 0 {
118            Some(OwnedMutexGuard { mutex: self.clone() })
119        } else {
120            None
121        }
122    }
123
124    /// Acquire the lock asynchronously.
125    ///
126    /// This method returns a future that will resolve once the lock has been
127    /// successfully acquired.
128    pub fn lock(&self) -> MutexLockFuture<'_, T> {
129        MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
130    }
131
132    /// Acquire the lock asynchronously.
133    ///
134    /// This method returns a future that will resolve once the lock has been
135    /// successfully acquired.
136    pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
137        OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
138    }
139
140    /// Returns a mutable reference to the underlying data.
141    ///
142    /// Since this call borrows the `Mutex` mutably, no actual locking needs to
143    /// take place -- the mutable borrow statically guarantees no locks exist.
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// # futures::executor::block_on(async {
149    /// use futures::lock::Mutex;
150    ///
151    /// let mut mutex = Mutex::new(0);
152    /// *mutex.get_mut() = 10;
153    /// assert_eq!(*mutex.lock().await, 10);
154    /// # });
155    /// ```
156    pub fn get_mut(&mut self) -> &mut T {
157        // We know statically that there are no other references to `self`, so
158        // there's no need to lock the inner mutex.
159        unsafe { &mut *self.value.get() }
160    }
161
162    fn remove_waker(&self, wait_key: usize, wake_another: bool) {
163        if wait_key != WAIT_KEY_NONE {
164            let mut waiters = self.waiters.lock().unwrap();
165            match waiters.remove(wait_key) {
166                Waiter::Waiting(_) => {}
167                Waiter::Woken => {
168                    // We were awoken, but then dropped before we could
169                    // wake up to acquire the lock. Wake up another
170                    // waiter.
171                    if wake_another {
172                        if let Some((_i, waiter)) = waiters.iter_mut().next() {
173                            waiter.wake();
174                        }
175                    }
176                }
177            }
178            if waiters.is_empty() {
179                self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
180            }
181        }
182    }
183
184    // Unlocks the mutex. Called by MutexGuard and MappedMutexGuard when they are
185    // dropped.
186    fn unlock(&self) {
187        let old_state = self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
188        if (old_state & HAS_WAITERS) != 0 {
189            let mut waiters = self.waiters.lock().unwrap();
190            if let Some((_i, waiter)) = waiters.iter_mut().next() {
191                waiter.wake();
192            }
193        }
194    }
195}
196
197// Sentinel for when no slot in the `Slab` has been dedicated to this object.
198const WAIT_KEY_NONE: usize = usize::MAX;
199
200/// A future which resolves when the target mutex has been successfully acquired, owned version.
201pub struct OwnedMutexLockFuture<T: ?Sized> {
202    // `None` indicates that the mutex was successfully acquired.
203    mutex: Option<Arc<Mutex<T>>>,
204    wait_key: usize,
205}
206
207impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("OwnedMutexLockFuture")
210            .field("was_acquired", &self.mutex.is_none())
211            .field("mutex", &self.mutex)
212            .field(
213                "wait_key",
214                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
215            )
216            .finish()
217    }
218}
219
220impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
221    fn is_terminated(&self) -> bool {
222        self.mutex.is_none()
223    }
224}
225
226impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
227    type Output = OwnedMutexGuard<T>;
228
229    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230        let this = self.get_mut();
231
232        let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
233
234        if let Some(lock) = mutex.try_lock_owned() {
235            mutex.remove_waker(this.wait_key, false);
236            this.mutex = None;
237            return Poll::Ready(lock);
238        }
239
240        {
241            let mut waiters = mutex.waiters.lock().unwrap();
242            if this.wait_key == WAIT_KEY_NONE {
243                this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
244                if waiters.len() == 1 {
245                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
246                }
247            } else {
248                waiters[this.wait_key].register(cx.waker());
249            }
250        }
251
252        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253        // attempting to acquire the lock again.
254        if let Some(lock) = mutex.try_lock_owned() {
255            mutex.remove_waker(this.wait_key, false);
256            this.mutex = None;
257            return Poll::Ready(lock);
258        }
259
260        Poll::Pending
261    }
262}
263
264impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
265    fn drop(&mut self) {
266        if let Some(mutex) = self.mutex.as_ref() {
267            // This future was dropped before it acquired the mutex.
268            //
269            // Remove ourselves from the map, waking up another waiter if we
270            // had been awoken to acquire the lock.
271            mutex.remove_waker(self.wait_key, true);
272        }
273    }
274}
275
276/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277/// When this structure is dropped (falls out of scope), the lock will be
278/// unlocked.
279pub struct OwnedMutexGuard<T: ?Sized> {
280    mutex: Arc<Mutex<T>>,
281}
282
283impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
284    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285        f.debug_struct("OwnedMutexGuard")
286            .field("value", &&**self)
287            .field("mutex", &self.mutex)
288            .finish()
289    }
290}
291
292impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
293    fn drop(&mut self) {
294        self.mutex.unlock()
295    }
296}
297
298impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
299    type Target = T;
300    fn deref(&self) -> &T {
301        unsafe { &*self.mutex.value.get() }
302    }
303}
304
305impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
306    fn deref_mut(&mut self) -> &mut T {
307        unsafe { &mut *self.mutex.value.get() }
308    }
309}
310
311/// A future which resolves when the target mutex has been successfully acquired.
312pub struct MutexLockFuture<'a, T: ?Sized> {
313    // `None` indicates that the mutex was successfully acquired.
314    mutex: Option<&'a Mutex<T>>,
315    wait_key: usize,
316}
317
318impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        f.debug_struct("MutexLockFuture")
321            .field("was_acquired", &self.mutex.is_none())
322            .field("mutex", &self.mutex)
323            .field(
324                "wait_key",
325                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
326            )
327            .finish()
328    }
329}
330
331impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> {
332    fn is_terminated(&self) -> bool {
333        self.mutex.is_none()
334    }
335}
336
337impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> {
338    type Output = MutexGuard<'a, T>;
339
340    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341        let mutex = self.mutex.expect("polled MutexLockFuture after completion");
342
343        if let Some(lock) = mutex.try_lock() {
344            mutex.remove_waker(self.wait_key, false);
345            self.mutex = None;
346            return Poll::Ready(lock);
347        }
348
349        {
350            let mut waiters = mutex.waiters.lock().unwrap();
351            if self.wait_key == WAIT_KEY_NONE {
352                self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
353                if waiters.len() == 1 {
354                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
355                }
356            } else {
357                waiters[self.wait_key].register(cx.waker());
358            }
359        }
360
361        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
362        // attempting to acquire the lock again.
363        if let Some(lock) = mutex.try_lock() {
364            mutex.remove_waker(self.wait_key, false);
365            self.mutex = None;
366            return Poll::Ready(lock);
367        }
368
369        Poll::Pending
370    }
371}
372
373impl<T: ?Sized> Drop for MutexLockFuture<'_, T> {
374    fn drop(&mut self) {
375        if let Some(mutex) = self.mutex {
376            // This future was dropped before it acquired the mutex.
377            //
378            // Remove ourselves from the map, waking up another waiter if we
379            // had been awoken to acquire the lock.
380            mutex.remove_waker(self.wait_key, true);
381        }
382    }
383}
384
385/// An RAII guard returned by the `lock` and `try_lock` methods.
386/// When this structure is dropped (falls out of scope), the lock will be
387/// unlocked.
388pub struct MutexGuard<'a, T: ?Sized> {
389    mutex: &'a Mutex<T>,
390}
391
392impl<'a, T: ?Sized> MutexGuard<'a, T> {
393    /// Returns a locked view over a portion of the locked data.
394    ///
395    /// # Example
396    ///
397    /// ```
398    /// # futures::executor::block_on(async {
399    /// use futures::lock::{Mutex, MutexGuard};
400    ///
401    /// let data = Mutex::new(Some("value".to_string()));
402    /// {
403    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
404    ///     assert_eq!(&*locked_str, "value");
405    /// }
406    /// # });
407    /// ```
408    #[inline]
409    pub fn map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U>
410    where
411        F: FnOnce(&mut T) -> &mut U,
412    {
413        let mutex = this.mutex;
414        let value = f(unsafe { &mut *this.mutex.value.get() });
415        // Don't run the `drop` method for MutexGuard. The ownership of the underlying
416        // locked state is being moved to the returned MappedMutexGuard.
417        mem::forget(this);
418        MappedMutexGuard { mutex, value, _marker: PhantomData }
419    }
420}
421
422impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
423    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424        f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish()
425    }
426}
427
428impl<T: ?Sized> Drop for MutexGuard<'_, T> {
429    fn drop(&mut self) {
430        self.mutex.unlock()
431    }
432}
433
434impl<T: ?Sized> Deref for MutexGuard<'_, T> {
435    type Target = T;
436    fn deref(&self) -> &T {
437        unsafe { &*self.mutex.value.get() }
438    }
439}
440
441impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
442    fn deref_mut(&mut self) -> &mut T {
443        unsafe { &mut *self.mutex.value.get() }
444    }
445}
446
447/// An RAII guard returned by the `MutexGuard::map` and `MappedMutexGuard::map` methods.
448/// When this structure is dropped (falls out of scope), the lock will be unlocked.
449pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
450    mutex: &'a Mutex<T>,
451    value: *mut U,
452    _marker: PhantomData<&'a mut U>,
453}
454
455impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
456    /// Returns a locked view over a portion of the locked data.
457    ///
458    /// # Example
459    ///
460    /// ```
461    /// # futures::executor::block_on(async {
462    /// use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
463    ///
464    /// let data = Mutex::new(Some("value".to_string()));
465    /// {
466    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
467    ///     let locked_char = MappedMutexGuard::map(locked_str, |s| s.get_mut(0..1).unwrap());
468    ///     assert_eq!(&*locked_char, "v");
469    /// }
470    /// # });
471    /// ```
472    #[inline]
473    pub fn map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V>
474    where
475        F: FnOnce(&mut U) -> &mut V,
476    {
477        let mutex = this.mutex;
478        let value = f(unsafe { &mut *this.value });
479        // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
480        // locked state is being moved to the returned MappedMutexGuard.
481        mem::forget(this);
482        MappedMutexGuard { mutex, value, _marker: PhantomData }
483    }
484}
485
486impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T, U> {
487    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488        f.debug_struct("MappedMutexGuard")
489            .field("value", &&**self)
490            .field("mutex", &self.mutex)
491            .finish()
492    }
493}
494
495impl<T: ?Sized, U: ?Sized> Drop for MappedMutexGuard<'_, T, U> {
496    fn drop(&mut self) {
497        self.mutex.unlock()
498    }
499}
500
501impl<T: ?Sized, U: ?Sized> Deref for MappedMutexGuard<'_, T, U> {
502    type Target = U;
503    fn deref(&self) -> &U {
504        unsafe { &*self.value }
505    }
506}
507
508impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> {
509    fn deref_mut(&mut self) -> &mut U {
510        unsafe { &mut *self.value }
511    }
512}
513
514// Mutexes can be moved freely between threads and acquired on any thread so long
515// as the inner value can be safely sent between threads.
516unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
517unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
518
519// It's safe to switch which thread the acquire is being attempted on so long as
520// `T` can be accessed on that thread.
521unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
522
523// doesn't have any interesting `&self` methods (only Debug)
524unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
525
526// It's safe to switch which thread the acquire is being attempted on so long as
527// `T` can be accessed on that thread.
528unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
529
530// doesn't have any interesting `&self` methods (only Debug)
531unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
532
533// Safe to send since we don't track any thread-specific details-- the inner
534// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
535unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
536unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
537
538unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
539unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
540
541unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
542unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
543
544#[test]
545fn test_mutex_guard_debug_not_recurse() {
546    let mutex = Mutex::new(42);
547    let guard = mutex.try_lock().unwrap();
548    let _ = format!("{:?}", guard);
549    let guard = MutexGuard::map(guard, |n| n);
550    let _ = format!("{:?}", guard);
551}