async_lock/mutex.rs
1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::ops::{Deref, DerefMut};
5use std::process;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8
9// Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54.
10#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
11use std::time::{Duration, Instant};
12
13use std::usize;
14
15use event_listener::Event;
16
17/// An async mutex.
18///
19/// The locking mechanism uses eventual fairness to ensure locking will be fair on average without
20/// sacrificing performance. This is done by forcing a fair lock whenever a lock operation is
21/// starved for longer than 0.5 milliseconds.
22///
23/// # Examples
24///
25/// ```
26/// # futures_lite::future::block_on(async {
27/// use async_lock::Mutex;
28///
29/// let m = Mutex::new(1);
30///
31/// let mut guard = m.lock().await;
32/// *guard = 2;
33///
34/// assert!(m.try_lock().is_none());
35/// drop(guard);
36/// assert_eq!(*m.try_lock().unwrap(), 2);
37/// # })
38/// ```
39pub struct Mutex<T: ?Sized> {
40 /// Current state of the mutex.
41 ///
42 /// The least significant bit is set to 1 if the mutex is locked.
43 /// The other bits hold the number of starved lock operations.
44 state: AtomicUsize,
45
46 /// Lock operations waiting for the mutex to be released.
47 lock_ops: Event,
48
49 /// The value inside the mutex.
50 data: UnsafeCell<T>,
51}
52
53unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
54unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
55
56impl<T> Mutex<T> {
57 /// Creates a new async mutex.
58 ///
59 /// # Examples
60 ///
61 /// ```
62 /// use async_lock::Mutex;
63 ///
64 /// let mutex = Mutex::new(0);
65 /// ```
66 pub const fn new(data: T) -> Mutex<T> {
67 Mutex {
68 state: AtomicUsize::new(0),
69 lock_ops: Event::new(),
70 data: UnsafeCell::new(data),
71 }
72 }
73
74 /// Consumes the mutex, returning the underlying data.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// use async_lock::Mutex;
80 ///
81 /// let mutex = Mutex::new(10);
82 /// assert_eq!(mutex.into_inner(), 10);
83 /// ```
84 pub fn into_inner(self) -> T {
85 self.data.into_inner()
86 }
87}
88
89impl<T: ?Sized> Mutex<T> {
90 /// Acquires the mutex.
91 ///
92 /// Returns a guard that releases the mutex when dropped.
93 ///
94 /// # Examples
95 ///
96 /// ```
97 /// # futures_lite::future::block_on(async {
98 /// use async_lock::Mutex;
99 ///
100 /// let mutex = Mutex::new(10);
101 /// let guard = mutex.lock().await;
102 /// assert_eq!(*guard, 10);
103 /// # })
104 /// ```
105 #[inline]
106 pub async fn lock(&self) -> MutexGuard<'_, T> {
107 if let Some(guard) = self.try_lock() {
108 return guard;
109 }
110 self.acquire_slow().await;
111 MutexGuard(self)
112 }
113
114 /// Slow path for acquiring the mutex.
115 #[cold]
116 async fn acquire_slow(&self) {
117 // Get the current time.
118 #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
119 let start = Instant::now();
120
121 loop {
122 // Start listening for events.
123 let listener = self.lock_ops.listen();
124
125 // Try locking if nobody is being starved.
126 match self
127 .state
128 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
129 .unwrap_or_else(|x| x)
130 {
131 // Lock acquired!
132 0 => return,
133
134 // Lock is held and nobody is starved.
135 1 => {}
136
137 // Somebody is starved.
138 _ => break,
139 }
140
141 // Wait for a notification.
142 listener.await;
143
144 // Try locking if nobody is being starved.
145 match self
146 .state
147 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
148 .unwrap_or_else(|x| x)
149 {
150 // Lock acquired!
151 0 => return,
152
153 // Lock is held and nobody is starved.
154 1 => {}
155
156 // Somebody is starved.
157 _ => {
158 // Notify the first listener in line because we probably received a
159 // notification that was meant for a starved task.
160 self.lock_ops.notify(1);
161 break;
162 }
163 }
164
165 // If waiting for too long, fall back to a fairer locking strategy that will prevent
166 // newer lock operations from starving us forever.
167 #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
168 if start.elapsed() > Duration::from_micros(500) {
169 break;
170 }
171 }
172
173 // Increment the number of starved lock operations.
174 if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
175 // In case of potential overflow, abort.
176 process::abort();
177 }
178
179 // Decrement the counter when exiting this function.
180 let _call = CallOnDrop(|| {
181 self.state.fetch_sub(2, Ordering::Release);
182 });
183
184 loop {
185 // Start listening for events.
186 let listener = self.lock_ops.listen();
187
188 // Try locking if nobody else is being starved.
189 match self
190 .state
191 .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
192 .unwrap_or_else(|x| x)
193 {
194 // Lock acquired!
195 2 => return,
196
197 // Lock is held by someone.
198 s if s % 2 == 1 => {}
199
200 // Lock is available.
201 _ => {
202 // Be fair: notify the first listener and then go wait in line.
203 self.lock_ops.notify(1);
204 }
205 }
206
207 // Wait for a notification.
208 listener.await;
209
210 // Try acquiring the lock without waiting for others.
211 if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
212 return;
213 }
214 }
215 }
216
217 /// Attempts to acquire the mutex.
218 ///
219 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
220 /// guard is returned that releases the mutex when dropped.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use async_lock::Mutex;
226 ///
227 /// let mutex = Mutex::new(10);
228 /// if let Some(guard) = mutex.try_lock() {
229 /// assert_eq!(*guard, 10);
230 /// }
231 /// # ;
232 /// ```
233 #[inline]
234 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
235 if self
236 .state
237 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
238 .is_ok()
239 {
240 Some(MutexGuard(self))
241 } else {
242 None
243 }
244 }
245
246 /// Returns a mutable reference to the underlying data.
247 ///
248 /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
249 /// borrow statically guarantees the mutex is not already acquired.
250 ///
251 /// # Examples
252 ///
253 /// ```
254 /// # futures_lite::future::block_on(async {
255 /// use async_lock::Mutex;
256 ///
257 /// let mut mutex = Mutex::new(0);
258 /// *mutex.get_mut() = 10;
259 /// assert_eq!(*mutex.lock().await, 10);
260 /// # })
261 /// ```
262 pub fn get_mut(&mut self) -> &mut T {
263 unsafe { &mut *self.data.get() }
264 }
265}
266
267impl<T: ?Sized> Mutex<T> {
268 async fn lock_arc_impl(self: Arc<Self>) -> MutexGuardArc<T> {
269 if let Some(guard) = self.try_lock_arc() {
270 return guard;
271 }
272 self.acquire_slow().await;
273 MutexGuardArc(self)
274 }
275
276 /// Acquires the mutex and clones a reference to it.
277 ///
278 /// Returns an owned guard that releases the mutex when dropped.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// # futures_lite::future::block_on(async {
284 /// use async_lock::Mutex;
285 /// use std::sync::Arc;
286 ///
287 /// let mutex = Arc::new(Mutex::new(10));
288 /// let guard = mutex.lock_arc().await;
289 /// assert_eq!(*guard, 10);
290 /// # })
291 /// ```
292 #[inline]
293 pub fn lock_arc(self: &Arc<Self>) -> impl Future<Output = MutexGuardArc<T>> {
294 self.clone().lock_arc_impl()
295 }
296
297 /// Attempts to acquire the mutex and clone a reference to it.
298 ///
299 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
300 /// owned guard is returned that releases the mutex when dropped.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use async_lock::Mutex;
306 /// use std::sync::Arc;
307 ///
308 /// let mutex = Arc::new(Mutex::new(10));
309 /// if let Some(guard) = mutex.try_lock() {
310 /// assert_eq!(*guard, 10);
311 /// }
312 /// # ;
313 /// ```
314 #[inline]
315 pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
316 if self
317 .state
318 .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
319 .is_ok()
320 {
321 Some(MutexGuardArc(self.clone()))
322 } else {
323 None
324 }
325 }
326}
327
328impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330 struct Locked;
331 impl fmt::Debug for Locked {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 f.write_str("<locked>")
334 }
335 }
336
337 match self.try_lock() {
338 None => f.debug_struct("Mutex").field("data", &Locked).finish(),
339 Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
340 }
341 }
342}
343
344impl<T> From<T> for Mutex<T> {
345 fn from(val: T) -> Mutex<T> {
346 Mutex::new(val)
347 }
348}
349
350impl<T: Default + ?Sized> Default for Mutex<T> {
351 fn default() -> Mutex<T> {
352 Mutex::new(Default::default())
353 }
354}
355
356/// A guard that releases the mutex when dropped.
357pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
358
359unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
360unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
361
362impl<'a, T: ?Sized> MutexGuard<'a, T> {
363 /// Returns a reference to the mutex a guard came from.
364 ///
365 /// # Examples
366 ///
367 /// ```
368 /// # futures_lite::future::block_on(async {
369 /// use async_lock::{Mutex, MutexGuard};
370 ///
371 /// let mutex = Mutex::new(10i32);
372 /// let guard = mutex.lock().await;
373 /// dbg!(MutexGuard::source(&guard));
374 /// # })
375 /// ```
376 pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
377 guard.0
378 }
379}
380
381impl<T: ?Sized> Drop for MutexGuard<'_, T> {
382 fn drop(&mut self) {
383 // Remove the last bit and notify a waiting lock operation.
384 self.0.state.fetch_sub(1, Ordering::Release);
385 self.0.lock_ops.notify(1);
386 }
387}
388
389impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391 fmt::Debug::fmt(&**self, f)
392 }
393}
394
395impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
396 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397 (**self).fmt(f)
398 }
399}
400
401impl<T: ?Sized> Deref for MutexGuard<'_, T> {
402 type Target = T;
403
404 fn deref(&self) -> &T {
405 unsafe { &*self.0.data.get() }
406 }
407}
408
409impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
410 fn deref_mut(&mut self) -> &mut T {
411 unsafe { &mut *self.0.data.get() }
412 }
413}
414
415/// An owned guard that releases the mutex when dropped.
416pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
417
418unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
419unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
420
421impl<T: ?Sized> MutexGuardArc<T> {
422 /// Returns a reference to the mutex a guard came from.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// # futures_lite::future::block_on(async {
428 /// use async_lock::{Mutex, MutexGuardArc};
429 /// use std::sync::Arc;
430 ///
431 /// let mutex = Arc::new(Mutex::new(10i32));
432 /// let guard = mutex.lock_arc().await;
433 /// dbg!(MutexGuardArc::source(&guard));
434 /// # })
435 /// ```
436 pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
437 &guard.0
438 }
439}
440
441impl<T: ?Sized> Drop for MutexGuardArc<T> {
442 fn drop(&mut self) {
443 // Remove the last bit and notify a waiting lock operation.
444 self.0.state.fetch_sub(1, Ordering::Release);
445 self.0.lock_ops.notify(1);
446 }
447}
448
449impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
451 fmt::Debug::fmt(&**self, f)
452 }
453}
454
455impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 (**self).fmt(f)
458 }
459}
460
461impl<T: ?Sized> Deref for MutexGuardArc<T> {
462 type Target = T;
463
464 fn deref(&self) -> &T {
465 unsafe { &*self.0.data.get() }
466 }
467}
468
469impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
470 fn deref_mut(&mut self) -> &mut T {
471 unsafe { &mut *self.0.data.get() }
472 }
473}
474
/// Scope guard: invokes the wrapped closure when the value is dropped.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    /// Runs the stored closure.
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}