tokio/sync/
semaphore.rs

1use super::batch_semaphore as ll; // low level implementation
2use super::{AcquireError, TryAcquireError};
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::sync::Arc;
6
7/// Counting semaphore performing asynchronous permit acquisition.
8///
9/// A semaphore maintains a set of permits. Permits are used to synchronize
10/// access to a shared resource. A semaphore differs from a mutex in that it
11/// can allow more than one concurrent caller to access the shared resource at a
12/// time.
13///
14/// When `acquire` is called and the semaphore has remaining permits, the
15/// function immediately returns a permit. However, if no remaining permits are
16/// available, `acquire` (asynchronously) waits until an outstanding permit is
17/// dropped. At this point, the freed permit is assigned to the caller.
18///
19/// This `Semaphore` is fair, which means that permits are given out in the order
20/// they were requested. This fairness is also applied when `acquire_many` gets
21/// involved, so if a call to `acquire_many` at the front of the queue requests
22/// more permits than currently available, this can prevent a call to `acquire`
23/// from completing, even if the semaphore has enough permits complete the call
24/// to `acquire`.
25///
26/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27/// utility.
28///
29/// # Examples
30///
31/// Basic usage:
32///
33/// ```
34/// use tokio::sync::{Semaphore, TryAcquireError};
35///
36/// #[tokio::main]
37/// async fn main() {
38///     let semaphore = Semaphore::new(3);
39///
40///     let a_permit = semaphore.acquire().await.unwrap();
41///     let two_permits = semaphore.acquire_many(2).await.unwrap();
42///
43///     assert_eq!(semaphore.available_permits(), 0);
44///
45///     let permit_attempt = semaphore.try_acquire();
46///     assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47/// }
48/// ```
49///
50/// ## Limit the number of simultaneously opened files in your program
51///
52/// Most operating systems have limits on the number of open file
53/// handles. Even in systems without explicit limits, resource constraints
54/// implicitly set an upper bound on the number of open files. If your
55/// program attempts to open a large number of files and exceeds this
56/// limit, it will result in an error.
57///
58/// This example uses a Semaphore with 100 permits. By acquiring a permit from
59/// the Semaphore before accessing a file, you ensure that your program opens
60/// no more than 100 files at a time. When trying to open the 101st
61/// file, the program will wait until a permit becomes available before
62/// proceeding to open another file.
63/// ```
64/// use std::io::Result;
65/// use tokio::fs::File;
66/// use tokio::sync::Semaphore;
67/// use tokio::io::AsyncWriteExt;
68///
69/// static PERMITS: Semaphore = Semaphore::const_new(100);
70///
71/// async fn write_to_file(message: &[u8]) -> Result<()> {
72///     let _permit = PERMITS.acquire().await.unwrap();
73///     let mut buffer = File::create("example.txt").await?;
74///     buffer.write_all(message).await?;
75///     Ok(()) // Permit goes out of scope here, and is available again for acquisition
76/// }
77/// ```
78///
79/// ## Limit the number of outgoing requests being sent at the same time
80///
81/// In some scenarios, it might be required to limit the number of outgoing
82/// requests being sent in parallel. This could be due to limits of a consumed
83/// API or the network resources of the system the application is running on.
84///
85/// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
86/// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
87/// a task sends a request, it must acquire a permit from the semaphore by
88/// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
89/// sent in parallel at any given time. After a task has sent a request, it
90/// drops the permit to allow other tasks to send requests.
91///
92/// ```
93/// use std::sync::Arc;
94/// use tokio::sync::Semaphore;
95///
96/// #[tokio::main]
97/// async fn main() {
98///     // Define maximum number of parallel requests.
99///     let semaphore = Arc::new(Semaphore::new(10));
100///     // Spawn many tasks that will send requests.
101///     let mut jhs = Vec::new();
102///     for task_id in 0..100 {
103///         let semaphore = semaphore.clone();
104///         let jh = tokio::spawn(async move {
105///             // Acquire permit before sending request.
106///             let _permit = semaphore.acquire().await.unwrap();
107///             // Send the request.
108///             let response = send_request(task_id).await;
109///             // Drop the permit after the request has been sent.
110///             drop(_permit);
111///             // Handle response.
112///             // ...
113///
114///             response
115///         });
116///         jhs.push(jh);
117///     }
118///     // Collect responses from tasks.
119///     let mut responses = Vec::new();
120///     for jh in jhs {
121///         let response = jh.await.unwrap();
122///         responses.push(response);
123///     }
124///     // Process responses.
125///     // ...
126/// }
127/// # async fn send_request(task_id: usize) {
128/// #     // Send request.
129/// # }
130/// ```
131///
132/// ## Limit the number of incoming requests being handled at the same time
133///
134/// Similar to limiting the number of simultaneously opened files, network handles
135/// are a limited resource. Allowing an unbounded amount of requests to be processed
136/// could result in a denial-of-service, among many other issues.
137///
138/// This example uses an `Arc<Semaphore>` instead of a global variable.
139/// To limit the number of requests that can be processed at the time,
140/// we acquire a permit for each task before spawning it. Once acquired,
141/// a new task is spawned; and once finished, the permit is dropped inside
142/// of the task to allow others to spawn. Permits must be acquired via
143/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
144/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
145///
146/// ```no_run
147/// use std::sync::Arc;
148/// use tokio::sync::Semaphore;
149/// use tokio::net::TcpListener;
150///
151/// #[tokio::main]
152/// async fn main() -> std::io::Result<()> {
153///     let semaphore = Arc::new(Semaphore::new(3));
154///     let listener = TcpListener::bind("127.0.0.1:8080").await?;
155///
156///     loop {
157///         // Acquire permit before accepting the next socket.
158///         //
159///         // We use `acquire_owned` so that we can move `permit` into
160///         // other tasks.
161///         let permit = semaphore.clone().acquire_owned().await.unwrap();
162///         let (mut socket, _) = listener.accept().await?;
163///
164///         tokio::spawn(async move {
165///             // Do work using the socket.
166///             handle_connection(&mut socket).await;
167///             // Drop socket while the permit is still live.
168///             drop(socket);
169///             // Drop the permit, so more tasks can be created.
170///             drop(permit);
171///         });
172///     }
173/// }
174/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
175/// #   // Do work
176/// # }
177/// ```
178///
179/// ## Prevent tests from running in parallel
180///
181/// By default, Rust runs tests in the same file in parallel. However, in some
182/// cases, running two tests in parallel may lead to problems. For example, this
183/// can happen when tests use the same database.
184///
185/// Consider the following scenario:
186/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
187///    the value using the same key to verify the insertion.
188/// 2. `test_update`: Inserts a key, then updates the key to a new value and
189///    verifies that the value has been accurately updated.
190/// 3. `test_others`: A third test that doesn't modify the database state. It
191///    can run in parallel with the other tests.
192///
193/// In this example, `test_insert` and `test_update` need to run in sequence to
194/// work, but it doesn't matter which test runs first. We can leverage a
195/// semaphore with a single permit to address this challenge.
196///
197/// ```
198/// # use tokio::sync::Mutex;
199/// # use std::collections::BTreeMap;
200/// # struct Database {
201/// #   map: Mutex<BTreeMap<String, i32>>,
202/// # }
203/// # impl Database {
204/// #    pub const fn setup() -> Database {
205/// #        Database {
206/// #            map: Mutex::const_new(BTreeMap::new()),
207/// #        }
208/// #    }
209/// #    pub async fn insert(&self, key: &str, value: i32) {
210/// #        self.map.lock().await.insert(key.to_string(), value);
211/// #    }
212/// #    pub async fn update(&self, key: &str, value: i32) {
213/// #        self.map.lock().await
214/// #            .entry(key.to_string())
215/// #            .and_modify(|origin| *origin = value);
216/// #    }
217/// #    pub async fn delete(&self, key: &str) {
218/// #        self.map.lock().await.remove(key);
219/// #    }
220/// #    pub async fn get(&self, key: &str) -> i32 {
221/// #        *self.map.lock().await.get(key).unwrap()
222/// #    }
223/// # }
224/// use tokio::sync::Semaphore;
225///
226/// // Initialize a static semaphore with only one permit, which is used to
227/// // prevent test_insert and test_update from running in parallel.
228/// static PERMIT: Semaphore = Semaphore::const_new(1);
229///
230/// // Initialize the database that will be used by the subsequent tests.
231/// static DB: Database = Database::setup();
232///
233/// #[tokio::test]
234/// # async fn fake_test_insert() {}
235/// async fn test_insert() {
236///     // Acquire permit before proceeding. Since the semaphore has only one permit,
237///     // the test will wait if the permit is already acquired by other tests.
238///     let permit = PERMIT.acquire().await.unwrap();
239///
240///     // Do the actual test stuff with database
241///
242///     // Insert a key-value pair to database
243///     let (key, value) = ("name", 0);
244///     DB.insert(key, value).await;
245///
246///     // Verify that the value has been inserted correctly.
247///     assert_eq!(DB.get(key).await, value);
248///
249///     // Undo the insertion, so the database is empty at the end of the test.
250///     DB.delete(key).await;
251///
252///     // Drop permit. This allows the other test to start running.
253///     drop(permit);
254/// }
255///
256/// #[tokio::test]
257/// # async fn fake_test_update() {}
258/// async fn test_update() {
259///     // Acquire permit before proceeding. Since the semaphore has only one permit,
260///     // the test will wait if the permit is already acquired by other tests.
261///     let permit = PERMIT.acquire().await.unwrap();
262///
263///     // Do the same insert.
264///     let (key, value) = ("name", 0);
265///     DB.insert(key, value).await;
266///
267///     // Update the existing value with a new one.
268///     let new_value = 1;
269///     DB.update(key, new_value).await;
270///
271///     // Verify that the value has been updated correctly.
272///     assert_eq!(DB.get(key).await, new_value);
273///
274///     // Undo any modificattion.
275///     DB.delete(key).await;
276///
277///     // Drop permit. This allows the other test to start running.
278///     drop(permit);
279/// }
280///
281/// #[tokio::test]
282/// # async fn fake_test_others() {}
283/// async fn test_others() {
284///     // This test can run in parallel with test_insert and test_update,
285///     // so it does not use PERMIT.
286/// }
287/// # #[tokio::main(flavor = "current_thread")]
288/// # async fn main() {
289/// #   test_insert().await;
290/// #   test_update().await;
291/// #   test_others().await;
292/// # }
293/// ```
294///
295/// ## Rate limiting using a token bucket
296///
297/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
298///
299/// Many applications and systems have constraints on the rate at which certain
300/// operations should occur. Exceeding this rate can result in suboptimal
301/// performance or even errors.
302///
303/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
304/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
305/// arrive at the same time.
306///
307/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
308/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
309/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
310/// wait for new tokens to be added.
311///
312/// Unlike the example that limits how many requests can be handled at the same time, we do not add
313/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
314///
315/// Note that this implementation is suboptimal when the duration is small, because it consumes a
316/// lot of cpu constantly looping and sleeping.
317///
318/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
319/// [`add_permits`]: crate::sync::Semaphore::add_permits
320/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
321/// ```
322/// use std::sync::Arc;
323/// use tokio::sync::Semaphore;
324/// use tokio::time::{interval, Duration};
325///
326/// struct TokenBucket {
327///     sem: Arc<Semaphore>,
328///     jh: tokio::task::JoinHandle<()>,
329/// }
330///
331/// impl TokenBucket {
332///     fn new(duration: Duration, capacity: usize) -> Self {
333///         let sem = Arc::new(Semaphore::new(capacity));
334///
335///         // refills the tokens at the end of each interval
336///         let jh = tokio::spawn({
337///             let sem = sem.clone();
338///             let mut interval = interval(duration);
339///             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
340///
341///             async move {
342///                 loop {
343///                     interval.tick().await;
344///
345///                     if sem.available_permits() < capacity {
346///                         sem.add_permits(1);
347///                     }
348///                 }
349///             }
350///         });
351///
352///         Self { jh, sem }
353///     }
354///
355///     async fn acquire(&self) {
356///         // This can return an error if the semaphore is closed, but we
357///         // never close it, so this error can never happen.
358///         let permit = self.sem.acquire().await.unwrap();
359///         // To avoid releasing the permit back to the semaphore, we use
360///         // the `SemaphorePermit::forget` method.
361///         permit.forget();
362///     }
363/// }
364///
365/// impl Drop for TokenBucket {
366///     fn drop(&mut self) {
367///         // Kill the background task so it stops taking up resources when we
368///         // don't need it anymore.
369///         self.jh.abort();
370///     }
371/// }
372///
373/// #[tokio::main]
374/// # async fn _hidden() {}
375/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
376/// async fn main() {
377///     let capacity = 5;
378///     let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
379///     let bucket = TokenBucket::new(update_interval, capacity);
380///
381///     for _ in 0..5 {
382///         bucket.acquire().await;
383///
384///         // do the operation
385///     }
386/// }
387/// ```
388///
389/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
390/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
391#[derive(Debug)]
392pub struct Semaphore {
393    /// The low level semaphore
394    ll_sem: ll::Semaphore,
395    #[cfg(all(tokio_unstable, feature = "tracing"))]
396    resource_span: tracing::Span,
397}
398
399/// A permit from the semaphore.
400///
401/// This type is created by the [`acquire`] method.
402///
403/// [`acquire`]: crate::sync::Semaphore::acquire()
404#[must_use]
405#[clippy::has_significant_drop]
406#[derive(Debug)]
407pub struct SemaphorePermit<'a> {
408    sem: &'a Semaphore,
409    permits: u32,
410}
411
412/// An owned permit from the semaphore.
413///
414/// This type is created by the [`acquire_owned`] method.
415///
416/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
417#[must_use]
418#[clippy::has_significant_drop]
419#[derive(Debug)]
420pub struct OwnedSemaphorePermit {
421    sem: Arc<Semaphore>,
422    permits: u32,
423}
424
425#[test]
426#[cfg(not(loom))]
427fn bounds() {
428    fn check_unpin<T: Unpin>() {}
429    // This has to take a value, since the async fn's return type is unnameable.
430    fn check_send_sync_val<T: Send + Sync>(_t: T) {}
431    fn check_send_sync<T: Send + Sync>() {}
432    check_unpin::<Semaphore>();
433    check_unpin::<SemaphorePermit<'_>>();
434    check_send_sync::<Semaphore>();
435
436    let semaphore = Semaphore::new(0);
437    check_send_sync_val(semaphore.acquire());
438}
439
440impl Semaphore {
441    /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
442    ///
443    /// Exceeding this limit typically results in a panic.
444    pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
445
446    /// Creates a new semaphore with the initial number of permits.
447    ///
448    /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
449    #[track_caller]
450    pub fn new(permits: usize) -> Self {
451        #[cfg(all(tokio_unstable, feature = "tracing"))]
452        let resource_span = {
453            let location = std::panic::Location::caller();
454
455            tracing::trace_span!(
456                parent: None,
457                "runtime.resource",
458                concrete_type = "Semaphore",
459                kind = "Sync",
460                loc.file = location.file(),
461                loc.line = location.line(),
462                loc.col = location.column(),
463                inherits_child_attrs = true,
464            )
465        };
466
467        #[cfg(all(tokio_unstable, feature = "tracing"))]
468        let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
469
470        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
471        let ll_sem = ll::Semaphore::new(permits);
472
473        Self {
474            ll_sem,
475            #[cfg(all(tokio_unstable, feature = "tracing"))]
476            resource_span,
477        }
478    }
479
480    /// Creates a new semaphore with the initial number of permits.
481    ///
482    /// When using the `tracing` [unstable feature], a `Semaphore` created with
483    /// `const_new` will not be instrumented. As such, it will not be visible
484    /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
485    /// create an instrumented object if that is needed.
486    ///
487    /// # Examples
488    ///
489    /// ```
490    /// use tokio::sync::Semaphore;
491    ///
492    /// static SEM: Semaphore = Semaphore::const_new(10);
493    /// ```
494    ///
495    /// [`tokio-console`]: https://github.com/tokio-rs/console
496    /// [unstable feature]: crate#unstable-features
497    #[cfg(not(all(loom, test)))]
498    pub const fn const_new(permits: usize) -> Self {
499        Self {
500            ll_sem: ll::Semaphore::const_new(permits),
501            #[cfg(all(tokio_unstable, feature = "tracing"))]
502            resource_span: tracing::Span::none(),
503        }
504    }
505
506    /// Creates a new closed semaphore with 0 permits.
507    pub(crate) fn new_closed() -> Self {
508        Self {
509            ll_sem: ll::Semaphore::new_closed(),
510            #[cfg(all(tokio_unstable, feature = "tracing"))]
511            resource_span: tracing::Span::none(),
512        }
513    }
514
515    /// Creates a new closed semaphore with 0 permits.
516    #[cfg(not(all(loom, test)))]
517    pub(crate) const fn const_new_closed() -> Self {
518        Self {
519            ll_sem: ll::Semaphore::const_new_closed(),
520            #[cfg(all(tokio_unstable, feature = "tracing"))]
521            resource_span: tracing::Span::none(),
522        }
523    }
524
525    /// Returns the current number of available permits.
526    pub fn available_permits(&self) -> usize {
527        self.ll_sem.available_permits()
528    }
529
530    /// Adds `n` new permits to the semaphore.
531    ///
532    /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
533    pub fn add_permits(&self, n: usize) {
534        self.ll_sem.release(n);
535    }
536
537    /// Decrease a semaphore's permits by a maximum of `n`.
538    ///
539    /// If there are insufficient permits and it's not possible to reduce by `n`,
540    /// return the number of permits that were actually reduced.
541    pub fn forget_permits(&self, n: usize) -> usize {
542        self.ll_sem.forget_permits(n)
543    }
544
545    /// Acquires a permit from the semaphore.
546    ///
547    /// If the semaphore has been closed, this returns an [`AcquireError`].
548    /// Otherwise, this returns a [`SemaphorePermit`] representing the
549    /// acquired permit.
550    ///
551    /// # Cancel safety
552    ///
553    /// This method uses a queue to fairly distribute permits in the order they
554    /// were requested. Cancelling a call to `acquire` makes you lose your place
555    /// in the queue.
556    ///
557    /// # Examples
558    ///
559    /// ```
560    /// use tokio::sync::Semaphore;
561    ///
562    /// #[tokio::main]
563    /// async fn main() {
564    ///     let semaphore = Semaphore::new(2);
565    ///
566    ///     let permit_1 = semaphore.acquire().await.unwrap();
567    ///     assert_eq!(semaphore.available_permits(), 1);
568    ///
569    ///     let permit_2 = semaphore.acquire().await.unwrap();
570    ///     assert_eq!(semaphore.available_permits(), 0);
571    ///
572    ///     drop(permit_1);
573    ///     assert_eq!(semaphore.available_permits(), 1);
574    /// }
575    /// ```
576    ///
577    /// [`AcquireError`]: crate::sync::AcquireError
578    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
579    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
580        #[cfg(all(tokio_unstable, feature = "tracing"))]
581        let inner = trace::async_op(
582            || self.ll_sem.acquire(1),
583            self.resource_span.clone(),
584            "Semaphore::acquire",
585            "poll",
586            true,
587        );
588        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
589        let inner = self.ll_sem.acquire(1);
590
591        inner.await?;
592        Ok(SemaphorePermit {
593            sem: self,
594            permits: 1,
595        })
596    }
597
598    /// Acquires `n` permits from the semaphore.
599    ///
600    /// If the semaphore has been closed, this returns an [`AcquireError`].
601    /// Otherwise, this returns a [`SemaphorePermit`] representing the
602    /// acquired permits.
603    ///
604    /// # Cancel safety
605    ///
606    /// This method uses a queue to fairly distribute permits in the order they
607    /// were requested. Cancelling a call to `acquire_many` makes you lose your
608    /// place in the queue.
609    ///
610    /// # Examples
611    ///
612    /// ```
613    /// use tokio::sync::Semaphore;
614    ///
615    /// #[tokio::main]
616    /// async fn main() {
617    ///     let semaphore = Semaphore::new(5);
618    ///
619    ///     let permit = semaphore.acquire_many(3).await.unwrap();
620    ///     assert_eq!(semaphore.available_permits(), 2);
621    /// }
622    /// ```
623    ///
624    /// [`AcquireError`]: crate::sync::AcquireError
625    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
626    pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
627        #[cfg(all(tokio_unstable, feature = "tracing"))]
628        trace::async_op(
629            || self.ll_sem.acquire(n as usize),
630            self.resource_span.clone(),
631            "Semaphore::acquire_many",
632            "poll",
633            true,
634        )
635        .await?;
636
637        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
638        self.ll_sem.acquire(n as usize).await?;
639
640        Ok(SemaphorePermit {
641            sem: self,
642            permits: n,
643        })
644    }
645
646    /// Tries to acquire a permit from the semaphore.
647    ///
648    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
649    /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
650    /// this returns a [`SemaphorePermit`] representing the acquired permits.
651    ///
652    /// # Examples
653    ///
654    /// ```
655    /// use tokio::sync::{Semaphore, TryAcquireError};
656    ///
657    /// # fn main() {
658    /// let semaphore = Semaphore::new(2);
659    ///
660    /// let permit_1 = semaphore.try_acquire().unwrap();
661    /// assert_eq!(semaphore.available_permits(), 1);
662    ///
663    /// let permit_2 = semaphore.try_acquire().unwrap();
664    /// assert_eq!(semaphore.available_permits(), 0);
665    ///
666    /// let permit_3 = semaphore.try_acquire();
667    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
668    /// # }
669    /// ```
670    ///
671    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
672    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
673    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
674    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
675        match self.ll_sem.try_acquire(1) {
676            Ok(()) => Ok(SemaphorePermit {
677                sem: self,
678                permits: 1,
679            }),
680            Err(e) => Err(e),
681        }
682    }
683
684    /// Tries to acquire `n` permits from the semaphore.
685    ///
686    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
687    /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
688    /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
689    ///
690    /// # Examples
691    ///
692    /// ```
693    /// use tokio::sync::{Semaphore, TryAcquireError};
694    ///
695    /// # fn main() {
696    /// let semaphore = Semaphore::new(4);
697    ///
698    /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
699    /// assert_eq!(semaphore.available_permits(), 1);
700    ///
701    /// let permit_2 = semaphore.try_acquire_many(2);
702    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
703    /// # }
704    /// ```
705    ///
706    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
707    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
708    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
709    pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
710        match self.ll_sem.try_acquire(n as usize) {
711            Ok(()) => Ok(SemaphorePermit {
712                sem: self,
713                permits: n,
714            }),
715            Err(e) => Err(e),
716        }
717    }
718
719    /// Acquires a permit from the semaphore.
720    ///
721    /// The semaphore must be wrapped in an [`Arc`] to call this method.
722    /// If the semaphore has been closed, this returns an [`AcquireError`].
723    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
724    /// acquired permit.
725    ///
726    /// # Cancel safety
727    ///
728    /// This method uses a queue to fairly distribute permits in the order they
729    /// were requested. Cancelling a call to `acquire_owned` makes you lose your
730    /// place in the queue.
731    ///
732    /// # Examples
733    ///
734    /// ```
735    /// use std::sync::Arc;
736    /// use tokio::sync::Semaphore;
737    ///
738    /// #[tokio::main]
739    /// async fn main() {
740    ///     let semaphore = Arc::new(Semaphore::new(3));
741    ///     let mut join_handles = Vec::new();
742    ///
743    ///     for _ in 0..5 {
744    ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
745    ///         join_handles.push(tokio::spawn(async move {
746    ///             // perform task...
747    ///             // explicitly own `permit` in the task
748    ///             drop(permit);
749    ///         }));
750    ///     }
751    ///
752    ///     for handle in join_handles {
753    ///         handle.await.unwrap();
754    ///     }
755    /// }
756    /// ```
757    ///
758    /// [`Arc`]: std::sync::Arc
759    /// [`AcquireError`]: crate::sync::AcquireError
760    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
761    pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
762        #[cfg(all(tokio_unstable, feature = "tracing"))]
763        let inner = trace::async_op(
764            || self.ll_sem.acquire(1),
765            self.resource_span.clone(),
766            "Semaphore::acquire_owned",
767            "poll",
768            true,
769        );
770        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
771        let inner = self.ll_sem.acquire(1);
772
773        inner.await?;
774        Ok(OwnedSemaphorePermit {
775            sem: self,
776            permits: 1,
777        })
778    }
779
780    /// Acquires `n` permits from the semaphore.
781    ///
782    /// The semaphore must be wrapped in an [`Arc`] to call this method.
783    /// If the semaphore has been closed, this returns an [`AcquireError`].
784    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
785    /// acquired permit.
786    ///
787    /// # Cancel safety
788    ///
789    /// This method uses a queue to fairly distribute permits in the order they
790    /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
791    /// your place in the queue.
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// use std::sync::Arc;
797    /// use tokio::sync::Semaphore;
798    ///
799    /// #[tokio::main]
800    /// async fn main() {
801    ///     let semaphore = Arc::new(Semaphore::new(10));
802    ///     let mut join_handles = Vec::new();
803    ///
804    ///     for _ in 0..5 {
805    ///         let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
806    ///         join_handles.push(tokio::spawn(async move {
807    ///             // perform task...
808    ///             // explicitly own `permit` in the task
809    ///             drop(permit);
810    ///         }));
811    ///     }
812    ///
813    ///     for handle in join_handles {
814    ///         handle.await.unwrap();
815    ///     }
816    /// }
817    /// ```
818    ///
819    /// [`Arc`]: std::sync::Arc
820    /// [`AcquireError`]: crate::sync::AcquireError
821    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
822    pub async fn acquire_many_owned(
823        self: Arc<Self>,
824        n: u32,
825    ) -> Result<OwnedSemaphorePermit, AcquireError> {
826        #[cfg(all(tokio_unstable, feature = "tracing"))]
827        let inner = trace::async_op(
828            || self.ll_sem.acquire(n as usize),
829            self.resource_span.clone(),
830            "Semaphore::acquire_many_owned",
831            "poll",
832            true,
833        );
834        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
835        let inner = self.ll_sem.acquire(n as usize);
836
837        inner.await?;
838        Ok(OwnedSemaphorePermit {
839            sem: self,
840            permits: n,
841        })
842    }
843
844    /// Tries to acquire a permit from the semaphore.
845    ///
846    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
847    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
848    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
849    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
850    /// acquired permit.
851    ///
852    /// # Examples
853    ///
854    /// ```
855    /// use std::sync::Arc;
856    /// use tokio::sync::{Semaphore, TryAcquireError};
857    ///
858    /// # fn main() {
859    /// let semaphore = Arc::new(Semaphore::new(2));
860    ///
861    /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
862    /// assert_eq!(semaphore.available_permits(), 1);
863    ///
864    /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
865    /// assert_eq!(semaphore.available_permits(), 0);
866    ///
867    /// let permit_3 = semaphore.try_acquire_owned();
868    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
869    /// # }
870    /// ```
871    ///
872    /// [`Arc`]: std::sync::Arc
873    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
874    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
875    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
876    pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
877        match self.ll_sem.try_acquire(1) {
878            Ok(()) => Ok(OwnedSemaphorePermit {
879                sem: self,
880                permits: 1,
881            }),
882            Err(e) => Err(e),
883        }
884    }
885
886    /// Tries to acquire `n` permits from the semaphore.
887    ///
888    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
889    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
890    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
891    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
892    /// acquired permit.
893    ///
894    /// # Examples
895    ///
896    /// ```
897    /// use std::sync::Arc;
898    /// use tokio::sync::{Semaphore, TryAcquireError};
899    ///
900    /// # fn main() {
901    /// let semaphore = Arc::new(Semaphore::new(4));
902    ///
903    /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
904    /// assert_eq!(semaphore.available_permits(), 1);
905    ///
906    /// let permit_2 = semaphore.try_acquire_many_owned(2);
907    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
908    /// # }
909    /// ```
910    ///
911    /// [`Arc`]: std::sync::Arc
912    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
913    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
914    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
915    pub fn try_acquire_many_owned(
916        self: Arc<Self>,
917        n: u32,
918    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
919        match self.ll_sem.try_acquire(n as usize) {
920            Ok(()) => Ok(OwnedSemaphorePermit {
921                sem: self,
922                permits: n,
923            }),
924            Err(e) => Err(e),
925        }
926    }
927
928    /// Closes the semaphore.
929    ///
930    /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
931    ///
932    /// # Examples
933    ///
934    /// ```
935    /// use tokio::sync::Semaphore;
936    /// use std::sync::Arc;
937    /// use tokio::sync::TryAcquireError;
938    ///
939    /// #[tokio::main]
940    /// async fn main() {
941    ///     let semaphore = Arc::new(Semaphore::new(1));
942    ///     let semaphore2 = semaphore.clone();
943    ///
944    ///     tokio::spawn(async move {
945    ///         let permit = semaphore.acquire_many(2).await;
946    ///         assert!(permit.is_err());
947    ///         println!("waiter received error");
948    ///     });
949    ///
950    ///     println!("closing semaphore");
951    ///     semaphore2.close();
952    ///
953    ///     // Cannot obtain more permits
954    ///     assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
955    /// }
956    /// ```
957    pub fn close(&self) {
958        self.ll_sem.close();
959    }
960
961    /// Returns true if the semaphore is closed
962    pub fn is_closed(&self) -> bool {
963        self.ll_sem.is_closed()
964    }
965}
966
967impl<'a> SemaphorePermit<'a> {
968    /// Forgets the permit **without** releasing it back to the semaphore.
969    /// This can be used to reduce the amount of permits available from a
970    /// semaphore.
971    ///
972    /// # Examples
973    ///
974    /// ```
975    /// use std::sync::Arc;
976    /// use tokio::sync::Semaphore;
977    ///
978    /// let sem = Arc::new(Semaphore::new(10));
979    /// {
980    ///     let permit = sem.try_acquire_many(5).unwrap();
981    ///     assert_eq!(sem.available_permits(), 5);
982    ///     permit.forget();
983    /// }
984    ///
985    /// // Since we forgot the permit, available permits won't go back to its initial value
986    /// // even after the permit is dropped.
987    /// assert_eq!(sem.available_permits(), 5);
988    /// ```
989    pub fn forget(mut self) {
990        self.permits = 0;
991    }
992
993    /// Merge two [`SemaphorePermit`] instances together, consuming `other`
994    /// without releasing the permits it holds.
995    ///
996    /// Permits held by both `self` and `other` are released when `self` drops.
997    ///
998    /// # Panics
999    ///
1000    /// This function panics if permits from different [`Semaphore`] instances
1001    /// are merged.
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```
1006    /// use std::sync::Arc;
1007    /// use tokio::sync::Semaphore;
1008    ///
1009    /// let sem = Arc::new(Semaphore::new(10));
1010    /// let mut permit = sem.try_acquire().unwrap();
1011    ///
1012    /// for _ in 0..9 {
1013    ///     let _permit = sem.try_acquire().unwrap();
1014    ///     // Merge individual permits into a single one.
1015    ///     permit.merge(_permit)
1016    /// }
1017    ///
1018    /// assert_eq!(sem.available_permits(), 0);
1019    ///
1020    /// // Release all permits in a single batch.
1021    /// drop(permit);
1022    ///
1023    /// assert_eq!(sem.available_permits(), 10);
1024    /// ```
1025    #[track_caller]
1026    pub fn merge(&mut self, mut other: Self) {
1027        assert!(
1028            std::ptr::eq(self.sem, other.sem),
1029            "merging permits from different semaphore instances"
1030        );
1031        self.permits += other.permits;
1032        other.permits = 0;
1033    }
1034
1035    /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1036    ///
1037    /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```
1042    /// use std::sync::Arc;
1043    /// use tokio::sync::Semaphore;
1044    ///
1045    /// let sem = Arc::new(Semaphore::new(3));
1046    ///
1047    /// let mut p1 = sem.try_acquire_many(3).unwrap();
1048    /// let p2 = p1.split(1).unwrap();
1049    ///
1050    /// assert_eq!(p1.num_permits(), 2);
1051    /// assert_eq!(p2.num_permits(), 1);
1052    /// ```
1053    pub fn split(&mut self, n: usize) -> Option<Self> {
1054        let n = u32::try_from(n).ok()?;
1055
1056        if n > self.permits {
1057            return None;
1058        }
1059
1060        self.permits -= n;
1061
1062        Some(Self {
1063            sem: self.sem,
1064            permits: n,
1065        })
1066    }
1067
1068    /// Returns the number of permits held by `self`.
1069    pub fn num_permits(&self) -> usize {
1070        self.permits as usize
1071    }
1072}
1073
1074impl OwnedSemaphorePermit {
1075    /// Forgets the permit **without** releasing it back to the semaphore.
1076    /// This can be used to reduce the amount of permits available from a
1077    /// semaphore.
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```
1082    /// use std::sync::Arc;
1083    /// use tokio::sync::Semaphore;
1084    ///
1085    /// let sem = Arc::new(Semaphore::new(10));
1086    /// {
1087    ///     let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1088    ///     assert_eq!(sem.available_permits(), 5);
1089    ///     permit.forget();
1090    /// }
1091    ///
1092    /// // Since we forgot the permit, available permits won't go back to its initial value
1093    /// // even after the permit is dropped.
1094    /// assert_eq!(sem.available_permits(), 5);
1095    /// ```
1096    pub fn forget(mut self) {
1097        self.permits = 0;
1098    }
1099
1100    /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1101    /// without releasing the permits it holds.
1102    ///
1103    /// Permits held by both `self` and `other` are released when `self` drops.
1104    ///
1105    /// # Panics
1106    ///
1107    /// This function panics if permits from different [`Semaphore`] instances
1108    /// are merged.
1109    ///
1110    /// # Examples
1111    ///
1112    /// ```
1113    /// use std::sync::Arc;
1114    /// use tokio::sync::Semaphore;
1115    ///
1116    /// let sem = Arc::new(Semaphore::new(10));
1117    /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1118    ///
1119    /// for _ in 0..9 {
1120    ///     let _permit = sem.clone().try_acquire_owned().unwrap();
1121    ///     // Merge individual permits into a single one.
1122    ///     permit.merge(_permit)
1123    /// }
1124    ///
1125    /// assert_eq!(sem.available_permits(), 0);
1126    ///
1127    /// // Release all permits in a single batch.
1128    /// drop(permit);
1129    ///
1130    /// assert_eq!(sem.available_permits(), 10);
1131    /// ```
1132    #[track_caller]
1133    pub fn merge(&mut self, mut other: Self) {
1134        assert!(
1135            Arc::ptr_eq(&self.sem, &other.sem),
1136            "merging permits from different semaphore instances"
1137        );
1138        self.permits += other.permits;
1139        other.permits = 0;
1140    }
1141
1142    /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1143    ///
1144    /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1145    ///
1146    /// # Note
1147    ///
1148    /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1149    ///
1150    /// # Examples
1151    ///
1152    /// ```
1153    /// use std::sync::Arc;
1154    /// use tokio::sync::Semaphore;
1155    ///
1156    /// let sem = Arc::new(Semaphore::new(3));
1157    ///
1158    /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1159    /// let p2 = p1.split(1).unwrap();
1160    ///
1161    /// assert_eq!(p1.num_permits(), 2);
1162    /// assert_eq!(p2.num_permits(), 1);
1163    /// ```
1164    pub fn split(&mut self, n: usize) -> Option<Self> {
1165        let n = u32::try_from(n).ok()?;
1166
1167        if n > self.permits {
1168            return None;
1169        }
1170
1171        self.permits -= n;
1172
1173        Some(Self {
1174            sem: self.sem.clone(),
1175            permits: n,
1176        })
1177    }
1178
1179    /// Returns the [`Semaphore`] from which this permit was acquired.
1180    pub fn semaphore(&self) -> &Arc<Semaphore> {
1181        &self.sem
1182    }
1183
1184    /// Returns the number of permits held by `self`.
1185    pub fn num_permits(&self) -> usize {
1186        self.permits as usize
1187    }
1188}
1189
1190impl Drop for SemaphorePermit<'_> {
1191    fn drop(&mut self) {
1192        self.sem.add_permits(self.permits as usize);
1193    }
1194}
1195
1196impl Drop for OwnedSemaphorePermit {
1197    fn drop(&mut self) {
1198        self.sem.add_permits(self.permits as usize);
1199    }
1200}