async_lock/
semaphore.rs

1use std::future::Future;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5use event_listener::Event;
6
7/// A counter for limiting the number of concurrent operations.
8#[derive(Debug)]
9pub struct Semaphore {
10    count: AtomicUsize,
11    event: Event,
12}
13
14impl Semaphore {
15    /// Creates a new semaphore with a limit of `n` concurrent operations.
16    ///
17    /// # Examples
18    ///
19    /// ```
20    /// use async_lock::Semaphore;
21    ///
22    /// let s = Semaphore::new(5);
23    /// ```
24    pub const fn new(n: usize) -> Semaphore {
25        Semaphore {
26            count: AtomicUsize::new(n),
27            event: Event::new(),
28        }
29    }
30
31    /// Attempts to get a permit for a concurrent operation.
32    ///
33    /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, a
34    /// guard is returned that releases the mutex when dropped.
35    ///
36    /// # Examples
37    ///
38    /// ```
39    /// use async_lock::Semaphore;
40    ///
41    /// let s = Semaphore::new(2);
42    ///
43    /// let g1 = s.try_acquire().unwrap();
44    /// let g2 = s.try_acquire().unwrap();
45    ///
46    /// assert!(s.try_acquire().is_none());
47    /// drop(g2);
48    /// assert!(s.try_acquire().is_some());
49    /// ```
50    pub fn try_acquire(&self) -> Option<SemaphoreGuard<'_>> {
51        let mut count = self.count.load(Ordering::Acquire);
52        loop {
53            if count == 0 {
54                return None;
55            }
56
57            match self.count.compare_exchange_weak(
58                count,
59                count - 1,
60                Ordering::AcqRel,
61                Ordering::Acquire,
62            ) {
63                Ok(_) => return Some(SemaphoreGuard(self)),
64                Err(c) => count = c,
65            }
66        }
67    }
68
69    /// Waits for a permit for a concurrent operation.
70    ///
71    /// Returns a guard that releases the permit when dropped.
72    ///
73    /// # Examples
74    ///
75    /// ```
76    /// # futures_lite::future::block_on(async {
77    /// use async_lock::Semaphore;
78    ///
79    /// let s = Semaphore::new(2);
80    /// let guard = s.acquire().await;
81    /// # });
82    /// ```
83    pub async fn acquire(&self) -> SemaphoreGuard<'_> {
84        let mut listener = None;
85
86        loop {
87            if let Some(guard) = self.try_acquire() {
88                return guard;
89            }
90
91            match listener.take() {
92                None => listener = Some(self.event.listen()),
93                Some(l) => l.await,
94            }
95        }
96    }
97}
98
99impl Semaphore {
100    /// Attempts to get an owned permit for a concurrent operation.
101    ///
102    /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, an
103    /// owned guard is returned that releases the mutex when dropped.
104    ///
105    /// # Examples
106    ///
107    /// ```
108    /// use async_lock::Semaphore;
109    /// use std::sync::Arc;
110    ///
111    /// let s = Arc::new(Semaphore::new(2));
112    ///
113    /// let g1 = s.try_acquire_arc().unwrap();
114    /// let g2 = s.try_acquire_arc().unwrap();
115    ///
116    /// assert!(s.try_acquire_arc().is_none());
117    /// drop(g2);
118    /// assert!(s.try_acquire_arc().is_some());
119    /// ```
120    pub fn try_acquire_arc(self: &Arc<Self>) -> Option<SemaphoreGuardArc> {
121        let mut count = self.count.load(Ordering::Acquire);
122        loop {
123            if count == 0 {
124                return None;
125            }
126
127            match self.count.compare_exchange_weak(
128                count,
129                count - 1,
130                Ordering::AcqRel,
131                Ordering::Acquire,
132            ) {
133                Ok(_) => return Some(SemaphoreGuardArc(self.clone())),
134                Err(c) => count = c,
135            }
136        }
137    }
138
139    async fn acquire_arc_impl(self: Arc<Self>) -> SemaphoreGuardArc {
140        let mut listener = None;
141
142        loop {
143            if let Some(guard) = self.try_acquire_arc() {
144                return guard;
145            }
146
147            match listener.take() {
148                None => listener = Some(self.event.listen()),
149                Some(l) => l.await,
150            }
151        }
152    }
153
154    /// Waits for an owned permit for a concurrent operation.
155    ///
156    /// Returns a guard that releases the permit when dropped.
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// # futures_lite::future::block_on(async {
162    /// use async_lock::Semaphore;
163    /// use std::sync::Arc;
164    ///
165    /// let s = Arc::new(Semaphore::new(2));
166    /// let guard = s.acquire_arc().await;
167    /// # });
168    /// ```
169    pub fn acquire_arc(self: &Arc<Self>) -> impl Future<Output = SemaphoreGuardArc> {
170        self.clone().acquire_arc_impl()
171    }
172}
173
174/// A guard that releases the acquired permit.
175#[derive(Debug)]
176pub struct SemaphoreGuard<'a>(&'a Semaphore);
177
178impl Drop for SemaphoreGuard<'_> {
179    fn drop(&mut self) {
180        self.0.count.fetch_add(1, Ordering::AcqRel);
181        self.0.event.notify(1);
182    }
183}
184
185/// An owned guard that releases the acquired permit.
186#[derive(Debug)]
187pub struct SemaphoreGuardArc(Arc<Semaphore>);
188
189impl Drop for SemaphoreGuardArc {
190    fn drop(&mut self) {
191        self.0.count.fetch_add(1, Ordering::AcqRel);
192        self.0.event.notify(1);
193    }
194}