tokio/sync/mpsc/
bounded.rs

1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7    use crate::sync::mpsc::error::SendTimeoutError;
8    use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23    chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45///     let (tx, _rx) = channel::<i32>(15);
46///     let tx_weak = tx.downgrade();
47///
48///     // Upgrading will succeed because `tx` still exists.
49///     assert!(tx_weak.upgrade().is_some());
50///
51///     // If we drop `tx`, then it will fail.
52///     drop(tx);
53///     assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57    chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68    chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79    chan: &'a chan::Tx<T, Semaphore>,
80    n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96    chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107    /// The channel receiver.
108    chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages.  Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139///     let (tx, mut rx) = mpsc::channel(100);
140///
141///     tokio::spawn(async move {
142///         for i in 0..10 {
143///             if let Err(_) = tx.send(i).await {
144///                 println!("receiver dropped");
145///                 return;
146///             }
147///         }
148///     });
149///
150///     while let Some(i) = rx.recv().await {
151///         println!("got = {}", i);
152///     }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158    let semaphore = Semaphore {
159        semaphore: semaphore::Semaphore::new(buffer),
160        bound: buffer,
161    };
162    let (tx, rx) = chan::channel(semaphore);
163
164    let tx = Sender::new(tx);
165    let rx = Receiver::new(rx);
166
167    (tx, rx)
168}
169
170/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171/// representing the channel bound.
172#[derive(Debug)]
173pub(crate) struct Semaphore {
174    pub(crate) semaphore: semaphore::Semaphore,
175    pub(crate) bound: usize,
176}
177
178impl<T> Receiver<T> {
179    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180        Receiver { chan }
181    }
182
183    /// Receives the next value for this receiver.
184    ///
185    /// This method returns `None` if the channel has been closed and there are
186    /// no remaining messages in the channel's buffer. This indicates that no
187    /// further values can ever be received from this `Receiver`. The channel is
188    /// closed when all senders have been dropped, or when [`close`] is called.
189    ///
190    /// If there are no messages in the channel's buffer, but the channel has
191    /// not yet been closed, this method will sleep until a message is sent or
192    /// the channel is closed.  Note that if [`close`] is called, but there are
193    /// still outstanding [`Permits`] from before it was closed, the channel is
194    /// not considered closed by `recv` until the permits are released.
195    ///
196    /// # Cancel safety
197    ///
198    /// This method is cancel safe. If `recv` is used as the event in a
199    /// [`tokio::select!`](crate::select) statement and some other branch
200    /// completes first, it is guaranteed that no messages were received on this
201    /// channel.
202    ///
203    /// [`close`]: Self::close
204    /// [`Permits`]: struct@crate::sync::mpsc::Permit
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use tokio::sync::mpsc;
210    ///
211    /// #[tokio::main]
212    /// async fn main() {
213    ///     let (tx, mut rx) = mpsc::channel(100);
214    ///
215    ///     tokio::spawn(async move {
216    ///         tx.send("hello").await.unwrap();
217    ///     });
218    ///
219    ///     assert_eq!(Some("hello"), rx.recv().await);
220    ///     assert_eq!(None, rx.recv().await);
221    /// }
222    /// ```
223    ///
224    /// Values are buffered:
225    ///
226    /// ```
227    /// use tokio::sync::mpsc;
228    ///
229    /// #[tokio::main]
230    /// async fn main() {
231    ///     let (tx, mut rx) = mpsc::channel(100);
232    ///
233    ///     tx.send("hello").await.unwrap();
234    ///     tx.send("world").await.unwrap();
235    ///
236    ///     assert_eq!(Some("hello"), rx.recv().await);
237    ///     assert_eq!(Some("world"), rx.recv().await);
238    /// }
239    /// ```
240    pub async fn recv(&mut self) -> Option<T> {
241        use crate::future::poll_fn;
242        poll_fn(|cx| self.chan.recv(cx)).await
243    }
244
245    /// Receives the next values for this receiver and extends `buffer`.
246    ///
247    /// This method extends `buffer` by no more than a fixed number of values
248    /// as specified by `limit`. If `limit` is zero, the function immediately
249    /// returns `0`. The return value is the number of values added to `buffer`.
250    ///
251    /// For `limit > 0`, if there are no messages in the channel's queue, but
252    /// the channel has not yet been closed, this method will sleep until a
253    /// message is sent or the channel is closed. Note that if [`close`] is
254    /// called, but there are still outstanding [`Permits`] from before it was
255    /// closed, the channel is not considered closed by `recv_many` until the
256    /// permits are released.
257    ///
258    /// For non-zero values of `limit`, this method will never return `0` unless
259    /// the channel has been closed and there are no remaining messages in the
260    /// channel's queue. This indicates that no further values can ever be
261    /// received from this `Receiver`. The channel is closed when all senders
262    /// have been dropped, or when [`close`] is called.
263    ///
264    /// The capacity of `buffer` is increased as needed.
265    ///
266    /// # Cancel safety
267    ///
268    /// This method is cancel safe. If `recv_many` is used as the event in a
269    /// [`tokio::select!`](crate::select) statement and some other branch
270    /// completes first, it is guaranteed that no messages were received on this
271    /// channel.
272    ///
273    /// [`close`]: Self::close
274    /// [`Permits`]: struct@crate::sync::mpsc::Permit
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use tokio::sync::mpsc;
280    ///
281    /// #[tokio::main]
282    /// async fn main() {
283    ///     let mut buffer: Vec<&str> = Vec::with_capacity(2);
284    ///     let limit = 2;
285    ///     let (tx, mut rx) = mpsc::channel(100);
286    ///     let tx2 = tx.clone();
287    ///     tx2.send("first").await.unwrap();
288    ///     tx2.send("second").await.unwrap();
289    ///     tx2.send("third").await.unwrap();
290    ///
291    ///     // Call `recv_many` to receive up to `limit` (2) values.
292    ///     assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293    ///     assert_eq!(vec!["first", "second"], buffer);
294    ///
295    ///     // If the buffer is full, the next call to `recv_many`
296    ///     // reserves additional capacity.
297    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298    ///
299    ///     tokio::spawn(async move {
300    ///         tx.send("fourth").await.unwrap();
301    ///     });
302    ///
303    ///     // 'tx' is dropped, but `recv_many`
304    ///     // is guaranteed not to return 0 as the channel
305    ///     // is not yet closed.
306    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308    ///
309    ///     // Once the last sender is dropped, the channel is
310    ///     // closed and `recv_many` returns 0, capacity unchanged.
311    ///     drop(tx2);
312    ///     assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314    /// }
315    /// ```
316    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317        use crate::future::poll_fn;
318        poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319    }
320
321    /// Tries to receive the next value for this receiver.
322    ///
323    /// This method returns the [`Empty`] error if the channel is currently
324    /// empty, but there are still outstanding [senders] or [permits].
325    ///
326    /// This method returns the [`Disconnected`] error if the channel is
327    /// currently empty, and there are no outstanding [senders] or [permits].
328    ///
329    /// Unlike the [`poll_recv`] method, this method will never return an
330    /// [`Empty`] error spuriously.
331    ///
332    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334    /// [`poll_recv`]: Self::poll_recv
335    /// [senders]: crate::sync::mpsc::Sender
336    /// [permits]: crate::sync::mpsc::Permit
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// use tokio::sync::mpsc;
342    /// use tokio::sync::mpsc::error::TryRecvError;
343    ///
344    /// #[tokio::main]
345    /// async fn main() {
346    ///     let (tx, mut rx) = mpsc::channel(100);
347    ///
348    ///     tx.send("hello").await.unwrap();
349    ///
350    ///     assert_eq!(Ok("hello"), rx.try_recv());
351    ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352    ///
353    ///     tx.send("hello").await.unwrap();
354    ///     // Drop the last sender, closing the channel.
355    ///     drop(tx);
356    ///
357    ///     assert_eq!(Ok("hello"), rx.try_recv());
358    ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359    /// }
360    /// ```
361    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362        self.chan.try_recv()
363    }
364
365    /// Blocking receive to call outside of asynchronous contexts.
366    ///
367    /// This method returns `None` if the channel has been closed and there are
368    /// no remaining messages in the channel's buffer. This indicates that no
369    /// further values can ever be received from this `Receiver`. The channel is
370    /// closed when all senders have been dropped, or when [`close`] is called.
371    ///
372    /// If there are no messages in the channel's buffer, but the channel has
373    /// not yet been closed, this method will block until a message is sent or
374    /// the channel is closed.
375    ///
376    /// This method is intended for use cases where you are sending from
377    /// asynchronous code to synchronous code, and will work even if the sender
378    /// is not using [`blocking_send`] to send the message.
379    ///
380    /// Note that if [`close`] is called, but there are still outstanding
381    /// [`Permits`] from before it was closed, the channel is not considered
382    /// closed by `blocking_recv` until the permits are released.
383    ///
384    /// [`close`]: Self::close
385    /// [`Permits`]: struct@crate::sync::mpsc::Permit
386    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387    ///
388    /// # Panics
389    ///
390    /// This function panics if called within an asynchronous execution
391    /// context.
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// use std::thread;
397    /// use tokio::runtime::Runtime;
398    /// use tokio::sync::mpsc;
399    ///
400    /// fn main() {
401    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
402    ///
403    ///     let sync_code = thread::spawn(move || {
404    ///         assert_eq!(Some(10), rx.blocking_recv());
405    ///     });
406    ///
407    ///     Runtime::new()
408    ///         .unwrap()
409    ///         .block_on(async move {
410    ///             let _ = tx.send(10).await;
411    ///         });
412    ///     sync_code.join().unwrap()
413    /// }
414    /// ```
415    #[track_caller]
416    #[cfg(feature = "sync")]
417    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
418    pub fn blocking_recv(&mut self) -> Option<T> {
419        crate::future::block_on(self.recv())
420    }
421
422    /// Closes the receiving half of a channel without dropping it.
423    ///
424    /// This prevents any further messages from being sent on the channel while
425    /// still enabling the receiver to drain messages that are buffered. Any
426    /// outstanding [`Permit`] values will still be able to send messages.
427    ///
428    /// To guarantee that no messages are dropped, after calling `close()`,
429    /// `recv()` must be called until `None` is returned. If there are
430    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
431    /// not return `None` until those are released.
432    ///
433    /// [`Permit`]: Permit
434    /// [`OwnedPermit`]: OwnedPermit
435    ///
436    /// # Examples
437    ///
438    /// ```
439    /// use tokio::sync::mpsc;
440    ///
441    /// #[tokio::main]
442    /// async fn main() {
443    ///     let (tx, mut rx) = mpsc::channel(20);
444    ///
445    ///     tokio::spawn(async move {
446    ///         let mut i = 0;
447    ///         while let Ok(permit) = tx.reserve().await {
448    ///             permit.send(i);
449    ///             i += 1;
450    ///         }
451    ///     });
452    ///
453    ///     rx.close();
454    ///
455    ///     while let Some(msg) = rx.recv().await {
456    ///         println!("got {}", msg);
457    ///     }
458    ///
459    ///     // Channel closed and no messages are lost.
460    /// }
461    /// ```
462    pub fn close(&mut self) {
463        self.chan.close();
464    }
465
466    /// Checks if a channel is closed.
467    ///
468    /// This method returns `true` if the channel has been closed. The channel is closed
469    /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
470    ///
471    /// [`Sender`]: crate::sync::mpsc::Sender
472    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
473    ///
474    /// # Examples
475    /// ```
476    /// use tokio::sync::mpsc;
477    ///
478    /// #[tokio::main]
479    /// async fn main() {
480    ///     let (_tx, mut rx) = mpsc::channel::<()>(10);
481    ///     assert!(!rx.is_closed());
482    ///
483    ///     rx.close();
484    ///
485    ///     assert!(rx.is_closed());
486    /// }
487    /// ```
488    pub fn is_closed(&self) -> bool {
489        self.chan.is_closed()
490    }
491
492    /// Checks if a channel is empty.
493    ///
494    /// This method returns `true` if the channel has no messages.
495    ///
496    /// # Examples
497    /// ```
498    /// use tokio::sync::mpsc;
499    ///
500    /// #[tokio::main]
501    /// async fn main() {
502    ///     let (tx, rx) = mpsc::channel(10);
503    ///     assert!(rx.is_empty());
504    ///
505    ///     tx.send(0).await.unwrap();
506    ///     assert!(!rx.is_empty());
507    /// }
508    ///
509    /// ```
510    pub fn is_empty(&self) -> bool {
511        self.chan.is_empty()
512    }
513
514    /// Returns the number of messages in the channel.
515    ///
516    /// # Examples
517    /// ```
518    /// use tokio::sync::mpsc;
519    ///
520    /// #[tokio::main]
521    /// async fn main() {
522    ///     let (tx, rx) = mpsc::channel(10);
523    ///     assert_eq!(0, rx.len());
524    ///
525    ///     tx.send(0).await.unwrap();
526    ///     assert_eq!(1, rx.len());
527    /// }
528    /// ```
529    pub fn len(&self) -> usize {
530        self.chan.len()
531    }
532
533    /// Returns the current capacity of the channel.
534    ///
535    /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
536    /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
537    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
538    /// specified when calling [`channel`].
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// use tokio::sync::mpsc;
544    ///
545    /// #[tokio::main]
546    /// async fn main() {
547    ///     let (tx, mut rx) = mpsc::channel::<()>(5);
548    ///
549    ///     assert_eq!(rx.capacity(), 5);
550    ///
551    ///     // Making a reservation drops the capacity by one.
552    ///     let permit = tx.reserve().await.unwrap();
553    ///     assert_eq!(rx.capacity(), 4);
554    ///     assert_eq!(rx.len(), 0);
555    ///
556    ///     // Sending and receiving a value increases the capacity by one.
557    ///     permit.send(());
558    ///     assert_eq!(rx.len(), 1);
559    ///     rx.recv().await.unwrap();
560    ///     assert_eq!(rx.capacity(), 5);
561    ///
562    ///     // Directly sending a message drops the capacity by one.
563    ///     tx.send(()).await.unwrap();
564    ///     assert_eq!(rx.capacity(), 4);
565    ///     assert_eq!(rx.len(), 1);
566    ///
567    ///     // Receiving the message increases the capacity by one.
568    ///     rx.recv().await.unwrap();
569    ///     assert_eq!(rx.capacity(), 5);
570    ///     assert_eq!(rx.len(), 0);
571    /// }
572    /// ```
573    /// [`capacity`]: Receiver::capacity
574    /// [`max_capacity`]: Receiver::max_capacity
575    pub fn capacity(&self) -> usize {
576        self.chan.semaphore().semaphore.available_permits()
577    }
578
579    /// Returns the maximum buffer capacity of the channel.
580    ///
581    /// The maximum capacity is the buffer capacity initially specified when calling
582    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
583    /// available buffer capacity: as messages are sent and received, the value
584    /// returned by [`capacity`] will go up or down, whereas the value
585    /// returned by [`max_capacity`] will remain constant.
586    ///
587    /// # Examples
588    ///
589    /// ```
590    /// use tokio::sync::mpsc;
591    ///
592    /// #[tokio::main]
593    /// async fn main() {
594    ///     let (tx, rx) = mpsc::channel::<()>(5);
595    ///
596    ///     // both max capacity and capacity are the same at first
597    ///     assert_eq!(rx.max_capacity(), 5);
598    ///     assert_eq!(rx.capacity(), 5);
599    ///
600    ///     // Making a reservation doesn't change the max capacity.
601    ///     let permit = tx.reserve().await.unwrap();
602    ///     assert_eq!(rx.max_capacity(), 5);
603    ///     // but drops the capacity by one
604    ///     assert_eq!(rx.capacity(), 4);
605    /// }
606    /// ```
607    /// [`capacity`]: Receiver::capacity
608    /// [`max_capacity`]: Receiver::max_capacity
609    pub fn max_capacity(&self) -> usize {
610        self.chan.semaphore().bound
611    }
612
613    /// Polls to receive the next message on this channel.
614    ///
615    /// This method returns:
616    ///
617    ///  * `Poll::Pending` if no messages are available but the channel is not
618    ///    closed, or if a spurious failure happens.
619    ///  * `Poll::Ready(Some(message))` if a message is available.
620    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
621    ///    sent before it was closed have been received.
622    ///
623    /// When the method returns `Poll::Pending`, the `Waker` in the provided
624    /// `Context` is scheduled to receive a wakeup when a message is sent on any
625    /// receiver, or when the channel is closed.  Note that on multiple calls to
626    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
627    /// passed to the most recent call is scheduled to receive a wakeup.
628    ///
629    /// If this method returns `Poll::Pending` due to a spurious failure, then
630    /// the `Waker` will be notified when the situation causing the spurious
631    /// failure has been resolved. Note that receiving such a wakeup does not
632    /// guarantee that the next call will succeed — it could fail with another
633    /// spurious failure.
634    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
635        self.chan.recv(cx)
636    }
637
638    /// Polls to receive multiple messages on this channel, extending the provided buffer.
639    ///
640    /// This method returns:
641    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
642    ///   spurious failure happens.
643    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
644    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
645    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
646    ///
647    /// When the method returns `Poll::Pending`, the `Waker` in the provided
648    /// `Context` is scheduled to receive a wakeup when a message is sent on any
649    /// receiver, or when the channel is closed.  Note that on multiple calls to
650    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
651    /// passed to the most recent call is scheduled to receive a wakeup.
652    ///
653    /// Note that this method does not guarantee that exactly `limit` messages
654    /// are received. Rather, if at least one message is available, it returns
655    /// as many messages as it can up to the given limit. This method returns
656    /// zero only if the channel is closed (or if `limit` is zero).
657    ///
658    /// # Examples
659    ///
660    /// ```
661    /// use std::task::{Context, Poll};
662    /// use std::pin::Pin;
663    /// use tokio::sync::mpsc;
664    /// use futures::Future;
665    ///
666    /// struct MyReceiverFuture<'a> {
667    ///     receiver: mpsc::Receiver<i32>,
668    ///     buffer: &'a mut Vec<i32>,
669    ///     limit: usize,
670    /// }
671    ///
672    /// impl<'a> Future for MyReceiverFuture<'a> {
673    ///     type Output = usize; // Number of messages received
674    ///
675    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
676    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
677    ///
678    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
679    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
680    ///             Poll::Pending => Poll::Pending,
681    ///             Poll::Ready(count) => Poll::Ready(count),
682    ///         }
683    ///     }
684    /// }
685    ///
686    /// #[tokio::main]
687    /// async fn main() {
688    ///     let (tx, rx) = mpsc::channel(32);
689    ///     let mut buffer = Vec::new();
690    ///
691    ///     let my_receiver_future = MyReceiverFuture {
692    ///         receiver: rx,
693    ///         buffer: &mut buffer,
694    ///         limit: 3,
695    ///     };
696    ///
697    ///     for i in 0..10 {
698    ///         tx.send(i).await.unwrap();
699    ///     }
700    ///
701    ///     let count = my_receiver_future.await;
702    ///     assert_eq!(count, 3);
703    ///     assert_eq!(buffer, vec![0,1,2])
704    /// }
705    /// ```
706    pub fn poll_recv_many(
707        &mut self,
708        cx: &mut Context<'_>,
709        buffer: &mut Vec<T>,
710        limit: usize,
711    ) -> Poll<usize> {
712        self.chan.recv_many(cx, buffer, limit)
713    }
714}
715
716impl<T> fmt::Debug for Receiver<T> {
717    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
718        fmt.debug_struct("Receiver")
719            .field("chan", &self.chan)
720            .finish()
721    }
722}
723
724impl<T> Unpin for Receiver<T> {}
725
726impl<T> Sender<T> {
727    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
728        Sender { chan }
729    }
730
731    /// Sends a value, waiting until there is capacity.
732    ///
733    /// A successful send occurs when it is determined that the other end of the
734    /// channel has not hung up already. An unsuccessful send would be one where
735    /// the corresponding receiver has already been closed. Note that a return
736    /// value of `Err` means that the data will never be received, but a return
737    /// value of `Ok` does not mean that the data will be received. It is
738    /// possible for the corresponding receiver to hang up immediately after
739    /// this function returns `Ok`.
740    ///
741    /// # Errors
742    ///
743    /// If the receive half of the channel is closed, either due to [`close`]
744    /// being called or the [`Receiver`] handle dropping, the function returns
745    /// an error. The error includes the value passed to `send`.
746    ///
747    /// [`close`]: Receiver::close
748    /// [`Receiver`]: Receiver
749    ///
750    /// # Cancel safety
751    ///
752    /// If `send` is used as the event in a [`tokio::select!`](crate::select)
753    /// statement and some other branch completes first, then it is guaranteed
754    /// that the message was not sent. **However, in that case, the message
755    /// is dropped and will be lost.**
756    ///
757    /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
758    /// capacity, then use the returned [`Permit`] to send the message.
759    ///
760    /// This channel uses a queue to ensure that calls to `send` and `reserve`
761    /// complete in the order they were requested.  Cancelling a call to
762    /// `send` makes you lose your place in the queue.
763    ///
764    /// # Examples
765    ///
766    /// In the following example, each call to `send` will block until the
767    /// previously sent value was received.
768    ///
769    /// ```rust
770    /// use tokio::sync::mpsc;
771    ///
772    /// #[tokio::main]
773    /// async fn main() {
774    ///     let (tx, mut rx) = mpsc::channel(1);
775    ///
776    ///     tokio::spawn(async move {
777    ///         for i in 0..10 {
778    ///             if let Err(_) = tx.send(i).await {
779    ///                 println!("receiver dropped");
780    ///                 return;
781    ///             }
782    ///         }
783    ///     });
784    ///
785    ///     while let Some(i) = rx.recv().await {
786    ///         println!("got = {}", i);
787    ///     }
788    /// }
789    /// ```
790    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
791        match self.reserve().await {
792            Ok(permit) => {
793                permit.send(value);
794                Ok(())
795            }
796            Err(_) => Err(SendError(value)),
797        }
798    }
799
800    /// Completes when the receiver has dropped.
801    ///
802    /// This allows the producers to get notified when interest in the produced
803    /// values is canceled and immediately stop doing work.
804    ///
805    /// # Cancel safety
806    ///
807    /// This method is cancel safe. Once the channel is closed, it stays closed
808    /// forever and all future calls to `closed` will return immediately.
809    ///
810    /// # Examples
811    ///
812    /// ```
813    /// use tokio::sync::mpsc;
814    ///
815    /// #[tokio::main]
816    /// async fn main() {
817    ///     let (tx1, rx) = mpsc::channel::<()>(1);
818    ///     let tx2 = tx1.clone();
819    ///     let tx3 = tx1.clone();
820    ///     let tx4 = tx1.clone();
821    ///     let tx5 = tx1.clone();
822    ///     tokio::spawn(async move {
823    ///         drop(rx);
824    ///     });
825    ///
826    ///     futures::join!(
827    ///         tx1.closed(),
828    ///         tx2.closed(),
829    ///         tx3.closed(),
830    ///         tx4.closed(),
831    ///         tx5.closed()
832    ///     );
833    ///     println!("Receiver dropped");
834    /// }
835    /// ```
836    pub async fn closed(&self) {
837        self.chan.closed().await;
838    }
839
840    /// Attempts to immediately send a message on this `Sender`
841    ///
842    /// This method differs from [`send`] by returning immediately if the channel's
843    /// buffer is full or no receiver is waiting to acquire some data. Compared
844    /// with [`send`], this function has two failure cases instead of one (one for
845    /// disconnection, one for a full buffer).
846    ///
847    /// # Errors
848    ///
849    /// If the channel capacity has been reached, i.e., the channel has `n`
850    /// buffered values where `n` is the argument passed to [`channel`], then an
851    /// error is returned.
852    ///
853    /// If the receive half of the channel is closed, either due to [`close`]
854    /// being called or the [`Receiver`] handle dropping, the function returns
855    /// an error. The error includes the value passed to `send`.
856    ///
857    /// [`send`]: Sender::send
858    /// [`channel`]: channel
859    /// [`close`]: Receiver::close
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// use tokio::sync::mpsc;
865    ///
866    /// #[tokio::main]
867    /// async fn main() {
868    ///     // Create a channel with buffer size 1
869    ///     let (tx1, mut rx) = mpsc::channel(1);
870    ///     let tx2 = tx1.clone();
871    ///
872    ///     tokio::spawn(async move {
873    ///         tx1.send(1).await.unwrap();
874    ///         tx1.send(2).await.unwrap();
875    ///         // task waits until the receiver receives a value.
876    ///     });
877    ///
878    ///     tokio::spawn(async move {
879    ///         // This will return an error and send
880    ///         // no message if the buffer is full
881    ///         let _ = tx2.try_send(3);
882    ///     });
883    ///
884    ///     let mut msg;
885    ///     msg = rx.recv().await.unwrap();
886    ///     println!("message {} received", msg);
887    ///
888    ///     msg = rx.recv().await.unwrap();
889    ///     println!("message {} received", msg);
890    ///
891    ///     // Third message may have never been sent
892    ///     match rx.recv().await {
893    ///         Some(msg) => println!("message {} received", msg),
894    ///         None => println!("the third message was never sent"),
895    ///     }
896    /// }
897    /// ```
898    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
899        match self.chan.semaphore().semaphore.try_acquire(1) {
900            Ok(()) => {}
901            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
902            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
903        }
904
905        // Send the message
906        self.chan.send(message);
907        Ok(())
908    }
909
910    /// Sends a value, waiting until there is capacity, but only for a limited time.
911    ///
912    /// Shares the same success and error conditions as [`send`], adding one more
913    /// condition for an unsuccessful send, which is when the provided timeout has
914    /// elapsed, and there is no capacity available.
915    ///
916    /// [`send`]: Sender::send
917    ///
918    /// # Errors
919    ///
920    /// If the receive half of the channel is closed, either due to [`close`]
921    /// being called or the [`Receiver`] having been dropped,
922    /// the function returns an error. The error includes the value passed to `send`.
923    ///
924    /// [`close`]: Receiver::close
925    /// [`Receiver`]: Receiver
926    ///
927    /// # Panics
928    ///
929    /// This function panics if it is called outside the context of a Tokio
930    /// runtime [with time enabled](crate::runtime::Builder::enable_time).
931    ///
932    /// # Examples
933    ///
934    /// In the following example, each call to `send_timeout` will block until the
935    /// previously sent value was received, unless the timeout has elapsed.
936    ///
937    /// ```rust
938    /// use tokio::sync::mpsc;
939    /// use tokio::time::{sleep, Duration};
940    ///
941    /// #[tokio::main]
942    /// async fn main() {
943    ///     let (tx, mut rx) = mpsc::channel(1);
944    ///
945    ///     tokio::spawn(async move {
946    ///         for i in 0..10 {
947    ///             if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
948    ///                 println!("send error: #{:?}", e);
949    ///                 return;
950    ///             }
951    ///         }
952    ///     });
953    ///
954    ///     while let Some(i) = rx.recv().await {
955    ///         println!("got = {}", i);
956    ///         sleep(Duration::from_millis(200)).await;
957    ///     }
958    /// }
959    /// ```
960    #[cfg(feature = "time")]
961    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
962    pub async fn send_timeout(
963        &self,
964        value: T,
965        timeout: Duration,
966    ) -> Result<(), SendTimeoutError<T>> {
967        let permit = match crate::time::timeout(timeout, self.reserve()).await {
968            Err(_) => {
969                return Err(SendTimeoutError::Timeout(value));
970            }
971            Ok(Err(_)) => {
972                return Err(SendTimeoutError::Closed(value));
973            }
974            Ok(Ok(permit)) => permit,
975        };
976
977        permit.send(value);
978        Ok(())
979    }
980
981    /// Blocking send to call outside of asynchronous contexts.
982    ///
983    /// This method is intended for use cases where you are sending from
984    /// synchronous code to asynchronous code, and will work even if the
985    /// receiver is not using [`blocking_recv`] to receive the message.
986    ///
987    /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
988    ///
989    /// # Panics
990    ///
991    /// This function panics if called within an asynchronous execution
992    /// context.
993    ///
994    /// # Examples
995    ///
996    /// ```
997    /// use std::thread;
998    /// use tokio::runtime::Runtime;
999    /// use tokio::sync::mpsc;
1000    ///
1001    /// fn main() {
1002    ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
1003    ///
1004    ///     let sync_code = thread::spawn(move || {
1005    ///         tx.blocking_send(10).unwrap();
1006    ///     });
1007    ///
1008    ///     Runtime::new().unwrap().block_on(async move {
1009    ///         assert_eq!(Some(10), rx.recv().await);
1010    ///     });
1011    ///     sync_code.join().unwrap()
1012    /// }
1013    /// ```
1014    #[track_caller]
1015    #[cfg(feature = "sync")]
1016    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1017    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1018        crate::future::block_on(self.send(value))
1019    }
1020
1021    /// Checks if the channel has been closed. This happens when the
1022    /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1023    /// called.
1024    ///
1025    /// [`Receiver`]: crate::sync::mpsc::Receiver
1026    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1027    ///
1028    /// ```
1029    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1030    /// assert!(!tx.is_closed());
1031    ///
1032    /// let tx2 = tx.clone();
1033    /// assert!(!tx2.is_closed());
1034    ///
1035    /// drop(rx);
1036    /// assert!(tx.is_closed());
1037    /// assert!(tx2.is_closed());
1038    /// ```
1039    pub fn is_closed(&self) -> bool {
1040        self.chan.is_closed()
1041    }
1042
1043    /// Waits for channel capacity. Once capacity to send one message is
1044    /// available, it is reserved for the caller.
1045    ///
1046    /// If the channel is full, the function waits for the number of unreceived
1047    /// messages to become less than the channel capacity. Capacity to send one
1048    /// message is reserved for the caller. A [`Permit`] is returned to track
1049    /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1050    /// reserved capacity.
1051    ///
1052    /// Dropping [`Permit`] without sending a message releases the capacity back
1053    /// to the channel.
1054    ///
1055    /// [`Permit`]: Permit
1056    /// [`send`]: Permit::send
1057    ///
1058    /// # Cancel safety
1059    ///
1060    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1061    /// complete in the order they were requested.  Cancelling a call to
1062    /// `reserve` makes you lose your place in the queue.
1063    ///
1064    /// # Examples
1065    ///
1066    /// ```
1067    /// use tokio::sync::mpsc;
1068    ///
1069    /// #[tokio::main]
1070    /// async fn main() {
1071    ///     let (tx, mut rx) = mpsc::channel(1);
1072    ///
1073    ///     // Reserve capacity
1074    ///     let permit = tx.reserve().await.unwrap();
1075    ///
1076    ///     // Trying to send directly on the `tx` will fail due to no
1077    ///     // available capacity.
1078    ///     assert!(tx.try_send(123).is_err());
1079    ///
1080    ///     // Sending on the permit succeeds
1081    ///     permit.send(456);
1082    ///
1083    ///     // The value sent on the permit is received
1084    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1085    /// }
1086    /// ```
1087    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1088        self.reserve_inner(1).await?;
1089        Ok(Permit { chan: &self.chan })
1090    }
1091
1092    /// Waits for channel capacity. Once capacity to send `n` messages is
1093    /// available, it is reserved for the caller.
1094    ///
1095    /// If the channel is full or if there are fewer than `n` permits available, the function waits
1096    /// for the number of unreceived messages to become `n` less than the channel capacity.
1097    /// Capacity to send `n` message is then reserved for the caller.
1098    ///
1099    /// A [`PermitIterator`] is returned to track the reserved capacity.
1100    /// You can call this [`Iterator`] until it is exhausted to
1101    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1102    /// [`try_reserve_many`] except it awaits for the slots to become available.
1103    ///
1104    /// If the channel is closed, the function returns a [`SendError`].
1105    ///
1106    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1107    /// permits back to the channel.
1108    ///
1109    /// [`PermitIterator`]: PermitIterator
1110    /// [`Permit`]: Permit
1111    /// [`send`]: Permit::send
1112    /// [`try_reserve_many`]: Sender::try_reserve_many
1113    ///
1114    /// # Cancel safety
1115    ///
1116    /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1117    /// complete in the order they were requested. Cancelling a call to
1118    /// `reserve_many` makes you lose your place in the queue.
1119    ///
1120    /// # Examples
1121    ///
1122    /// ```
1123    /// use tokio::sync::mpsc;
1124    ///
1125    /// #[tokio::main]
1126    /// async fn main() {
1127    ///     let (tx, mut rx) = mpsc::channel(2);
1128    ///
1129    ///     // Reserve capacity
1130    ///     let mut permit = tx.reserve_many(2).await.unwrap();
1131    ///
1132    ///     // Trying to send directly on the `tx` will fail due to no
1133    ///     // available capacity.
1134    ///     assert!(tx.try_send(123).is_err());
1135    ///
1136    ///     // Sending with the permit iterator succeeds
1137    ///     permit.next().unwrap().send(456);
1138    ///     permit.next().unwrap().send(457);
1139    ///
1140    ///     // The iterator should now be exhausted
1141    ///     assert!(permit.next().is_none());
1142    ///
1143    ///     // The value sent on the permit is received
1144    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1145    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1146    /// }
1147    /// ```
1148    pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1149        self.reserve_inner(n).await?;
1150        Ok(PermitIterator {
1151            chan: &self.chan,
1152            n,
1153        })
1154    }
1155
1156    /// Waits for channel capacity, moving the `Sender` and returning an owned
1157    /// permit. Once capacity to send one message is available, it is reserved
1158    /// for the caller.
1159    ///
1160    /// This moves the sender _by value_, and returns an owned permit that can
1161    /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1162    /// this method may be used in cases where the permit must be valid for the
1163    /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1164    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1165    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1166    /// moved, it can be cloned prior to calling `reserve_owned`.
1167    ///
1168    /// If the channel is full, the function waits for the number of unreceived
1169    /// messages to become less than the channel capacity. Capacity to send one
1170    /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1171    /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1172    /// consumes the reserved capacity.
1173    ///
1174    /// Dropping the [`OwnedPermit`] without sending a message releases the
1175    /// capacity back to the channel.
1176    ///
1177    /// # Cancel safety
1178    ///
1179    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1180    /// complete in the order they were requested.  Cancelling a call to
1181    /// `reserve_owned` makes you lose your place in the queue.
1182    ///
1183    /// # Examples
1184    /// Sending a message using an [`OwnedPermit`]:
1185    /// ```
1186    /// use tokio::sync::mpsc;
1187    ///
1188    /// #[tokio::main]
1189    /// async fn main() {
1190    ///     let (tx, mut rx) = mpsc::channel(1);
1191    ///
1192    ///     // Reserve capacity, moving the sender.
1193    ///     let permit = tx.reserve_owned().await.unwrap();
1194    ///
1195    ///     // Send a message, consuming the permit and returning
1196    ///     // the moved sender.
1197    ///     let tx = permit.send(123);
1198    ///
1199    ///     // The value sent on the permit is received.
1200    ///     assert_eq!(rx.recv().await.unwrap(), 123);
1201    ///
1202    ///     // The sender can now be used again.
1203    ///     tx.send(456).await.unwrap();
1204    /// }
1205    /// ```
1206    ///
1207    /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1208    /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1209    ///
1210    /// ```
1211    /// use tokio::sync::mpsc;
1212    ///
1213    /// #[tokio::main]
1214    /// async fn main() {
1215    ///     let (tx, mut rx) = mpsc::channel(1);
1216    ///
1217    ///     // Clone the sender and reserve capacity.
1218    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1219    ///
1220    ///     // Trying to send directly on the `tx` will fail due to no
1221    ///     // available capacity.
1222    ///     assert!(tx.try_send(123).is_err());
1223    ///
1224    ///     // Sending on the permit succeeds.
1225    ///     permit.send(456);
1226    ///
1227    ///     // The value sent on the permit is received
1228    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1229    /// }
1230    /// ```
1231    ///
1232    /// [`Sender::reserve`]: Sender::reserve
1233    /// [`OwnedPermit`]: OwnedPermit
1234    /// [`send`]: OwnedPermit::send
1235    /// [`Arc::clone`]: std::sync::Arc::clone
1236    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1237        self.reserve_inner(1).await?;
1238        Ok(OwnedPermit {
1239            chan: Some(self.chan),
1240        })
1241    }
1242
1243    async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1244        crate::trace::async_trace_leaf().await;
1245
1246        if n > self.max_capacity() {
1247            return Err(SendError(()));
1248        }
1249        match self.chan.semaphore().semaphore.acquire(n).await {
1250            Ok(()) => Ok(()),
1251            Err(_) => Err(SendError(())),
1252        }
1253    }
1254
1255    /// Tries to acquire a slot in the channel without waiting for the slot to become
1256    /// available.
1257    ///
1258    /// If the channel is full this function will return [`TrySendError`], otherwise
1259    /// if there is a slot available it will return a [`Permit`] that will then allow you
1260    /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1261    /// [`reserve`] except it does not await for the slot to become available.
1262    ///
1263    /// Dropping [`Permit`] without sending a message releases the capacity back
1264    /// to the channel.
1265    ///
1266    /// [`Permit`]: Permit
1267    /// [`send`]: Permit::send
1268    /// [`reserve`]: Sender::reserve
1269    ///
1270    /// # Examples
1271    ///
1272    /// ```
1273    /// use tokio::sync::mpsc;
1274    ///
1275    /// #[tokio::main]
1276    /// async fn main() {
1277    ///     let (tx, mut rx) = mpsc::channel(1);
1278    ///
1279    ///     // Reserve capacity
1280    ///     let permit = tx.try_reserve().unwrap();
1281    ///
1282    ///     // Trying to send directly on the `tx` will fail due to no
1283    ///     // available capacity.
1284    ///     assert!(tx.try_send(123).is_err());
1285    ///
1286    ///     // Trying to reserve an additional slot on the `tx` will
1287    ///     // fail because there is no capacity.
1288    ///     assert!(tx.try_reserve().is_err());
1289    ///
1290    ///     // Sending on the permit succeeds
1291    ///     permit.send(456);
1292    ///
1293    ///     // The value sent on the permit is received
1294    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1295    ///
1296    /// }
1297    /// ```
1298    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1299        match self.chan.semaphore().semaphore.try_acquire(1) {
1300            Ok(()) => {}
1301            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1302            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1303        }
1304
1305        Ok(Permit { chan: &self.chan })
1306    }
1307
1308    /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1309    /// available.
1310    ///
1311    /// A [`PermitIterator`] is returned to track the reserved capacity.
1312    /// You can call this [`Iterator`] until it is exhausted to
1313    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1314    /// [`reserve_many`] except it does not await for the slots to become available.
1315    ///
1316    /// If there are fewer than `n` permits available on the channel, then
1317    /// this function will return a [`TrySendError::Full`]. If the channel is closed
1318    /// this function will return a [`TrySendError::Closed`].
1319    ///
1320    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1321    /// permits back to the channel.
1322    ///
1323    /// [`PermitIterator`]: PermitIterator
1324    /// [`send`]: Permit::send
1325    /// [`reserve_many`]: Sender::reserve_many
1326    ///
1327    /// # Examples
1328    ///
1329    /// ```
1330    /// use tokio::sync::mpsc;
1331    ///
1332    /// #[tokio::main]
1333    /// async fn main() {
1334    ///     let (tx, mut rx) = mpsc::channel(2);
1335    ///
1336    ///     // Reserve capacity
1337    ///     let mut permit = tx.try_reserve_many(2).unwrap();
1338    ///
1339    ///     // Trying to send directly on the `tx` will fail due to no
1340    ///     // available capacity.
1341    ///     assert!(tx.try_send(123).is_err());
1342    ///
1343    ///     // Trying to reserve an additional slot on the `tx` will
1344    ///     // fail because there is no capacity.
1345    ///     assert!(tx.try_reserve().is_err());
1346    ///
1347    ///     // Sending with the permit iterator succeeds
1348    ///     permit.next().unwrap().send(456);
1349    ///     permit.next().unwrap().send(457);
1350    ///
1351    ///     // The iterator should now be exhausted
1352    ///     assert!(permit.next().is_none());
1353    ///
1354    ///     // The value sent on the permit is received
1355    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1356    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1357    ///
1358    ///     // Trying to call try_reserve_many with 0 will return an empty iterator
1359    ///     let mut permit = tx.try_reserve_many(0).unwrap();
1360    ///     assert!(permit.next().is_none());
1361    ///
1362    ///     // Trying to call try_reserve_many with a number greater than the channel
1363    ///     // capacity will return an error
1364    ///     let permit = tx.try_reserve_many(3);
1365    ///     assert!(permit.is_err());
1366    ///
1367    ///     // Trying to call try_reserve_many on a closed channel will return an error
1368    ///     drop(rx);
1369    ///     let permit = tx.try_reserve_many(1);
1370    ///     assert!(permit.is_err());
1371    ///
1372    ///     let permit = tx.try_reserve_many(0);
1373    ///     assert!(permit.is_err());
1374    /// }
1375    /// ```
1376    pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1377        if n > self.max_capacity() {
1378            return Err(TrySendError::Full(()));
1379        }
1380
1381        match self.chan.semaphore().semaphore.try_acquire(n) {
1382            Ok(()) => {}
1383            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1384            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1385        }
1386
1387        Ok(PermitIterator {
1388            chan: &self.chan,
1389            n,
1390        })
1391    }
1392
1393    /// Tries to acquire a slot in the channel without waiting for the slot to become
1394    /// available, returning an owned permit.
1395    ///
1396    /// This moves the sender _by value_, and returns an owned permit that can
1397    /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1398    /// this method may be used in cases where the permit must be valid for the
1399    /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1400    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1401    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1402    /// moved, it can be cloned prior to calling `try_reserve_owned`.
1403    ///
1404    /// If the channel is full this function will return a [`TrySendError`].
1405    /// Since the sender is taken by value, the `TrySendError` returned in this
1406    /// case contains the sender, so that it may be used again. Otherwise, if
1407    /// there is a slot available, this method will return an [`OwnedPermit`]
1408    /// that can then be used to [`send`] on the channel with a guaranteed slot.
1409    /// This function is similar to  [`reserve_owned`] except it does not await
1410    /// for the slot to become available.
1411    ///
1412    /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1413    /// to the channel.
1414    ///
1415    /// [`OwnedPermit`]: OwnedPermit
1416    /// [`send`]: OwnedPermit::send
1417    /// [`reserve_owned`]: Sender::reserve_owned
1418    /// [`Arc::clone`]: std::sync::Arc::clone
1419    ///
1420    /// # Examples
1421    ///
1422    /// ```
1423    /// use tokio::sync::mpsc;
1424    ///
1425    /// #[tokio::main]
1426    /// async fn main() {
1427    ///     let (tx, mut rx) = mpsc::channel(1);
1428    ///
1429    ///     // Reserve capacity
1430    ///     let permit = tx.clone().try_reserve_owned().unwrap();
1431    ///
1432    ///     // Trying to send directly on the `tx` will fail due to no
1433    ///     // available capacity.
1434    ///     assert!(tx.try_send(123).is_err());
1435    ///
1436    ///     // Trying to reserve an additional slot on the `tx` will
1437    ///     // fail because there is no capacity.
1438    ///     assert!(tx.try_reserve().is_err());
1439    ///
1440    ///     // Sending on the permit succeeds
1441    ///     permit.send(456);
1442    ///
1443    ///     // The value sent on the permit is received
1444    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1445    ///
1446    /// }
1447    /// ```
1448    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1449        match self.chan.semaphore().semaphore.try_acquire(1) {
1450            Ok(()) => {}
1451            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1452            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1453        }
1454
1455        Ok(OwnedPermit {
1456            chan: Some(self.chan),
1457        })
1458    }
1459
1460    /// Returns `true` if senders belong to the same channel.
1461    ///
1462    /// # Examples
1463    ///
1464    /// ```
1465    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1466    /// let  tx2 = tx.clone();
1467    /// assert!(tx.same_channel(&tx2));
1468    ///
1469    /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1470    /// assert!(!tx3.same_channel(&tx2));
1471    /// ```
1472    pub fn same_channel(&self, other: &Self) -> bool {
1473        self.chan.same_channel(&other.chan)
1474    }
1475
1476    /// Returns the current capacity of the channel.
1477    ///
1478    /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1479    /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1480    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1481    /// specified when calling [`channel`]
1482    ///
1483    /// # Examples
1484    ///
1485    /// ```
1486    /// use tokio::sync::mpsc;
1487    ///
1488    /// #[tokio::main]
1489    /// async fn main() {
1490    ///     let (tx, mut rx) = mpsc::channel::<()>(5);
1491    ///
1492    ///     assert_eq!(tx.capacity(), 5);
1493    ///
1494    ///     // Making a reservation drops the capacity by one.
1495    ///     let permit = tx.reserve().await.unwrap();
1496    ///     assert_eq!(tx.capacity(), 4);
1497    ///
1498    ///     // Sending and receiving a value increases the capacity by one.
1499    ///     permit.send(());
1500    ///     rx.recv().await.unwrap();
1501    ///     assert_eq!(tx.capacity(), 5);
1502    /// }
1503    /// ```
1504    ///
1505    /// [`send`]: Sender::send
1506    /// [`reserve`]: Sender::reserve
1507    /// [`channel`]: channel
1508    /// [`max_capacity`]: Sender::max_capacity
1509    pub fn capacity(&self) -> usize {
1510        self.chan.semaphore().semaphore.available_permits()
1511    }
1512
1513    /// Converts the `Sender` to a [`WeakSender`] that does not count
1514    /// towards RAII semantics, i.e. if all `Sender` instances of the
1515    /// channel were dropped and only `WeakSender` instances remain,
1516    /// the channel is closed.
1517    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1518    pub fn downgrade(&self) -> WeakSender<T> {
1519        WeakSender {
1520            chan: self.chan.downgrade(),
1521        }
1522    }
1523
1524    /// Returns the maximum buffer capacity of the channel.
1525    ///
1526    /// The maximum capacity is the buffer capacity initially specified when calling
1527    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1528    /// available buffer capacity: as messages are sent and received, the
1529    /// value returned by [`capacity`] will go up or down, whereas the value
1530    /// returned by [`max_capacity`] will remain constant.
1531    ///
1532    /// # Examples
1533    ///
1534    /// ```
1535    /// use tokio::sync::mpsc;
1536    ///
1537    /// #[tokio::main]
1538    /// async fn main() {
1539    ///     let (tx, _rx) = mpsc::channel::<()>(5);
1540    ///
1541    ///     // both max capacity and capacity are the same at first
1542    ///     assert_eq!(tx.max_capacity(), 5);
1543    ///     assert_eq!(tx.capacity(), 5);
1544    ///
1545    ///     // Making a reservation doesn't change the max capacity.
1546    ///     let permit = tx.reserve().await.unwrap();
1547    ///     assert_eq!(tx.max_capacity(), 5);
1548    ///     // but drops the capacity by one
1549    ///     assert_eq!(tx.capacity(), 4);
1550    /// }
1551    /// ```
1552    ///
1553    /// [`channel`]: channel
1554    /// [`max_capacity`]: Sender::max_capacity
1555    /// [`capacity`]: Sender::capacity
1556    pub fn max_capacity(&self) -> usize {
1557        self.chan.semaphore().bound
1558    }
1559
1560    /// Returns the number of [`Sender`] handles.
1561    pub fn strong_count(&self) -> usize {
1562        self.chan.strong_count()
1563    }
1564
1565    /// Returns the number of [`WeakSender`] handles.
1566    pub fn weak_count(&self) -> usize {
1567        self.chan.weak_count()
1568    }
1569}
1570
1571impl<T> Clone for Sender<T> {
1572    fn clone(&self) -> Self {
1573        Sender {
1574            chan: self.chan.clone(),
1575        }
1576    }
1577}
1578
1579impl<T> fmt::Debug for Sender<T> {
1580    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1581        fmt.debug_struct("Sender")
1582            .field("chan", &self.chan)
1583            .finish()
1584    }
1585}
1586
1587impl<T> Clone for WeakSender<T> {
1588    fn clone(&self) -> Self {
1589        self.chan.increment_weak_count();
1590
1591        WeakSender {
1592            chan: self.chan.clone(),
1593        }
1594    }
1595}
1596
1597impl<T> Drop for WeakSender<T> {
1598    fn drop(&mut self) {
1599        self.chan.decrement_weak_count();
1600    }
1601}
1602
1603impl<T> WeakSender<T> {
1604    /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1605    /// if there are other `Sender` instances alive and the channel wasn't
1606    /// previously dropped, otherwise `None` is returned.
1607    pub fn upgrade(&self) -> Option<Sender<T>> {
1608        chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1609    }
1610
1611    /// Returns the number of [`Sender`] handles.
1612    pub fn strong_count(&self) -> usize {
1613        self.chan.strong_count()
1614    }
1615
1616    /// Returns the number of [`WeakSender`] handles.
1617    pub fn weak_count(&self) -> usize {
1618        self.chan.weak_count()
1619    }
1620}
1621
1622impl<T> fmt::Debug for WeakSender<T> {
1623    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1624        fmt.debug_struct("WeakSender").finish()
1625    }
1626}
1627
1628// ===== impl Permit =====
1629
1630impl<T> Permit<'_, T> {
1631    /// Sends a value using the reserved capacity.
1632    ///
1633    /// Capacity for the message has already been reserved. The message is sent
1634    /// to the receiver and the permit is consumed. The operation will succeed
1635    /// even if the receiver half has been closed. See [`Receiver::close`] for
1636    /// more details on performing a clean shutdown.
1637    ///
1638    /// [`Receiver::close`]: Receiver::close
1639    ///
1640    /// # Examples
1641    ///
1642    /// ```
1643    /// use tokio::sync::mpsc;
1644    ///
1645    /// #[tokio::main]
1646    /// async fn main() {
1647    ///     let (tx, mut rx) = mpsc::channel(1);
1648    ///
1649    ///     // Reserve capacity
1650    ///     let permit = tx.reserve().await.unwrap();
1651    ///
1652    ///     // Trying to send directly on the `tx` will fail due to no
1653    ///     // available capacity.
1654    ///     assert!(tx.try_send(123).is_err());
1655    ///
1656    ///     // Send a message on the permit
1657    ///     permit.send(456);
1658    ///
1659    ///     // The value sent on the permit is received
1660    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1661    /// }
1662    /// ```
1663    pub fn send(self, value: T) {
1664        use std::mem;
1665
1666        self.chan.send(value);
1667
1668        // Avoid the drop logic
1669        mem::forget(self);
1670    }
1671}
1672
1673impl<T> Drop for Permit<'_, T> {
1674    fn drop(&mut self) {
1675        use chan::Semaphore;
1676
1677        let semaphore = self.chan.semaphore();
1678
1679        // Add the permit back to the semaphore
1680        semaphore.add_permit();
1681
1682        // If this is the last sender for this channel, wake the receiver so
1683        // that it can be notified that the channel is closed.
1684        if semaphore.is_closed() && semaphore.is_idle() {
1685            self.chan.wake_rx();
1686        }
1687    }
1688}
1689
1690impl<T> fmt::Debug for Permit<'_, T> {
1691    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1692        fmt.debug_struct("Permit")
1693            .field("chan", &self.chan)
1694            .finish()
1695    }
1696}
1697
1698// ===== impl PermitIterator =====
1699
1700impl<'a, T> Iterator for PermitIterator<'a, T> {
1701    type Item = Permit<'a, T>;
1702
1703    fn next(&mut self) -> Option<Self::Item> {
1704        if self.n == 0 {
1705            return None;
1706        }
1707
1708        self.n -= 1;
1709        Some(Permit { chan: self.chan })
1710    }
1711
1712    fn size_hint(&self) -> (usize, Option<usize>) {
1713        let n = self.n;
1714        (n, Some(n))
1715    }
1716}
1717impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1718impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1719
1720impl<T> Drop for PermitIterator<'_, T> {
1721    fn drop(&mut self) {
1722        use chan::Semaphore;
1723
1724        if self.n == 0 {
1725            return;
1726        }
1727
1728        let semaphore = self.chan.semaphore();
1729
1730        // Add the remaining permits back to the semaphore
1731        semaphore.add_permits(self.n);
1732
1733        // If this is the last sender for this channel, wake the receiver so
1734        // that it can be notified that the channel is closed.
1735        if semaphore.is_closed() && semaphore.is_idle() {
1736            self.chan.wake_rx();
1737        }
1738    }
1739}
1740
1741impl<T> fmt::Debug for PermitIterator<'_, T> {
1742    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1743        fmt.debug_struct("PermitIterator")
1744            .field("chan", &self.chan)
1745            .field("capacity", &self.n)
1746            .finish()
1747    }
1748}
1749
1750// ===== impl Permit =====
1751
1752impl<T> OwnedPermit<T> {
1753    /// Sends a value using the reserved capacity.
1754    ///
1755    /// Capacity for the message has already been reserved. The message is sent
1756    /// to the receiver and the permit is consumed. The operation will succeed
1757    /// even if the receiver half has been closed. See [`Receiver::close`] for
1758    /// more details on performing a clean shutdown.
1759    ///
1760    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1761    /// the `OwnedPermit` was reserved.
1762    ///
1763    /// [`Receiver::close`]: Receiver::close
1764    ///
1765    /// # Examples
1766    ///
1767    /// ```
1768    /// use tokio::sync::mpsc;
1769    ///
1770    /// #[tokio::main]
1771    /// async fn main() {
1772    ///     let (tx, mut rx) = mpsc::channel(1);
1773    ///
1774    ///     // Reserve capacity
1775    ///     let permit = tx.reserve_owned().await.unwrap();
1776    ///
1777    ///     // Send a message on the permit, returning the sender.
1778    ///     let tx = permit.send(456);
1779    ///
1780    ///     // The value sent on the permit is received
1781    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1782    ///
1783    ///     // We may now reuse `tx` to send another message.
1784    ///     tx.send(789).await.unwrap();
1785    /// }
1786    /// ```
1787    pub fn send(mut self, value: T) -> Sender<T> {
1788        let chan = self.chan.take().unwrap_or_else(|| {
1789            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1790        });
1791        chan.send(value);
1792
1793        Sender { chan }
1794    }
1795
1796    /// Releases the reserved capacity *without* sending a message, returning the
1797    /// [`Sender`].
1798    ///
1799    /// # Examples
1800    ///
1801    /// ```
1802    /// use tokio::sync::mpsc;
1803    ///
1804    /// #[tokio::main]
1805    /// async fn main() {
1806    ///     let (tx, rx) = mpsc::channel(1);
1807    ///
1808    ///     // Clone the sender and reserve capacity
1809    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1810    ///
1811    ///     // Trying to send on the original `tx` will fail, since the `permit`
1812    ///     // has reserved all the available capacity.
1813    ///     assert!(tx.try_send(123).is_err());
1814    ///
1815    ///     // Release the permit without sending a message, returning the clone
1816    ///     // of the sender.
1817    ///     let tx2 = permit.release();
1818    ///
1819    ///     // We may now reuse `tx` to send another message.
1820    ///     tx.send(789).await.unwrap();
1821    ///     # drop(rx); drop(tx2);
1822    /// }
1823    /// ```
1824    ///
1825    /// [`Sender`]: Sender
1826    pub fn release(mut self) -> Sender<T> {
1827        use chan::Semaphore;
1828
1829        let chan = self.chan.take().unwrap_or_else(|| {
1830            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1831        });
1832
1833        // Add the permit back to the semaphore
1834        chan.semaphore().add_permit();
1835        Sender { chan }
1836    }
1837}
1838
1839impl<T> Drop for OwnedPermit<T> {
1840    fn drop(&mut self) {
1841        use chan::Semaphore;
1842
1843        // Are we still holding onto the sender?
1844        if let Some(chan) = self.chan.take() {
1845            let semaphore = chan.semaphore();
1846
1847            // Add the permit back to the semaphore
1848            semaphore.add_permit();
1849
1850            // If this `OwnedPermit` is holding the last sender for this
1851            // channel, wake the receiver so that it can be notified that the
1852            // channel is closed.
1853            if semaphore.is_closed() && semaphore.is_idle() {
1854                chan.wake_rx();
1855            }
1856        }
1857
1858        // Otherwise, do nothing.
1859    }
1860}
1861
1862impl<T> fmt::Debug for OwnedPermit<T> {
1863    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1864        fmt.debug_struct("OwnedPermit")
1865            .field("chan", &self.chan)
1866            .finish()
1867    }
1868}