tokio/sync/semaphore.rs
1use super::batch_semaphore as ll; // low level implementation
2use super::{AcquireError, TryAcquireError};
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::sync::Arc;
6
7/// Counting semaphore performing asynchronous permit acquisition.
8///
9/// A semaphore maintains a set of permits. Permits are used to synchronize
10/// access to a shared resource. A semaphore differs from a mutex in that it
11/// can allow more than one concurrent caller to access the shared resource at a
12/// time.
13///
14/// When `acquire` is called and the semaphore has remaining permits, the
15/// function immediately returns a permit. However, if no remaining permits are
16/// available, `acquire` (asynchronously) waits until an outstanding permit is
17/// dropped. At this point, the freed permit is assigned to the caller.
18///
19/// This `Semaphore` is fair, which means that permits are given out in the order
20/// they were requested. This fairness is also applied when `acquire_many` gets
21/// involved, so if a call to `acquire_many` at the front of the queue requests
22/// more permits than currently available, this can prevent a call to `acquire`
23/// from completing, even if the semaphore has enough permits complete the call
24/// to `acquire`.
25///
26/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27/// utility.
28///
29/// # Examples
30///
31/// Basic usage:
32///
33/// ```
34/// use tokio::sync::{Semaphore, TryAcquireError};
35///
36/// #[tokio::main]
37/// async fn main() {
38/// let semaphore = Semaphore::new(3);
39///
40/// let a_permit = semaphore.acquire().await.unwrap();
41/// let two_permits = semaphore.acquire_many(2).await.unwrap();
42///
43/// assert_eq!(semaphore.available_permits(), 0);
44///
45/// let permit_attempt = semaphore.try_acquire();
46/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47/// }
48/// ```
49///
50/// ## Limit the number of simultaneously opened files in your program
51///
52/// Most operating systems have limits on the number of open file
53/// handles. Even in systems without explicit limits, resource constraints
54/// implicitly set an upper bound on the number of open files. If your
55/// program attempts to open a large number of files and exceeds this
56/// limit, it will result in an error.
57///
58/// This example uses a Semaphore with 100 permits. By acquiring a permit from
59/// the Semaphore before accessing a file, you ensure that your program opens
60/// no more than 100 files at a time. When trying to open the 101st
61/// file, the program will wait until a permit becomes available before
62/// proceeding to open another file.
63/// ```
64/// use std::io::Result;
65/// use tokio::fs::File;
66/// use tokio::sync::Semaphore;
67/// use tokio::io::AsyncWriteExt;
68///
69/// static PERMITS: Semaphore = Semaphore::const_new(100);
70///
71/// async fn write_to_file(message: &[u8]) -> Result<()> {
72/// let _permit = PERMITS.acquire().await.unwrap();
73/// let mut buffer = File::create("example.txt").await?;
74/// buffer.write_all(message).await?;
75/// Ok(()) // Permit goes out of scope here, and is available again for acquisition
76/// }
77/// ```
78///
79/// ## Limit the number of outgoing requests being sent at the same time
80///
81/// In some scenarios, it might be required to limit the number of outgoing
82/// requests being sent in parallel. This could be due to limits of a consumed
83/// API or the network resources of the system the application is running on.
84///
85/// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
86/// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
87/// a task sends a request, it must acquire a permit from the semaphore by
88/// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
89/// sent in parallel at any given time. After a task has sent a request, it
90/// drops the permit to allow other tasks to send requests.
91///
92/// ```
93/// use std::sync::Arc;
94/// use tokio::sync::Semaphore;
95///
96/// #[tokio::main]
97/// async fn main() {
98/// // Define maximum number of parallel requests.
99/// let semaphore = Arc::new(Semaphore::new(10));
100/// // Spawn many tasks that will send requests.
101/// let mut jhs = Vec::new();
102/// for task_id in 0..100 {
103/// let semaphore = semaphore.clone();
104/// let jh = tokio::spawn(async move {
105/// // Acquire permit before sending request.
106/// let _permit = semaphore.acquire().await.unwrap();
107/// // Send the request.
108/// let response = send_request(task_id).await;
109/// // Drop the permit after the request has been sent.
110/// drop(_permit);
111/// // Handle response.
112/// // ...
113///
114/// response
115/// });
116/// jhs.push(jh);
117/// }
118/// // Collect responses from tasks.
119/// let mut responses = Vec::new();
120/// for jh in jhs {
121/// let response = jh.await.unwrap();
122/// responses.push(response);
123/// }
124/// // Process responses.
125/// // ...
126/// }
127/// # async fn send_request(task_id: usize) {
128/// # // Send request.
129/// # }
130/// ```
131///
132/// ## Limit the number of incoming requests being handled at the same time
133///
134/// Similar to limiting the number of simultaneously opened files, network handles
135/// are a limited resource. Allowing an unbounded amount of requests to be processed
136/// could result in a denial-of-service, among many other issues.
137///
138/// This example uses an `Arc<Semaphore>` instead of a global variable.
139/// To limit the number of requests that can be processed at the time,
140/// we acquire a permit for each task before spawning it. Once acquired,
141/// a new task is spawned; and once finished, the permit is dropped inside
142/// of the task to allow others to spawn. Permits must be acquired via
143/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
144/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
145///
146/// ```no_run
147/// use std::sync::Arc;
148/// use tokio::sync::Semaphore;
149/// use tokio::net::TcpListener;
150///
151/// #[tokio::main]
152/// async fn main() -> std::io::Result<()> {
153/// let semaphore = Arc::new(Semaphore::new(3));
154/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
155///
156/// loop {
157/// // Acquire permit before accepting the next socket.
158/// //
159/// // We use `acquire_owned` so that we can move `permit` into
160/// // other tasks.
161/// let permit = semaphore.clone().acquire_owned().await.unwrap();
162/// let (mut socket, _) = listener.accept().await?;
163///
164/// tokio::spawn(async move {
165/// // Do work using the socket.
166/// handle_connection(&mut socket).await;
167/// // Drop socket while the permit is still live.
168/// drop(socket);
169/// // Drop the permit, so more tasks can be created.
170/// drop(permit);
171/// });
172/// }
173/// }
174/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
175/// # // Do work
176/// # }
177/// ```
178///
179/// ## Prevent tests from running in parallel
180///
181/// By default, Rust runs tests in the same file in parallel. However, in some
182/// cases, running two tests in parallel may lead to problems. For example, this
183/// can happen when tests use the same database.
184///
185/// Consider the following scenario:
186/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
187/// the value using the same key to verify the insertion.
188/// 2. `test_update`: Inserts a key, then updates the key to a new value and
189/// verifies that the value has been accurately updated.
190/// 3. `test_others`: A third test that doesn't modify the database state. It
191/// can run in parallel with the other tests.
192///
193/// In this example, `test_insert` and `test_update` need to run in sequence to
194/// work, but it doesn't matter which test runs first. We can leverage a
195/// semaphore with a single permit to address this challenge.
196///
197/// ```
198/// # use tokio::sync::Mutex;
199/// # use std::collections::BTreeMap;
200/// # struct Database {
201/// # map: Mutex<BTreeMap<String, i32>>,
202/// # }
203/// # impl Database {
204/// # pub const fn setup() -> Database {
205/// # Database {
206/// # map: Mutex::const_new(BTreeMap::new()),
207/// # }
208/// # }
209/// # pub async fn insert(&self, key: &str, value: i32) {
210/// # self.map.lock().await.insert(key.to_string(), value);
211/// # }
212/// # pub async fn update(&self, key: &str, value: i32) {
213/// # self.map.lock().await
214/// # .entry(key.to_string())
215/// # .and_modify(|origin| *origin = value);
216/// # }
217/// # pub async fn delete(&self, key: &str) {
218/// # self.map.lock().await.remove(key);
219/// # }
220/// # pub async fn get(&self, key: &str) -> i32 {
221/// # *self.map.lock().await.get(key).unwrap()
222/// # }
223/// # }
224/// use tokio::sync::Semaphore;
225///
226/// // Initialize a static semaphore with only one permit, which is used to
227/// // prevent test_insert and test_update from running in parallel.
228/// static PERMIT: Semaphore = Semaphore::const_new(1);
229///
230/// // Initialize the database that will be used by the subsequent tests.
231/// static DB: Database = Database::setup();
232///
233/// #[tokio::test]
234/// # async fn fake_test_insert() {}
235/// async fn test_insert() {
236/// // Acquire permit before proceeding. Since the semaphore has only one permit,
237/// // the test will wait if the permit is already acquired by other tests.
238/// let permit = PERMIT.acquire().await.unwrap();
239///
240/// // Do the actual test stuff with database
241///
242/// // Insert a key-value pair to database
243/// let (key, value) = ("name", 0);
244/// DB.insert(key, value).await;
245///
246/// // Verify that the value has been inserted correctly.
247/// assert_eq!(DB.get(key).await, value);
248///
249/// // Undo the insertion, so the database is empty at the end of the test.
250/// DB.delete(key).await;
251///
252/// // Drop permit. This allows the other test to start running.
253/// drop(permit);
254/// }
255///
256/// #[tokio::test]
257/// # async fn fake_test_update() {}
258/// async fn test_update() {
259/// // Acquire permit before proceeding. Since the semaphore has only one permit,
260/// // the test will wait if the permit is already acquired by other tests.
261/// let permit = PERMIT.acquire().await.unwrap();
262///
263/// // Do the same insert.
264/// let (key, value) = ("name", 0);
265/// DB.insert(key, value).await;
266///
267/// // Update the existing value with a new one.
268/// let new_value = 1;
269/// DB.update(key, new_value).await;
270///
271/// // Verify that the value has been updated correctly.
272/// assert_eq!(DB.get(key).await, new_value);
273///
274/// // Undo any modificattion.
275/// DB.delete(key).await;
276///
277/// // Drop permit. This allows the other test to start running.
278/// drop(permit);
279/// }
280///
281/// #[tokio::test]
282/// # async fn fake_test_others() {}
283/// async fn test_others() {
284/// // This test can run in parallel with test_insert and test_update,
285/// // so it does not use PERMIT.
286/// }
287/// # #[tokio::main(flavor = "current_thread")]
288/// # async fn main() {
289/// # test_insert().await;
290/// # test_update().await;
291/// # test_others().await;
292/// # }
293/// ```
294///
295/// ## Rate limiting using a token bucket
296///
297/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
298///
299/// Many applications and systems have constraints on the rate at which certain
300/// operations should occur. Exceeding this rate can result in suboptimal
301/// performance or even errors.
302///
303/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
304/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
305/// arrive at the same time.
306///
307/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
308/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
309/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
310/// wait for new tokens to be added.
311///
312/// Unlike the example that limits how many requests can be handled at the same time, we do not add
313/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
314///
315/// Note that this implementation is suboptimal when the duration is small, because it consumes a
316/// lot of cpu constantly looping and sleeping.
317///
318/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
319/// [`add_permits`]: crate::sync::Semaphore::add_permits
320/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
321/// ```
322/// use std::sync::Arc;
323/// use tokio::sync::Semaphore;
324/// use tokio::time::{interval, Duration};
325///
326/// struct TokenBucket {
327/// sem: Arc<Semaphore>,
328/// jh: tokio::task::JoinHandle<()>,
329/// }
330///
331/// impl TokenBucket {
332/// fn new(duration: Duration, capacity: usize) -> Self {
333/// let sem = Arc::new(Semaphore::new(capacity));
334///
335/// // refills the tokens at the end of each interval
336/// let jh = tokio::spawn({
337/// let sem = sem.clone();
338/// let mut interval = interval(duration);
339/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
340///
341/// async move {
342/// loop {
343/// interval.tick().await;
344///
345/// if sem.available_permits() < capacity {
346/// sem.add_permits(1);
347/// }
348/// }
349/// }
350/// });
351///
352/// Self { jh, sem }
353/// }
354///
355/// async fn acquire(&self) {
356/// // This can return an error if the semaphore is closed, but we
357/// // never close it, so this error can never happen.
358/// let permit = self.sem.acquire().await.unwrap();
359/// // To avoid releasing the permit back to the semaphore, we use
360/// // the `SemaphorePermit::forget` method.
361/// permit.forget();
362/// }
363/// }
364///
365/// impl Drop for TokenBucket {
366/// fn drop(&mut self) {
367/// // Kill the background task so it stops taking up resources when we
368/// // don't need it anymore.
369/// self.jh.abort();
370/// }
371/// }
372///
373/// #[tokio::main]
374/// # async fn _hidden() {}
375/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
376/// async fn main() {
377/// let capacity = 5;
378/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
379/// let bucket = TokenBucket::new(update_interval, capacity);
380///
381/// for _ in 0..5 {
382/// bucket.acquire().await;
383///
384/// // do the operation
385/// }
386/// }
387/// ```
388///
389/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
390/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
391#[derive(Debug)]
392pub struct Semaphore {
393 /// The low level semaphore
394 ll_sem: ll::Semaphore,
395 #[cfg(all(tokio_unstable, feature = "tracing"))]
396 resource_span: tracing::Span,
397}
398
399/// A permit from the semaphore.
400///
401/// This type is created by the [`acquire`] method.
402///
403/// [`acquire`]: crate::sync::Semaphore::acquire()
404#[must_use]
405#[clippy::has_significant_drop]
406#[derive(Debug)]
407pub struct SemaphorePermit<'a> {
408 sem: &'a Semaphore,
409 permits: u32,
410}
411
412/// An owned permit from the semaphore.
413///
414/// This type is created by the [`acquire_owned`] method.
415///
416/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
417#[must_use]
418#[clippy::has_significant_drop]
419#[derive(Debug)]
420pub struct OwnedSemaphorePermit {
421 sem: Arc<Semaphore>,
422 permits: u32,
423}
424
425#[test]
426#[cfg(not(loom))]
427fn bounds() {
428 fn check_unpin<T: Unpin>() {}
429 // This has to take a value, since the async fn's return type is unnameable.
430 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
431 fn check_send_sync<T: Send + Sync>() {}
432 check_unpin::<Semaphore>();
433 check_unpin::<SemaphorePermit<'_>>();
434 check_send_sync::<Semaphore>();
435
436 let semaphore = Semaphore::new(0);
437 check_send_sync_val(semaphore.acquire());
438}
439
440impl Semaphore {
441 /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
442 ///
443 /// Exceeding this limit typically results in a panic.
444 pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
445
446 /// Creates a new semaphore with the initial number of permits.
447 ///
448 /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
449 #[track_caller]
450 pub fn new(permits: usize) -> Self {
451 #[cfg(all(tokio_unstable, feature = "tracing"))]
452 let resource_span = {
453 let location = std::panic::Location::caller();
454
455 tracing::trace_span!(
456 parent: None,
457 "runtime.resource",
458 concrete_type = "Semaphore",
459 kind = "Sync",
460 loc.file = location.file(),
461 loc.line = location.line(),
462 loc.col = location.column(),
463 inherits_child_attrs = true,
464 )
465 };
466
467 #[cfg(all(tokio_unstable, feature = "tracing"))]
468 let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
469
470 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
471 let ll_sem = ll::Semaphore::new(permits);
472
473 Self {
474 ll_sem,
475 #[cfg(all(tokio_unstable, feature = "tracing"))]
476 resource_span,
477 }
478 }
479
480 /// Creates a new semaphore with the initial number of permits.
481 ///
482 /// When using the `tracing` [unstable feature], a `Semaphore` created with
483 /// `const_new` will not be instrumented. As such, it will not be visible
484 /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
485 /// create an instrumented object if that is needed.
486 ///
487 /// # Examples
488 ///
489 /// ```
490 /// use tokio::sync::Semaphore;
491 ///
492 /// static SEM: Semaphore = Semaphore::const_new(10);
493 /// ```
494 ///
495 /// [`tokio-console`]: https://github.com/tokio-rs/console
496 /// [unstable feature]: crate#unstable-features
497 #[cfg(not(all(loom, test)))]
498 pub const fn const_new(permits: usize) -> Self {
499 Self {
500 ll_sem: ll::Semaphore::const_new(permits),
501 #[cfg(all(tokio_unstable, feature = "tracing"))]
502 resource_span: tracing::Span::none(),
503 }
504 }
505
506 /// Creates a new closed semaphore with 0 permits.
507 pub(crate) fn new_closed() -> Self {
508 Self {
509 ll_sem: ll::Semaphore::new_closed(),
510 #[cfg(all(tokio_unstable, feature = "tracing"))]
511 resource_span: tracing::Span::none(),
512 }
513 }
514
515 /// Creates a new closed semaphore with 0 permits.
516 #[cfg(not(all(loom, test)))]
517 pub(crate) const fn const_new_closed() -> Self {
518 Self {
519 ll_sem: ll::Semaphore::const_new_closed(),
520 #[cfg(all(tokio_unstable, feature = "tracing"))]
521 resource_span: tracing::Span::none(),
522 }
523 }
524
525 /// Returns the current number of available permits.
526 pub fn available_permits(&self) -> usize {
527 self.ll_sem.available_permits()
528 }
529
530 /// Adds `n` new permits to the semaphore.
531 ///
532 /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
533 pub fn add_permits(&self, n: usize) {
534 self.ll_sem.release(n);
535 }
536
537 /// Decrease a semaphore's permits by a maximum of `n`.
538 ///
539 /// If there are insufficient permits and it's not possible to reduce by `n`,
540 /// return the number of permits that were actually reduced.
541 pub fn forget_permits(&self, n: usize) -> usize {
542 self.ll_sem.forget_permits(n)
543 }
544
545 /// Acquires a permit from the semaphore.
546 ///
547 /// If the semaphore has been closed, this returns an [`AcquireError`].
548 /// Otherwise, this returns a [`SemaphorePermit`] representing the
549 /// acquired permit.
550 ///
551 /// # Cancel safety
552 ///
553 /// This method uses a queue to fairly distribute permits in the order they
554 /// were requested. Cancelling a call to `acquire` makes you lose your place
555 /// in the queue.
556 ///
557 /// # Examples
558 ///
559 /// ```
560 /// use tokio::sync::Semaphore;
561 ///
562 /// #[tokio::main]
563 /// async fn main() {
564 /// let semaphore = Semaphore::new(2);
565 ///
566 /// let permit_1 = semaphore.acquire().await.unwrap();
567 /// assert_eq!(semaphore.available_permits(), 1);
568 ///
569 /// let permit_2 = semaphore.acquire().await.unwrap();
570 /// assert_eq!(semaphore.available_permits(), 0);
571 ///
572 /// drop(permit_1);
573 /// assert_eq!(semaphore.available_permits(), 1);
574 /// }
575 /// ```
576 ///
577 /// [`AcquireError`]: crate::sync::AcquireError
578 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
579 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
580 #[cfg(all(tokio_unstable, feature = "tracing"))]
581 let inner = trace::async_op(
582 || self.ll_sem.acquire(1),
583 self.resource_span.clone(),
584 "Semaphore::acquire",
585 "poll",
586 true,
587 );
588 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
589 let inner = self.ll_sem.acquire(1);
590
591 inner.await?;
592 Ok(SemaphorePermit {
593 sem: self,
594 permits: 1,
595 })
596 }
597
598 /// Acquires `n` permits from the semaphore.
599 ///
600 /// If the semaphore has been closed, this returns an [`AcquireError`].
601 /// Otherwise, this returns a [`SemaphorePermit`] representing the
602 /// acquired permits.
603 ///
604 /// # Cancel safety
605 ///
606 /// This method uses a queue to fairly distribute permits in the order they
607 /// were requested. Cancelling a call to `acquire_many` makes you lose your
608 /// place in the queue.
609 ///
610 /// # Examples
611 ///
612 /// ```
613 /// use tokio::sync::Semaphore;
614 ///
615 /// #[tokio::main]
616 /// async fn main() {
617 /// let semaphore = Semaphore::new(5);
618 ///
619 /// let permit = semaphore.acquire_many(3).await.unwrap();
620 /// assert_eq!(semaphore.available_permits(), 2);
621 /// }
622 /// ```
623 ///
624 /// [`AcquireError`]: crate::sync::AcquireError
625 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
626 pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
627 #[cfg(all(tokio_unstable, feature = "tracing"))]
628 trace::async_op(
629 || self.ll_sem.acquire(n as usize),
630 self.resource_span.clone(),
631 "Semaphore::acquire_many",
632 "poll",
633 true,
634 )
635 .await?;
636
637 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
638 self.ll_sem.acquire(n as usize).await?;
639
640 Ok(SemaphorePermit {
641 sem: self,
642 permits: n,
643 })
644 }
645
646 /// Tries to acquire a permit from the semaphore.
647 ///
648 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
649 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
650 /// this returns a [`SemaphorePermit`] representing the acquired permits.
651 ///
652 /// # Examples
653 ///
654 /// ```
655 /// use tokio::sync::{Semaphore, TryAcquireError};
656 ///
657 /// # fn main() {
658 /// let semaphore = Semaphore::new(2);
659 ///
660 /// let permit_1 = semaphore.try_acquire().unwrap();
661 /// assert_eq!(semaphore.available_permits(), 1);
662 ///
663 /// let permit_2 = semaphore.try_acquire().unwrap();
664 /// assert_eq!(semaphore.available_permits(), 0);
665 ///
666 /// let permit_3 = semaphore.try_acquire();
667 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
668 /// # }
669 /// ```
670 ///
671 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
672 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
673 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
674 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
675 match self.ll_sem.try_acquire(1) {
676 Ok(()) => Ok(SemaphorePermit {
677 sem: self,
678 permits: 1,
679 }),
680 Err(e) => Err(e),
681 }
682 }
683
684 /// Tries to acquire `n` permits from the semaphore.
685 ///
686 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
687 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
688 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
689 ///
690 /// # Examples
691 ///
692 /// ```
693 /// use tokio::sync::{Semaphore, TryAcquireError};
694 ///
695 /// # fn main() {
696 /// let semaphore = Semaphore::new(4);
697 ///
698 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
699 /// assert_eq!(semaphore.available_permits(), 1);
700 ///
701 /// let permit_2 = semaphore.try_acquire_many(2);
702 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
703 /// # }
704 /// ```
705 ///
706 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
707 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
708 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
709 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
710 match self.ll_sem.try_acquire(n as usize) {
711 Ok(()) => Ok(SemaphorePermit {
712 sem: self,
713 permits: n,
714 }),
715 Err(e) => Err(e),
716 }
717 }
718
719 /// Acquires a permit from the semaphore.
720 ///
721 /// The semaphore must be wrapped in an [`Arc`] to call this method.
722 /// If the semaphore has been closed, this returns an [`AcquireError`].
723 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
724 /// acquired permit.
725 ///
726 /// # Cancel safety
727 ///
728 /// This method uses a queue to fairly distribute permits in the order they
729 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
730 /// place in the queue.
731 ///
732 /// # Examples
733 ///
734 /// ```
735 /// use std::sync::Arc;
736 /// use tokio::sync::Semaphore;
737 ///
738 /// #[tokio::main]
739 /// async fn main() {
740 /// let semaphore = Arc::new(Semaphore::new(3));
741 /// let mut join_handles = Vec::new();
742 ///
743 /// for _ in 0..5 {
744 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
745 /// join_handles.push(tokio::spawn(async move {
746 /// // perform task...
747 /// // explicitly own `permit` in the task
748 /// drop(permit);
749 /// }));
750 /// }
751 ///
752 /// for handle in join_handles {
753 /// handle.await.unwrap();
754 /// }
755 /// }
756 /// ```
757 ///
758 /// [`Arc`]: std::sync::Arc
759 /// [`AcquireError`]: crate::sync::AcquireError
760 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
761 pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
762 #[cfg(all(tokio_unstable, feature = "tracing"))]
763 let inner = trace::async_op(
764 || self.ll_sem.acquire(1),
765 self.resource_span.clone(),
766 "Semaphore::acquire_owned",
767 "poll",
768 true,
769 );
770 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
771 let inner = self.ll_sem.acquire(1);
772
773 inner.await?;
774 Ok(OwnedSemaphorePermit {
775 sem: self,
776 permits: 1,
777 })
778 }
779
780 /// Acquires `n` permits from the semaphore.
781 ///
782 /// The semaphore must be wrapped in an [`Arc`] to call this method.
783 /// If the semaphore has been closed, this returns an [`AcquireError`].
784 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
785 /// acquired permit.
786 ///
787 /// # Cancel safety
788 ///
789 /// This method uses a queue to fairly distribute permits in the order they
790 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
791 /// your place in the queue.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// use std::sync::Arc;
797 /// use tokio::sync::Semaphore;
798 ///
799 /// #[tokio::main]
800 /// async fn main() {
801 /// let semaphore = Arc::new(Semaphore::new(10));
802 /// let mut join_handles = Vec::new();
803 ///
804 /// for _ in 0..5 {
805 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
806 /// join_handles.push(tokio::spawn(async move {
807 /// // perform task...
808 /// // explicitly own `permit` in the task
809 /// drop(permit);
810 /// }));
811 /// }
812 ///
813 /// for handle in join_handles {
814 /// handle.await.unwrap();
815 /// }
816 /// }
817 /// ```
818 ///
819 /// [`Arc`]: std::sync::Arc
820 /// [`AcquireError`]: crate::sync::AcquireError
821 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
822 pub async fn acquire_many_owned(
823 self: Arc<Self>,
824 n: u32,
825 ) -> Result<OwnedSemaphorePermit, AcquireError> {
826 #[cfg(all(tokio_unstable, feature = "tracing"))]
827 let inner = trace::async_op(
828 || self.ll_sem.acquire(n as usize),
829 self.resource_span.clone(),
830 "Semaphore::acquire_many_owned",
831 "poll",
832 true,
833 );
834 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
835 let inner = self.ll_sem.acquire(n as usize);
836
837 inner.await?;
838 Ok(OwnedSemaphorePermit {
839 sem: self,
840 permits: n,
841 })
842 }
843
844 /// Tries to acquire a permit from the semaphore.
845 ///
846 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
847 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
848 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
849 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
850 /// acquired permit.
851 ///
852 /// # Examples
853 ///
854 /// ```
855 /// use std::sync::Arc;
856 /// use tokio::sync::{Semaphore, TryAcquireError};
857 ///
858 /// # fn main() {
859 /// let semaphore = Arc::new(Semaphore::new(2));
860 ///
861 /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
862 /// assert_eq!(semaphore.available_permits(), 1);
863 ///
864 /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
865 /// assert_eq!(semaphore.available_permits(), 0);
866 ///
867 /// let permit_3 = semaphore.try_acquire_owned();
868 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
869 /// # }
870 /// ```
871 ///
872 /// [`Arc`]: std::sync::Arc
873 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
874 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
875 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
876 pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
877 match self.ll_sem.try_acquire(1) {
878 Ok(()) => Ok(OwnedSemaphorePermit {
879 sem: self,
880 permits: 1,
881 }),
882 Err(e) => Err(e),
883 }
884 }
885
886 /// Tries to acquire `n` permits from the semaphore.
887 ///
888 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
889 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
890 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
891 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
892 /// acquired permit.
893 ///
894 /// # Examples
895 ///
896 /// ```
897 /// use std::sync::Arc;
898 /// use tokio::sync::{Semaphore, TryAcquireError};
899 ///
900 /// # fn main() {
901 /// let semaphore = Arc::new(Semaphore::new(4));
902 ///
903 /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
904 /// assert_eq!(semaphore.available_permits(), 1);
905 ///
906 /// let permit_2 = semaphore.try_acquire_many_owned(2);
907 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
908 /// # }
909 /// ```
910 ///
911 /// [`Arc`]: std::sync::Arc
912 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
913 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
914 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
915 pub fn try_acquire_many_owned(
916 self: Arc<Self>,
917 n: u32,
918 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
919 match self.ll_sem.try_acquire(n as usize) {
920 Ok(()) => Ok(OwnedSemaphorePermit {
921 sem: self,
922 permits: n,
923 }),
924 Err(e) => Err(e),
925 }
926 }
927
928 /// Closes the semaphore.
929 ///
930 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
931 ///
932 /// # Examples
933 ///
934 /// ```
935 /// use tokio::sync::Semaphore;
936 /// use std::sync::Arc;
937 /// use tokio::sync::TryAcquireError;
938 ///
939 /// #[tokio::main]
940 /// async fn main() {
941 /// let semaphore = Arc::new(Semaphore::new(1));
942 /// let semaphore2 = semaphore.clone();
943 ///
944 /// tokio::spawn(async move {
945 /// let permit = semaphore.acquire_many(2).await;
946 /// assert!(permit.is_err());
947 /// println!("waiter received error");
948 /// });
949 ///
950 /// println!("closing semaphore");
951 /// semaphore2.close();
952 ///
953 /// // Cannot obtain more permits
954 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
955 /// }
956 /// ```
957 pub fn close(&self) {
958 self.ll_sem.close();
959 }
960
961 /// Returns true if the semaphore is closed
962 pub fn is_closed(&self) -> bool {
963 self.ll_sem.is_closed()
964 }
965}
966
967impl<'a> SemaphorePermit<'a> {
968 /// Forgets the permit **without** releasing it back to the semaphore.
969 /// This can be used to reduce the amount of permits available from a
970 /// semaphore.
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// use std::sync::Arc;
976 /// use tokio::sync::Semaphore;
977 ///
978 /// let sem = Arc::new(Semaphore::new(10));
979 /// {
980 /// let permit = sem.try_acquire_many(5).unwrap();
981 /// assert_eq!(sem.available_permits(), 5);
982 /// permit.forget();
983 /// }
984 ///
985 /// // Since we forgot the permit, available permits won't go back to its initial value
986 /// // even after the permit is dropped.
987 /// assert_eq!(sem.available_permits(), 5);
988 /// ```
989 pub fn forget(mut self) {
990 self.permits = 0;
991 }
992
993 /// Merge two [`SemaphorePermit`] instances together, consuming `other`
994 /// without releasing the permits it holds.
995 ///
996 /// Permits held by both `self` and `other` are released when `self` drops.
997 ///
998 /// # Panics
999 ///
1000 /// This function panics if permits from different [`Semaphore`] instances
1001 /// are merged.
1002 ///
1003 /// # Examples
1004 ///
1005 /// ```
1006 /// use std::sync::Arc;
1007 /// use tokio::sync::Semaphore;
1008 ///
1009 /// let sem = Arc::new(Semaphore::new(10));
1010 /// let mut permit = sem.try_acquire().unwrap();
1011 ///
1012 /// for _ in 0..9 {
1013 /// let _permit = sem.try_acquire().unwrap();
1014 /// // Merge individual permits into a single one.
1015 /// permit.merge(_permit)
1016 /// }
1017 ///
1018 /// assert_eq!(sem.available_permits(), 0);
1019 ///
1020 /// // Release all permits in a single batch.
1021 /// drop(permit);
1022 ///
1023 /// assert_eq!(sem.available_permits(), 10);
1024 /// ```
1025 #[track_caller]
1026 pub fn merge(&mut self, mut other: Self) {
1027 assert!(
1028 std::ptr::eq(self.sem, other.sem),
1029 "merging permits from different semaphore instances"
1030 );
1031 self.permits += other.permits;
1032 other.permits = 0;
1033 }
1034
1035 /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1036 ///
1037 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use std::sync::Arc;
1043 /// use tokio::sync::Semaphore;
1044 ///
1045 /// let sem = Arc::new(Semaphore::new(3));
1046 ///
1047 /// let mut p1 = sem.try_acquire_many(3).unwrap();
1048 /// let p2 = p1.split(1).unwrap();
1049 ///
1050 /// assert_eq!(p1.num_permits(), 2);
1051 /// assert_eq!(p2.num_permits(), 1);
1052 /// ```
1053 pub fn split(&mut self, n: usize) -> Option<Self> {
1054 let n = u32::try_from(n).ok()?;
1055
1056 if n > self.permits {
1057 return None;
1058 }
1059
1060 self.permits -= n;
1061
1062 Some(Self {
1063 sem: self.sem,
1064 permits: n,
1065 })
1066 }
1067
1068 /// Returns the number of permits held by `self`.
1069 pub fn num_permits(&self) -> usize {
1070 self.permits as usize
1071 }
1072}
1073
1074impl OwnedSemaphorePermit {
1075 /// Forgets the permit **without** releasing it back to the semaphore.
1076 /// This can be used to reduce the amount of permits available from a
1077 /// semaphore.
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```
1082 /// use std::sync::Arc;
1083 /// use tokio::sync::Semaphore;
1084 ///
1085 /// let sem = Arc::new(Semaphore::new(10));
1086 /// {
1087 /// let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1088 /// assert_eq!(sem.available_permits(), 5);
1089 /// permit.forget();
1090 /// }
1091 ///
1092 /// // Since we forgot the permit, available permits won't go back to its initial value
1093 /// // even after the permit is dropped.
1094 /// assert_eq!(sem.available_permits(), 5);
1095 /// ```
1096 pub fn forget(mut self) {
1097 self.permits = 0;
1098 }
1099
1100 /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1101 /// without releasing the permits it holds.
1102 ///
1103 /// Permits held by both `self` and `other` are released when `self` drops.
1104 ///
1105 /// # Panics
1106 ///
1107 /// This function panics if permits from different [`Semaphore`] instances
1108 /// are merged.
1109 ///
1110 /// # Examples
1111 ///
1112 /// ```
1113 /// use std::sync::Arc;
1114 /// use tokio::sync::Semaphore;
1115 ///
1116 /// let sem = Arc::new(Semaphore::new(10));
1117 /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1118 ///
1119 /// for _ in 0..9 {
1120 /// let _permit = sem.clone().try_acquire_owned().unwrap();
1121 /// // Merge individual permits into a single one.
1122 /// permit.merge(_permit)
1123 /// }
1124 ///
1125 /// assert_eq!(sem.available_permits(), 0);
1126 ///
1127 /// // Release all permits in a single batch.
1128 /// drop(permit);
1129 ///
1130 /// assert_eq!(sem.available_permits(), 10);
1131 /// ```
1132 #[track_caller]
1133 pub fn merge(&mut self, mut other: Self) {
1134 assert!(
1135 Arc::ptr_eq(&self.sem, &other.sem),
1136 "merging permits from different semaphore instances"
1137 );
1138 self.permits += other.permits;
1139 other.permits = 0;
1140 }
1141
1142 /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1143 ///
1144 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1145 ///
1146 /// # Note
1147 ///
1148 /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1149 ///
1150 /// # Examples
1151 ///
1152 /// ```
1153 /// use std::sync::Arc;
1154 /// use tokio::sync::Semaphore;
1155 ///
1156 /// let sem = Arc::new(Semaphore::new(3));
1157 ///
1158 /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1159 /// let p2 = p1.split(1).unwrap();
1160 ///
1161 /// assert_eq!(p1.num_permits(), 2);
1162 /// assert_eq!(p2.num_permits(), 1);
1163 /// ```
1164 pub fn split(&mut self, n: usize) -> Option<Self> {
1165 let n = u32::try_from(n).ok()?;
1166
1167 if n > self.permits {
1168 return None;
1169 }
1170
1171 self.permits -= n;
1172
1173 Some(Self {
1174 sem: self.sem.clone(),
1175 permits: n,
1176 })
1177 }
1178
1179 /// Returns the [`Semaphore`] from which this permit was acquired.
1180 pub fn semaphore(&self) -> &Arc<Semaphore> {
1181 &self.sem
1182 }
1183
1184 /// Returns the number of permits held by `self`.
1185 pub fn num_permits(&self) -> usize {
1186 self.permits as usize
1187 }
1188}
1189
1190impl Drop for SemaphorePermit<'_> {
1191 fn drop(&mut self) {
1192 self.sem.add_permits(self.permits as usize);
1193 }
1194}
1195
1196impl Drop for OwnedSemaphorePermit {
1197 fn drop(&mut self) {
1198 self.sem.add_permits(self.permits as usize);
1199 }
1200}