crossbeam_utils/sync/
sharded_lock.rs

1use std::cell::UnsafeCell;
2use std::collections::HashMap;
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::ops::{Deref, DerefMut};
7use std::panic::{RefUnwindSafe, UnwindSafe};
8use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
9use std::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
10use std::thread::{self, ThreadId};
11
12use crate::sync::once_lock::OnceLock;
13use crate::CachePadded;
14
/// The number of shards per sharded lock. Must be a power of two.
///
/// Readers select their shard with `index & (shards.len() - 1)`; that mask only reaches every
/// shard when the shard count is a power of two.
const NUM_SHARDS: usize = 8;
17
/// A shard containing a single reader-writer lock.
///
/// Field order is significant: struct fields are dropped in declaration order, and a stored
/// write guard refers to `lock`. `write_guard` is therefore declared first, so that a guard which
/// is still parked here when the shard is destroyed (for example because the big write guard was
/// leaked with `mem::forget`) is released while the lock it points at is still alive.
struct Shard {
    /// The write-guard keeping this shard locked.
    ///
    /// Write operations will lock each shard and store the guard here. These guards get dropped at
    /// the same time the big guard is dropped.
    ///
    /// The `'static` lifetime is a placeholder: the guard really borrows `lock` below, and must
    /// never outlive it.
    write_guard: UnsafeCell<Option<RwLockWriteGuard<'static, ()>>>,

    /// The inner reader-writer lock.
    lock: RwLock<()>,
}
29
30/// A sharded reader-writer lock.
31///
32/// This lock is equivalent to [`RwLock`], except read operations are faster and write operations
33/// are slower.
34///
35/// A `ShardedLock` is internally made of a list of *shards*, each being a [`RwLock`] occupying a
36/// single cache line. Read operations will pick one of the shards depending on the current thread
37/// and lock it. Write operations need to lock all shards in succession.
38///
39/// By splitting the lock into shards, concurrent read operations will in most cases choose
40/// different shards and thus update different cache lines, which is good for scalability. However,
41/// write operations need to do more work and are therefore slower than usual.
42///
43/// The priority policy of the lock is dependent on the underlying operating system's
44/// implementation, and this type does not guarantee that any particular policy will be used.
45///
46/// # Poisoning
47///
48/// A `ShardedLock`, like [`RwLock`], will become poisoned on a panic. Note that it may only be
49/// poisoned if a panic occurs while a write operation is in progress. If a panic occurs in any
50/// read operation, the lock will not be poisoned.
51///
52/// # Examples
53///
54/// ```
55/// use crossbeam_utils::sync::ShardedLock;
56///
57/// let lock = ShardedLock::new(5);
58///
59/// // Any number of read locks can be held at once.
60/// {
61///     let r1 = lock.read().unwrap();
62///     let r2 = lock.read().unwrap();
63///     assert_eq!(*r1, 5);
64///     assert_eq!(*r2, 5);
65/// } // Read locks are dropped at this point.
66///
67/// // However, only one write lock may be held.
68/// {
69///     let mut w = lock.write().unwrap();
70///     *w += 1;
71///     assert_eq!(*w, 6);
72/// } // Write lock is dropped here.
73/// ```
74///
75/// [`RwLock`]: std::sync::RwLock
pub struct ShardedLock<T: ?Sized> {
    /// A list of locks protecting the internal data.
    ///
    /// Read operations lock a single shard for reading; write operations write-lock every shard.
    shards: Box<[CachePadded<Shard>]>,

    /// The internal data.
    ///
    /// Reached either through a guard (while the corresponding shard locks are held) or through
    /// exclusive ownership of the lock itself (`get_mut`, `into_inner`).
    value: UnsafeCell<T>,
}
83
// SAFETY: the lock owns its `T` (stored in `value`), so moving the lock to another thread moves
// the value with it; that is fine whenever `T: Send`.
unsafe impl<T: ?Sized + Send> Send for ShardedLock<T> {}
// SAFETY: through a shared `&ShardedLock<T>` other threads can obtain `&T` (read guards) and
// `&mut T` (write guards), so both `T: Sync` and `T: Send` are required.
unsafe impl<T: ?Sized + Send + Sync> Sync for ShardedLock<T> {}

// The `UnsafeCell` field would otherwise opt the type out of these auto traits.
// NOTE(review): presumably justified by poisoning, mirroring `std::sync::RwLock` — confirm.
impl<T: ?Sized> UnwindSafe for ShardedLock<T> {}
impl<T: ?Sized> RefUnwindSafe for ShardedLock<T> {}
89
90impl<T> ShardedLock<T> {
91    /// Creates a new sharded reader-writer lock.
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// use crossbeam_utils::sync::ShardedLock;
97    ///
98    /// let lock = ShardedLock::new(5);
99    /// ```
100    pub fn new(value: T) -> ShardedLock<T> {
101        ShardedLock {
102            shards: (0..NUM_SHARDS)
103                .map(|_| {
104                    CachePadded::new(Shard {
105                        lock: RwLock::new(()),
106                        write_guard: UnsafeCell::new(None),
107                    })
108                })
109                .collect::<Box<[_]>>(),
110            value: UnsafeCell::new(value),
111        }
112    }
113
114    /// Consumes this lock, returning the underlying data.
115    ///
116    /// # Errors
117    ///
118    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
119    /// operation panics.
120    ///
121    /// # Examples
122    ///
123    /// ```
124    /// use crossbeam_utils::sync::ShardedLock;
125    ///
126    /// let lock = ShardedLock::new(String::new());
127    /// {
128    ///     let mut s = lock.write().unwrap();
129    ///     *s = "modified".to_owned();
130    /// }
131    /// assert_eq!(lock.into_inner().unwrap(), "modified");
132    /// ```
133    pub fn into_inner(self) -> LockResult<T> {
134        let is_poisoned = self.is_poisoned();
135        let inner = self.value.into_inner();
136
137        if is_poisoned {
138            Err(PoisonError::new(inner))
139        } else {
140            Ok(inner)
141        }
142    }
143}
144
145impl<T: ?Sized> ShardedLock<T> {
146    /// Returns `true` if the lock is poisoned.
147    ///
148    /// If another thread can still access the lock, it may become poisoned at any time. A `false`
149    /// result should not be trusted without additional synchronization.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use crossbeam_utils::sync::ShardedLock;
155    /// use std::sync::Arc;
156    /// use std::thread;
157    ///
158    /// let lock = Arc::new(ShardedLock::new(0));
159    /// let c_lock = lock.clone();
160    ///
161    /// let _ = thread::spawn(move || {
162    ///     let _lock = c_lock.write().unwrap();
163    ///     panic!(); // the lock gets poisoned
164    /// }).join();
165    /// assert_eq!(lock.is_poisoned(), true);
166    /// ```
167    pub fn is_poisoned(&self) -> bool {
168        self.shards[0].lock.is_poisoned()
169    }
170
171    /// Returns a mutable reference to the underlying data.
172    ///
173    /// Since this call borrows the lock mutably, no actual locking needs to take place.
174    ///
175    /// # Errors
176    ///
177    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
178    /// operation panics.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// use crossbeam_utils::sync::ShardedLock;
184    ///
185    /// let mut lock = ShardedLock::new(0);
186    /// *lock.get_mut().unwrap() = 10;
187    /// assert_eq!(*lock.read().unwrap(), 10);
188    /// ```
189    pub fn get_mut(&mut self) -> LockResult<&mut T> {
190        let is_poisoned = self.is_poisoned();
191        let inner = unsafe { &mut *self.value.get() };
192
193        if is_poisoned {
194            Err(PoisonError::new(inner))
195        } else {
196            Ok(inner)
197        }
198    }
199
200    /// Attempts to acquire this lock with shared read access.
201    ///
202    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
203    /// is returned which will release the shared access when it is dropped. This method does not
204    /// provide any guarantees with respect to the ordering of whether contentious readers or
205    /// writers will acquire the lock first.
206    ///
207    /// # Errors
208    ///
209    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
210    /// operation panics.
211    ///
212    /// # Examples
213    ///
214    /// ```
215    /// use crossbeam_utils::sync::ShardedLock;
216    ///
217    /// let lock = ShardedLock::new(1);
218    ///
219    /// match lock.try_read() {
220    ///     Ok(n) => assert_eq!(*n, 1),
221    ///     Err(_) => unreachable!(),
222    /// };
223    /// ```
224    pub fn try_read(&self) -> TryLockResult<ShardedLockReadGuard<'_, T>> {
225        // Take the current thread index and map it to a shard index. Thread indices will tend to
226        // distribute shards among threads equally, thus reducing contention due to read-locking.
227        let current_index = current_index().unwrap_or(0);
228        let shard_index = current_index & (self.shards.len() - 1);
229
230        match self.shards[shard_index].lock.try_read() {
231            Ok(guard) => Ok(ShardedLockReadGuard {
232                lock: self,
233                _guard: guard,
234                _marker: PhantomData,
235            }),
236            Err(TryLockError::Poisoned(err)) => {
237                let guard = ShardedLockReadGuard {
238                    lock: self,
239                    _guard: err.into_inner(),
240                    _marker: PhantomData,
241                };
242                Err(TryLockError::Poisoned(PoisonError::new(guard)))
243            }
244            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
245        }
246    }
247
248    /// Locks with shared read access, blocking the current thread until it can be acquired.
249    ///
250    /// The calling thread will be blocked until there are no more writers which hold the lock.
251    /// There may be other readers currently inside the lock when this method returns. This method
252    /// does not provide any guarantees with respect to the ordering of whether contentious readers
253    /// or writers will acquire the lock first.
254    ///
255    /// Returns a guard which will release the shared access when dropped.
256    ///
257    /// # Errors
258    ///
259    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
260    /// operation panics.
261    ///
262    /// # Panics
263    ///
264    /// This method might panic when called if the lock is already held by the current thread.
265    ///
266    /// # Examples
267    ///
268    /// ```
269    /// use crossbeam_utils::sync::ShardedLock;
270    /// use std::sync::Arc;
271    /// use std::thread;
272    ///
273    /// let lock = Arc::new(ShardedLock::new(1));
274    /// let c_lock = lock.clone();
275    ///
276    /// let n = lock.read().unwrap();
277    /// assert_eq!(*n, 1);
278    ///
279    /// thread::spawn(move || {
280    ///     let r = c_lock.read();
281    ///     assert!(r.is_ok());
282    /// }).join().unwrap();
283    /// ```
284    pub fn read(&self) -> LockResult<ShardedLockReadGuard<'_, T>> {
285        // Take the current thread index and map it to a shard index. Thread indices will tend to
286        // distribute shards among threads equally, thus reducing contention due to read-locking.
287        let current_index = current_index().unwrap_or(0);
288        let shard_index = current_index & (self.shards.len() - 1);
289
290        match self.shards[shard_index].lock.read() {
291            Ok(guard) => Ok(ShardedLockReadGuard {
292                lock: self,
293                _guard: guard,
294                _marker: PhantomData,
295            }),
296            Err(err) => Err(PoisonError::new(ShardedLockReadGuard {
297                lock: self,
298                _guard: err.into_inner(),
299                _marker: PhantomData,
300            })),
301        }
302    }
303
304    /// Attempts to acquire this lock with exclusive write access.
305    ///
306    /// If the access could not be granted at this time, an error is returned. Otherwise, a guard
307    /// is returned which will release the exclusive access when it is dropped. This method does
308    /// not provide any guarantees with respect to the ordering of whether contentious readers or
309    /// writers will acquire the lock first.
310    ///
311    /// # Errors
312    ///
313    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
314    /// operation panics.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// use crossbeam_utils::sync::ShardedLock;
320    ///
321    /// let lock = ShardedLock::new(1);
322    ///
323    /// let n = lock.read().unwrap();
324    /// assert_eq!(*n, 1);
325    ///
326    /// assert!(lock.try_write().is_err());
327    /// ```
328    pub fn try_write(&self) -> TryLockResult<ShardedLockWriteGuard<'_, T>> {
329        let mut poisoned = false;
330        let mut blocked = None;
331
332        // Write-lock each shard in succession.
333        for (i, shard) in self.shards.iter().enumerate() {
334            let guard = match shard.lock.try_write() {
335                Ok(guard) => guard,
336                Err(TryLockError::Poisoned(err)) => {
337                    poisoned = true;
338                    err.into_inner()
339                }
340                Err(TryLockError::WouldBlock) => {
341                    blocked = Some(i);
342                    break;
343                }
344            };
345
346            // Store the guard into the shard.
347            unsafe {
348                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
349                let dest: *mut _ = shard.write_guard.get();
350                *dest = Some(guard);
351            }
352        }
353
354        if let Some(i) = blocked {
355            // Unlock the shards in reverse order of locking.
356            for shard in self.shards[0..i].iter().rev() {
357                unsafe {
358                    let dest: *mut _ = shard.write_guard.get();
359                    let guard = mem::replace(&mut *dest, None);
360                    drop(guard);
361                }
362            }
363            Err(TryLockError::WouldBlock)
364        } else if poisoned {
365            let guard = ShardedLockWriteGuard {
366                lock: self,
367                _marker: PhantomData,
368            };
369            Err(TryLockError::Poisoned(PoisonError::new(guard)))
370        } else {
371            Ok(ShardedLockWriteGuard {
372                lock: self,
373                _marker: PhantomData,
374            })
375        }
376    }
377
378    /// Locks with exclusive write access, blocking the current thread until it can be acquired.
379    ///
    /// The calling thread will be blocked until no other writers and no readers hold the lock.
    /// When this method returns, the caller has exclusive access to the data. This method does
    /// not provide any guarantees with respect to the ordering of whether contentious readers
    /// or writers will acquire the lock first.
384    ///
385    /// Returns a guard which will release the exclusive access when dropped.
386    ///
387    /// # Errors
388    ///
389    /// This method will return an error if the lock is poisoned. A lock gets poisoned when a write
390    /// operation panics.
391    ///
392    /// # Panics
393    ///
394    /// This method might panic when called if the lock is already held by the current thread.
395    ///
396    /// # Examples
397    ///
398    /// ```
399    /// use crossbeam_utils::sync::ShardedLock;
400    ///
401    /// let lock = ShardedLock::new(1);
402    ///
403    /// let mut n = lock.write().unwrap();
404    /// *n = 2;
405    ///
406    /// assert!(lock.try_read().is_err());
407    /// ```
408    pub fn write(&self) -> LockResult<ShardedLockWriteGuard<'_, T>> {
409        let mut poisoned = false;
410
411        // Write-lock each shard in succession.
412        for shard in self.shards.iter() {
413            let guard = match shard.lock.write() {
414                Ok(guard) => guard,
415                Err(err) => {
416                    poisoned = true;
417                    err.into_inner()
418                }
419            };
420
421            // Store the guard into the shard.
422            unsafe {
423                let guard: RwLockWriteGuard<'_, ()> = guard;
424                let guard: RwLockWriteGuard<'static, ()> = mem::transmute(guard);
425                let dest: *mut _ = shard.write_guard.get();
426                *dest = Some(guard);
427            }
428        }
429
430        if poisoned {
431            Err(PoisonError::new(ShardedLockWriteGuard {
432                lock: self,
433                _marker: PhantomData,
434            }))
435        } else {
436            Ok(ShardedLockWriteGuard {
437                lock: self,
438                _marker: PhantomData,
439            })
440        }
441    }
442}
443
444impl<T: ?Sized + fmt::Debug> fmt::Debug for ShardedLock<T> {
445    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446        match self.try_read() {
447            Ok(guard) => f
448                .debug_struct("ShardedLock")
449                .field("data", &&*guard)
450                .finish(),
451            Err(TryLockError::Poisoned(err)) => f
452                .debug_struct("ShardedLock")
453                .field("data", &&**err.get_ref())
454                .finish(),
455            Err(TryLockError::WouldBlock) => {
456                struct LockedPlaceholder;
457                impl fmt::Debug for LockedPlaceholder {
458                    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459                        f.write_str("<locked>")
460                    }
461                }
462                f.debug_struct("ShardedLock")
463                    .field("data", &LockedPlaceholder)
464                    .finish()
465            }
466        }
467    }
468}
469
470impl<T: Default> Default for ShardedLock<T> {
471    fn default() -> ShardedLock<T> {
472        ShardedLock::new(Default::default())
473    }
474}
475
476impl<T> From<T> for ShardedLock<T> {
477    fn from(t: T) -> Self {
478        ShardedLock::new(t)
479    }
480}
481
/// A guard used to release the shared read access of a [`ShardedLock`] when dropped.
pub struct ShardedLockReadGuard<'a, T: ?Sized> {
    /// The lock this guard was obtained from; dereferencing the guard reads `lock.value`.
    lock: &'a ShardedLock<T>,
    /// Read guard of the single shard chosen for the acquiring thread. Never read; it exists
    /// only to keep that shard read-locked until this guard is dropped.
    _guard: RwLockReadGuard<'a, ()>,
    /// NOTE(review): presumably present so the type picks up the auto-trait and variance
    /// behavior of `RwLockReadGuard<'a, T>` — confirm.
    _marker: PhantomData<RwLockReadGuard<'a, T>>,
}

// SAFETY: a shared reference to the guard only gives out `&T` (see `Deref`), so `T: Sync` is
// sufficient. The impl is explicit because the `&ShardedLock<T>` field alone would also demand
// `T: Send`.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockReadGuard<'_, T> {}
490
491impl<T: ?Sized> Deref for ShardedLockReadGuard<'_, T> {
492    type Target = T;
493
494    fn deref(&self) -> &T {
495        unsafe { &*self.lock.value.get() }
496    }
497}
498
499impl<T: fmt::Debug> fmt::Debug for ShardedLockReadGuard<'_, T> {
500    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501        f.debug_struct("ShardedLockReadGuard")
502            .field("lock", &self.lock)
503            .finish()
504    }
505}
506
507impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockReadGuard<'_, T> {
508    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509        (**self).fmt(f)
510    }
511}
512
/// A guard used to release the exclusive write access of a [`ShardedLock`] when dropped.
pub struct ShardedLockWriteGuard<'a, T: ?Sized> {
    /// The lock this guard was obtained from. The per-shard write guards are not held here;
    /// they are parked in each shard's `write_guard` slot and taken back out when this guard is
    /// dropped.
    lock: &'a ShardedLock<T>,
    /// NOTE(review): presumably present so the type picks up the auto-trait and variance
    /// behavior of `RwLockWriteGuard<'a, T>` — confirm.
    _marker: PhantomData<RwLockWriteGuard<'a, T>>,
}

// SAFETY: a shared reference to the guard only gives out `&T` (see `Deref`), so `T: Sync` is
// sufficient. The impl is explicit because the `&ShardedLock<T>` field alone would also demand
// `T: Send`.
unsafe impl<T: ?Sized + Sync> Sync for ShardedLockWriteGuard<'_, T> {}
520
521impl<T: ?Sized> Drop for ShardedLockWriteGuard<'_, T> {
522    fn drop(&mut self) {
523        // Unlock the shards in reverse order of locking.
524        for shard in self.lock.shards.iter().rev() {
525            unsafe {
526                let dest: *mut _ = shard.write_guard.get();
527                let guard = mem::replace(&mut *dest, None);
528                drop(guard);
529            }
530        }
531    }
532}
533
534impl<T: fmt::Debug> fmt::Debug for ShardedLockWriteGuard<'_, T> {
535    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536        f.debug_struct("ShardedLockWriteGuard")
537            .field("lock", &self.lock)
538            .finish()
539    }
540}
541
542impl<T: ?Sized + fmt::Display> fmt::Display for ShardedLockWriteGuard<'_, T> {
543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544        (**self).fmt(f)
545    }
546}
547
548impl<T: ?Sized> Deref for ShardedLockWriteGuard<'_, T> {
549    type Target = T;
550
551    fn deref(&self) -> &T {
552        unsafe { &*self.lock.value.get() }
553    }
554}
555
556impl<T: ?Sized> DerefMut for ShardedLockWriteGuard<'_, T> {
557    fn deref_mut(&mut self) -> &mut T {
558        unsafe { &mut *self.lock.value.get() }
559    }
560}
561
562/// Returns a `usize` that identifies the current thread.
563///
564/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
565/// usually tend to be consecutive numbers between 0 and the number of running threads.
566///
567/// Since this function accesses TLS, `None` might be returned if the current thread's TLS is
568/// tearing down.
569#[inline]
570fn current_index() -> Option<usize> {
571    REGISTRATION.try_with(|reg| reg.index).ok()
572}
573
/// The global registry keeping track of registered threads and indices.
struct ThreadIndices {
    /// Mapping from `ThreadId` to thread index.
    ///
    /// An entry is inserted when a thread registers and removed when its registration is
    /// dropped. Nothing in the visible code looks entries up.
    mapping: HashMap<ThreadId, usize>,

    /// A list of free indices.
    ///
    /// Indices are pushed here when a registration is dropped and are reused (most recently
    /// freed first) before a new index is allocated.
    free_list: Vec<usize>,

    /// The next index to allocate if the free list is empty.
    next_index: usize,
}
585
586fn thread_indices() -> &'static Mutex<ThreadIndices> {
587    static THREAD_INDICES: OnceLock<Mutex<ThreadIndices>> = OnceLock::new();
588    fn init() -> Mutex<ThreadIndices> {
589        Mutex::new(ThreadIndices {
590            mapping: HashMap::new(),
591            free_list: Vec::new(),
592            next_index: 0,
593        })
594    }
595    THREAD_INDICES.get_or_init(init)
596}
597
/// A registration of a thread with an index.
///
/// When dropped, unregisters the thread and frees the reserved index.
struct Registration {
    /// The index reserved for this thread; readers mask it to choose a shard.
    index: usize,
    /// Id of the registered thread, used as the key in the global registry's mapping.
    thread_id: ThreadId,
}
605
606impl Drop for Registration {
607    fn drop(&mut self) {
608        let mut indices = thread_indices().lock().unwrap();
609        indices.mapping.remove(&self.thread_id);
610        indices.free_list.push(self.index);
611    }
612}
613
614thread_local! {
615    static REGISTRATION: Registration = {
616        let thread_id = thread::current().id();
617        let mut indices = thread_indices().lock().unwrap();
618
619        let index = match indices.free_list.pop() {
620            Some(i) => i,
621            None => {
622                let i = indices.next_index;
623                indices.next_index += 1;
624                i
625            }
626        };
627        indices.mapping.insert(thread_id, index);
628
629        Registration {
630            index,
631            thread_id,
632        }
633    };
634}