async_lock/semaphore.rs
1use std::future::Future;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5use event_listener::Event;
6
/// A counter for limiting the number of concurrent operations.
///
/// Each successful acquisition takes one permit from the counter; dropping the
/// returned guard puts the permit back and notifies one listener.
#[derive(Debug)]
pub struct Semaphore {
    /// Number of permits currently available. Initialized to the limit passed
    /// to `new`, decremented on acquisition and incremented when a guard drops.
    count: AtomicUsize,
    /// Event that the waiting `acquire` paths listen on; one listener is
    /// notified each time a guard is dropped.
    event: Event,
}
13
14impl Semaphore {
15 /// Creates a new semaphore with a limit of `n` concurrent operations.
16 ///
17 /// # Examples
18 ///
19 /// ```
20 /// use async_lock::Semaphore;
21 ///
22 /// let s = Semaphore::new(5);
23 /// ```
24 pub const fn new(n: usize) -> Semaphore {
25 Semaphore {
26 count: AtomicUsize::new(n),
27 event: Event::new(),
28 }
29 }
30
31 /// Attempts to get a permit for a concurrent operation.
32 ///
33 /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, a
34 /// guard is returned that releases the mutex when dropped.
35 ///
36 /// # Examples
37 ///
38 /// ```
39 /// use async_lock::Semaphore;
40 ///
41 /// let s = Semaphore::new(2);
42 ///
43 /// let g1 = s.try_acquire().unwrap();
44 /// let g2 = s.try_acquire().unwrap();
45 ///
46 /// assert!(s.try_acquire().is_none());
47 /// drop(g2);
48 /// assert!(s.try_acquire().is_some());
49 /// ```
50 pub fn try_acquire(&self) -> Option<SemaphoreGuard<'_>> {
51 let mut count = self.count.load(Ordering::Acquire);
52 loop {
53 if count == 0 {
54 return None;
55 }
56
57 match self.count.compare_exchange_weak(
58 count,
59 count - 1,
60 Ordering::AcqRel,
61 Ordering::Acquire,
62 ) {
63 Ok(_) => return Some(SemaphoreGuard(self)),
64 Err(c) => count = c,
65 }
66 }
67 }
68
69 /// Waits for a permit for a concurrent operation.
70 ///
71 /// Returns a guard that releases the permit when dropped.
72 ///
73 /// # Examples
74 ///
75 /// ```
76 /// # futures_lite::future::block_on(async {
77 /// use async_lock::Semaphore;
78 ///
79 /// let s = Semaphore::new(2);
80 /// let guard = s.acquire().await;
81 /// # });
82 /// ```
83 pub async fn acquire(&self) -> SemaphoreGuard<'_> {
84 let mut listener = None;
85
86 loop {
87 if let Some(guard) = self.try_acquire() {
88 return guard;
89 }
90
91 match listener.take() {
92 None => listener = Some(self.event.listen()),
93 Some(l) => l.await,
94 }
95 }
96 }
97}
98
99impl Semaphore {
100 /// Attempts to get an owned permit for a concurrent operation.
101 ///
102 /// If the permit could not be acquired at this time, then [`None`] is returned. Otherwise, an
103 /// owned guard is returned that releases the mutex when dropped.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// use async_lock::Semaphore;
109 /// use std::sync::Arc;
110 ///
111 /// let s = Arc::new(Semaphore::new(2));
112 ///
113 /// let g1 = s.try_acquire_arc().unwrap();
114 /// let g2 = s.try_acquire_arc().unwrap();
115 ///
116 /// assert!(s.try_acquire_arc().is_none());
117 /// drop(g2);
118 /// assert!(s.try_acquire_arc().is_some());
119 /// ```
120 pub fn try_acquire_arc(self: &Arc<Self>) -> Option<SemaphoreGuardArc> {
121 let mut count = self.count.load(Ordering::Acquire);
122 loop {
123 if count == 0 {
124 return None;
125 }
126
127 match self.count.compare_exchange_weak(
128 count,
129 count - 1,
130 Ordering::AcqRel,
131 Ordering::Acquire,
132 ) {
133 Ok(_) => return Some(SemaphoreGuardArc(self.clone())),
134 Err(c) => count = c,
135 }
136 }
137 }
138
139 async fn acquire_arc_impl(self: Arc<Self>) -> SemaphoreGuardArc {
140 let mut listener = None;
141
142 loop {
143 if let Some(guard) = self.try_acquire_arc() {
144 return guard;
145 }
146
147 match listener.take() {
148 None => listener = Some(self.event.listen()),
149 Some(l) => l.await,
150 }
151 }
152 }
153
154 /// Waits for an owned permit for a concurrent operation.
155 ///
156 /// Returns a guard that releases the permit when dropped.
157 ///
158 /// # Examples
159 ///
160 /// ```
161 /// # futures_lite::future::block_on(async {
162 /// use async_lock::Semaphore;
163 /// use std::sync::Arc;
164 ///
165 /// let s = Arc::new(Semaphore::new(2));
166 /// let guard = s.acquire_arc().await;
167 /// # });
168 /// ```
169 pub fn acquire_arc(self: &Arc<Self>) -> impl Future<Output = SemaphoreGuardArc> {
170 self.clone().acquire_arc_impl()
171 }
172}
173
174/// A guard that releases the acquired permit.
175#[derive(Debug)]
176pub struct SemaphoreGuard<'a>(&'a Semaphore);
177
178impl Drop for SemaphoreGuard<'_> {
179 fn drop(&mut self) {
180 self.0.count.fetch_add(1, Ordering::AcqRel);
181 self.0.event.notify(1);
182 }
183}
184
185/// An owned guard that releases the acquired permit.
186#[derive(Debug)]
187pub struct SemaphoreGuardArc(Arc<Semaphore>);
188
189impl Drop for SemaphoreGuardArc {
190 fn drop(&mut self) {
191 self.0.count.fetch_add(1, Ordering::AcqRel);
192 self.0.event.notify(1);
193 }
194}