tokio/sync/mpsc/bounded.rs
1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23 chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57 chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68 chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79 chan: &'a chan::Tx<T, Semaphore>,
80 n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96 chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107 /// The channel receiver.
108 chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139/// let (tx, mut rx) = mpsc::channel(100);
140///
141/// tokio::spawn(async move {
142/// for i in 0..10 {
143/// if let Err(_) = tx.send(i).await {
144/// println!("receiver dropped");
145/// return;
146/// }
147/// }
148/// });
149///
150/// while let Some(i) = rx.recv().await {
151/// println!("got = {}", i);
152/// }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158 let semaphore = Semaphore {
159 semaphore: semaphore::Semaphore::new(buffer),
160 bound: buffer,
161 };
162 let (tx, rx) = chan::channel(semaphore);
163
164 let tx = Sender::new(tx);
165 let rx = Receiver::new(rx);
166
167 (tx, rx)
168}
169
170/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171/// representing the channel bound.
172#[derive(Debug)]
173pub(crate) struct Semaphore {
174 pub(crate) semaphore: semaphore::Semaphore,
175 pub(crate) bound: usize,
176}
177
178impl<T> Receiver<T> {
179 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180 Receiver { chan }
181 }
182
183 /// Receives the next value for this receiver.
184 ///
185 /// This method returns `None` if the channel has been closed and there are
186 /// no remaining messages in the channel's buffer. This indicates that no
187 /// further values can ever be received from this `Receiver`. The channel is
188 /// closed when all senders have been dropped, or when [`close`] is called.
189 ///
190 /// If there are no messages in the channel's buffer, but the channel has
191 /// not yet been closed, this method will sleep until a message is sent or
192 /// the channel is closed. Note that if [`close`] is called, but there are
193 /// still outstanding [`Permits`] from before it was closed, the channel is
194 /// not considered closed by `recv` until the permits are released.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. If `recv` is used as the event in a
199 /// [`tokio::select!`](crate::select) statement and some other branch
200 /// completes first, it is guaranteed that no messages were received on this
201 /// channel.
202 ///
203 /// [`close`]: Self::close
204 /// [`Permits`]: struct@crate::sync::mpsc::Permit
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::sync::mpsc;
210 ///
211 /// #[tokio::main]
212 /// async fn main() {
213 /// let (tx, mut rx) = mpsc::channel(100);
214 ///
215 /// tokio::spawn(async move {
216 /// tx.send("hello").await.unwrap();
217 /// });
218 ///
219 /// assert_eq!(Some("hello"), rx.recv().await);
220 /// assert_eq!(None, rx.recv().await);
221 /// }
222 /// ```
223 ///
224 /// Values are buffered:
225 ///
226 /// ```
227 /// use tokio::sync::mpsc;
228 ///
229 /// #[tokio::main]
230 /// async fn main() {
231 /// let (tx, mut rx) = mpsc::channel(100);
232 ///
233 /// tx.send("hello").await.unwrap();
234 /// tx.send("world").await.unwrap();
235 ///
236 /// assert_eq!(Some("hello"), rx.recv().await);
237 /// assert_eq!(Some("world"), rx.recv().await);
238 /// }
239 /// ```
240 pub async fn recv(&mut self) -> Option<T> {
241 use crate::future::poll_fn;
242 poll_fn(|cx| self.chan.recv(cx)).await
243 }
244
245 /// Receives the next values for this receiver and extends `buffer`.
246 ///
247 /// This method extends `buffer` by no more than a fixed number of values
248 /// as specified by `limit`. If `limit` is zero, the function immediately
249 /// returns `0`. The return value is the number of values added to `buffer`.
250 ///
251 /// For `limit > 0`, if there are no messages in the channel's queue, but
252 /// the channel has not yet been closed, this method will sleep until a
253 /// message is sent or the channel is closed. Note that if [`close`] is
254 /// called, but there are still outstanding [`Permits`] from before it was
255 /// closed, the channel is not considered closed by `recv_many` until the
256 /// permits are released.
257 ///
258 /// For non-zero values of `limit`, this method will never return `0` unless
259 /// the channel has been closed and there are no remaining messages in the
260 /// channel's queue. This indicates that no further values can ever be
261 /// received from this `Receiver`. The channel is closed when all senders
262 /// have been dropped, or when [`close`] is called.
263 ///
264 /// The capacity of `buffer` is increased as needed.
265 ///
266 /// # Cancel safety
267 ///
268 /// This method is cancel safe. If `recv_many` is used as the event in a
269 /// [`tokio::select!`](crate::select) statement and some other branch
270 /// completes first, it is guaranteed that no messages were received on this
271 /// channel.
272 ///
273 /// [`close`]: Self::close
274 /// [`Permits`]: struct@crate::sync::mpsc::Permit
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use tokio::sync::mpsc;
280 ///
281 /// #[tokio::main]
282 /// async fn main() {
283 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
284 /// let limit = 2;
285 /// let (tx, mut rx) = mpsc::channel(100);
286 /// let tx2 = tx.clone();
287 /// tx2.send("first").await.unwrap();
288 /// tx2.send("second").await.unwrap();
289 /// tx2.send("third").await.unwrap();
290 ///
291 /// // Call `recv_many` to receive up to `limit` (2) values.
292 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293 /// assert_eq!(vec!["first", "second"], buffer);
294 ///
295 /// // If the buffer is full, the next call to `recv_many`
296 /// // reserves additional capacity.
297 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298 ///
299 /// tokio::spawn(async move {
300 /// tx.send("fourth").await.unwrap();
301 /// });
302 ///
303 /// // 'tx' is dropped, but `recv_many`
304 /// // is guaranteed not to return 0 as the channel
305 /// // is not yet closed.
306 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308 ///
309 /// // Once the last sender is dropped, the channel is
310 /// // closed and `recv_many` returns 0, capacity unchanged.
311 /// drop(tx2);
312 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314 /// }
315 /// ```
316 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317 use crate::future::poll_fn;
318 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319 }
320
321 /// Tries to receive the next value for this receiver.
322 ///
323 /// This method returns the [`Empty`] error if the channel is currently
324 /// empty, but there are still outstanding [senders] or [permits].
325 ///
326 /// This method returns the [`Disconnected`] error if the channel is
327 /// currently empty, and there are no outstanding [senders] or [permits].
328 ///
329 /// Unlike the [`poll_recv`] method, this method will never return an
330 /// [`Empty`] error spuriously.
331 ///
332 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334 /// [`poll_recv`]: Self::poll_recv
335 /// [senders]: crate::sync::mpsc::Sender
336 /// [permits]: crate::sync::mpsc::Permit
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use tokio::sync::mpsc;
342 /// use tokio::sync::mpsc::error::TryRecvError;
343 ///
344 /// #[tokio::main]
345 /// async fn main() {
346 /// let (tx, mut rx) = mpsc::channel(100);
347 ///
348 /// tx.send("hello").await.unwrap();
349 ///
350 /// assert_eq!(Ok("hello"), rx.try_recv());
351 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352 ///
353 /// tx.send("hello").await.unwrap();
354 /// // Drop the last sender, closing the channel.
355 /// drop(tx);
356 ///
357 /// assert_eq!(Ok("hello"), rx.try_recv());
358 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359 /// }
360 /// ```
361 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362 self.chan.try_recv()
363 }
364
365 /// Blocking receive to call outside of asynchronous contexts.
366 ///
367 /// This method returns `None` if the channel has been closed and there are
368 /// no remaining messages in the channel's buffer. This indicates that no
369 /// further values can ever be received from this `Receiver`. The channel is
370 /// closed when all senders have been dropped, or when [`close`] is called.
371 ///
372 /// If there are no messages in the channel's buffer, but the channel has
373 /// not yet been closed, this method will block until a message is sent or
374 /// the channel is closed.
375 ///
376 /// This method is intended for use cases where you are sending from
377 /// asynchronous code to synchronous code, and will work even if the sender
378 /// is not using [`blocking_send`] to send the message.
379 ///
380 /// Note that if [`close`] is called, but there are still outstanding
381 /// [`Permits`] from before it was closed, the channel is not considered
382 /// closed by `blocking_recv` until the permits are released.
383 ///
384 /// [`close`]: Self::close
385 /// [`Permits`]: struct@crate::sync::mpsc::Permit
386 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387 ///
388 /// # Panics
389 ///
390 /// This function panics if called within an asynchronous execution
391 /// context.
392 ///
393 /// # Examples
394 ///
395 /// ```
396 /// use std::thread;
397 /// use tokio::runtime::Runtime;
398 /// use tokio::sync::mpsc;
399 ///
400 /// fn main() {
401 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
402 ///
403 /// let sync_code = thread::spawn(move || {
404 /// assert_eq!(Some(10), rx.blocking_recv());
405 /// });
406 ///
407 /// Runtime::new()
408 /// .unwrap()
409 /// .block_on(async move {
410 /// let _ = tx.send(10).await;
411 /// });
412 /// sync_code.join().unwrap()
413 /// }
414 /// ```
415 #[track_caller]
416 #[cfg(feature = "sync")]
417 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
418 pub fn blocking_recv(&mut self) -> Option<T> {
419 crate::future::block_on(self.recv())
420 }
421
422 /// Closes the receiving half of a channel without dropping it.
423 ///
424 /// This prevents any further messages from being sent on the channel while
425 /// still enabling the receiver to drain messages that are buffered. Any
426 /// outstanding [`Permit`] values will still be able to send messages.
427 ///
428 /// To guarantee that no messages are dropped, after calling `close()`,
429 /// `recv()` must be called until `None` is returned. If there are
430 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
431 /// not return `None` until those are released.
432 ///
433 /// [`Permit`]: Permit
434 /// [`OwnedPermit`]: OwnedPermit
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// use tokio::sync::mpsc;
440 ///
441 /// #[tokio::main]
442 /// async fn main() {
443 /// let (tx, mut rx) = mpsc::channel(20);
444 ///
445 /// tokio::spawn(async move {
446 /// let mut i = 0;
447 /// while let Ok(permit) = tx.reserve().await {
448 /// permit.send(i);
449 /// i += 1;
450 /// }
451 /// });
452 ///
453 /// rx.close();
454 ///
455 /// while let Some(msg) = rx.recv().await {
456 /// println!("got {}", msg);
457 /// }
458 ///
459 /// // Channel closed and no messages are lost.
460 /// }
461 /// ```
462 pub fn close(&mut self) {
463 self.chan.close();
464 }
465
466 /// Checks if a channel is closed.
467 ///
468 /// This method returns `true` if the channel has been closed. The channel is closed
469 /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
470 ///
471 /// [`Sender`]: crate::sync::mpsc::Sender
472 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
473 ///
474 /// # Examples
475 /// ```
476 /// use tokio::sync::mpsc;
477 ///
478 /// #[tokio::main]
479 /// async fn main() {
480 /// let (_tx, mut rx) = mpsc::channel::<()>(10);
481 /// assert!(!rx.is_closed());
482 ///
483 /// rx.close();
484 ///
485 /// assert!(rx.is_closed());
486 /// }
487 /// ```
488 pub fn is_closed(&self) -> bool {
489 self.chan.is_closed()
490 }
491
492 /// Checks if a channel is empty.
493 ///
494 /// This method returns `true` if the channel has no messages.
495 ///
496 /// # Examples
497 /// ```
498 /// use tokio::sync::mpsc;
499 ///
500 /// #[tokio::main]
501 /// async fn main() {
502 /// let (tx, rx) = mpsc::channel(10);
503 /// assert!(rx.is_empty());
504 ///
505 /// tx.send(0).await.unwrap();
506 /// assert!(!rx.is_empty());
507 /// }
508 ///
509 /// ```
510 pub fn is_empty(&self) -> bool {
511 self.chan.is_empty()
512 }
513
514 /// Returns the number of messages in the channel.
515 ///
516 /// # Examples
517 /// ```
518 /// use tokio::sync::mpsc;
519 ///
520 /// #[tokio::main]
521 /// async fn main() {
522 /// let (tx, rx) = mpsc::channel(10);
523 /// assert_eq!(0, rx.len());
524 ///
525 /// tx.send(0).await.unwrap();
526 /// assert_eq!(1, rx.len());
527 /// }
528 /// ```
529 pub fn len(&self) -> usize {
530 self.chan.len()
531 }
532
533 /// Returns the current capacity of the channel.
534 ///
535 /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
536 /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
537 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
538 /// specified when calling [`channel`].
539 ///
540 /// # Examples
541 ///
542 /// ```
543 /// use tokio::sync::mpsc;
544 ///
545 /// #[tokio::main]
546 /// async fn main() {
547 /// let (tx, mut rx) = mpsc::channel::<()>(5);
548 ///
549 /// assert_eq!(rx.capacity(), 5);
550 ///
551 /// // Making a reservation drops the capacity by one.
552 /// let permit = tx.reserve().await.unwrap();
553 /// assert_eq!(rx.capacity(), 4);
554 /// assert_eq!(rx.len(), 0);
555 ///
556 /// // Sending and receiving a value increases the capacity by one.
557 /// permit.send(());
558 /// assert_eq!(rx.len(), 1);
559 /// rx.recv().await.unwrap();
560 /// assert_eq!(rx.capacity(), 5);
561 ///
562 /// // Directly sending a message drops the capacity by one.
563 /// tx.send(()).await.unwrap();
564 /// assert_eq!(rx.capacity(), 4);
565 /// assert_eq!(rx.len(), 1);
566 ///
567 /// // Receiving the message increases the capacity by one.
568 /// rx.recv().await.unwrap();
569 /// assert_eq!(rx.capacity(), 5);
570 /// assert_eq!(rx.len(), 0);
571 /// }
572 /// ```
573 /// [`capacity`]: Receiver::capacity
574 /// [`max_capacity`]: Receiver::max_capacity
575 pub fn capacity(&self) -> usize {
576 self.chan.semaphore().semaphore.available_permits()
577 }
578
579 /// Returns the maximum buffer capacity of the channel.
580 ///
581 /// The maximum capacity is the buffer capacity initially specified when calling
582 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
583 /// available buffer capacity: as messages are sent and received, the value
584 /// returned by [`capacity`] will go up or down, whereas the value
585 /// returned by [`max_capacity`] will remain constant.
586 ///
587 /// # Examples
588 ///
589 /// ```
590 /// use tokio::sync::mpsc;
591 ///
592 /// #[tokio::main]
593 /// async fn main() {
594 /// let (tx, rx) = mpsc::channel::<()>(5);
595 ///
596 /// // both max capacity and capacity are the same at first
597 /// assert_eq!(rx.max_capacity(), 5);
598 /// assert_eq!(rx.capacity(), 5);
599 ///
600 /// // Making a reservation doesn't change the max capacity.
601 /// let permit = tx.reserve().await.unwrap();
602 /// assert_eq!(rx.max_capacity(), 5);
603 /// // but drops the capacity by one
604 /// assert_eq!(rx.capacity(), 4);
605 /// }
606 /// ```
607 /// [`capacity`]: Receiver::capacity
608 /// [`max_capacity`]: Receiver::max_capacity
609 pub fn max_capacity(&self) -> usize {
610 self.chan.semaphore().bound
611 }
612
613 /// Polls to receive the next message on this channel.
614 ///
615 /// This method returns:
616 ///
617 /// * `Poll::Pending` if no messages are available but the channel is not
618 /// closed, or if a spurious failure happens.
619 /// * `Poll::Ready(Some(message))` if a message is available.
620 /// * `Poll::Ready(None)` if the channel has been closed and all messages
621 /// sent before it was closed have been received.
622 ///
623 /// When the method returns `Poll::Pending`, the `Waker` in the provided
624 /// `Context` is scheduled to receive a wakeup when a message is sent on any
625 /// receiver, or when the channel is closed. Note that on multiple calls to
626 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
627 /// passed to the most recent call is scheduled to receive a wakeup.
628 ///
629 /// If this method returns `Poll::Pending` due to a spurious failure, then
630 /// the `Waker` will be notified when the situation causing the spurious
631 /// failure has been resolved. Note that receiving such a wakeup does not
632 /// guarantee that the next call will succeed — it could fail with another
633 /// spurious failure.
634 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
635 self.chan.recv(cx)
636 }
637
638 /// Polls to receive multiple messages on this channel, extending the provided buffer.
639 ///
640 /// This method returns:
641 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
642 /// spurious failure happens.
643 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
644 /// stored in `buffer`. This can be less than, or equal to, `limit`.
645 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
646 ///
647 /// When the method returns `Poll::Pending`, the `Waker` in the provided
648 /// `Context` is scheduled to receive a wakeup when a message is sent on any
649 /// receiver, or when the channel is closed. Note that on multiple calls to
650 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
651 /// passed to the most recent call is scheduled to receive a wakeup.
652 ///
653 /// Note that this method does not guarantee that exactly `limit` messages
654 /// are received. Rather, if at least one message is available, it returns
655 /// as many messages as it can up to the given limit. This method returns
656 /// zero only if the channel is closed (or if `limit` is zero).
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// use std::task::{Context, Poll};
662 /// use std::pin::Pin;
663 /// use tokio::sync::mpsc;
664 /// use futures::Future;
665 ///
666 /// struct MyReceiverFuture<'a> {
667 /// receiver: mpsc::Receiver<i32>,
668 /// buffer: &'a mut Vec<i32>,
669 /// limit: usize,
670 /// }
671 ///
672 /// impl<'a> Future for MyReceiverFuture<'a> {
673 /// type Output = usize; // Number of messages received
674 ///
675 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
676 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
677 ///
678 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
679 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
680 /// Poll::Pending => Poll::Pending,
681 /// Poll::Ready(count) => Poll::Ready(count),
682 /// }
683 /// }
684 /// }
685 ///
686 /// #[tokio::main]
687 /// async fn main() {
688 /// let (tx, rx) = mpsc::channel(32);
689 /// let mut buffer = Vec::new();
690 ///
691 /// let my_receiver_future = MyReceiverFuture {
692 /// receiver: rx,
693 /// buffer: &mut buffer,
694 /// limit: 3,
695 /// };
696 ///
697 /// for i in 0..10 {
698 /// tx.send(i).await.unwrap();
699 /// }
700 ///
701 /// let count = my_receiver_future.await;
702 /// assert_eq!(count, 3);
703 /// assert_eq!(buffer, vec![0,1,2])
704 /// }
705 /// ```
706 pub fn poll_recv_many(
707 &mut self,
708 cx: &mut Context<'_>,
709 buffer: &mut Vec<T>,
710 limit: usize,
711 ) -> Poll<usize> {
712 self.chan.recv_many(cx, buffer, limit)
713 }
714}
715
716impl<T> fmt::Debug for Receiver<T> {
717 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
718 fmt.debug_struct("Receiver")
719 .field("chan", &self.chan)
720 .finish()
721 }
722}
723
724impl<T> Unpin for Receiver<T> {}
725
726impl<T> Sender<T> {
727 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
728 Sender { chan }
729 }
730
731 /// Sends a value, waiting until there is capacity.
732 ///
733 /// A successful send occurs when it is determined that the other end of the
734 /// channel has not hung up already. An unsuccessful send would be one where
735 /// the corresponding receiver has already been closed. Note that a return
736 /// value of `Err` means that the data will never be received, but a return
737 /// value of `Ok` does not mean that the data will be received. It is
738 /// possible for the corresponding receiver to hang up immediately after
739 /// this function returns `Ok`.
740 ///
741 /// # Errors
742 ///
743 /// If the receive half of the channel is closed, either due to [`close`]
744 /// being called or the [`Receiver`] handle dropping, the function returns
745 /// an error. The error includes the value passed to `send`.
746 ///
747 /// [`close`]: Receiver::close
748 /// [`Receiver`]: Receiver
749 ///
750 /// # Cancel safety
751 ///
752 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
753 /// statement and some other branch completes first, then it is guaranteed
754 /// that the message was not sent. **However, in that case, the message
755 /// is dropped and will be lost.**
756 ///
757 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
758 /// capacity, then use the returned [`Permit`] to send the message.
759 ///
760 /// This channel uses a queue to ensure that calls to `send` and `reserve`
761 /// complete in the order they were requested. Cancelling a call to
762 /// `send` makes you lose your place in the queue.
763 ///
764 /// # Examples
765 ///
766 /// In the following example, each call to `send` will block until the
767 /// previously sent value was received.
768 ///
769 /// ```rust
770 /// use tokio::sync::mpsc;
771 ///
772 /// #[tokio::main]
773 /// async fn main() {
774 /// let (tx, mut rx) = mpsc::channel(1);
775 ///
776 /// tokio::spawn(async move {
777 /// for i in 0..10 {
778 /// if let Err(_) = tx.send(i).await {
779 /// println!("receiver dropped");
780 /// return;
781 /// }
782 /// }
783 /// });
784 ///
785 /// while let Some(i) = rx.recv().await {
786 /// println!("got = {}", i);
787 /// }
788 /// }
789 /// ```
790 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
791 match self.reserve().await {
792 Ok(permit) => {
793 permit.send(value);
794 Ok(())
795 }
796 Err(_) => Err(SendError(value)),
797 }
798 }
799
800 /// Completes when the receiver has dropped.
801 ///
802 /// This allows the producers to get notified when interest in the produced
803 /// values is canceled and immediately stop doing work.
804 ///
805 /// # Cancel safety
806 ///
807 /// This method is cancel safe. Once the channel is closed, it stays closed
808 /// forever and all future calls to `closed` will return immediately.
809 ///
810 /// # Examples
811 ///
812 /// ```
813 /// use tokio::sync::mpsc;
814 ///
815 /// #[tokio::main]
816 /// async fn main() {
817 /// let (tx1, rx) = mpsc::channel::<()>(1);
818 /// let tx2 = tx1.clone();
819 /// let tx3 = tx1.clone();
820 /// let tx4 = tx1.clone();
821 /// let tx5 = tx1.clone();
822 /// tokio::spawn(async move {
823 /// drop(rx);
824 /// });
825 ///
826 /// futures::join!(
827 /// tx1.closed(),
828 /// tx2.closed(),
829 /// tx3.closed(),
830 /// tx4.closed(),
831 /// tx5.closed()
832 /// );
833 /// println!("Receiver dropped");
834 /// }
835 /// ```
836 pub async fn closed(&self) {
837 self.chan.closed().await;
838 }
839
840 /// Attempts to immediately send a message on this `Sender`
841 ///
842 /// This method differs from [`send`] by returning immediately if the channel's
843 /// buffer is full or no receiver is waiting to acquire some data. Compared
844 /// with [`send`], this function has two failure cases instead of one (one for
845 /// disconnection, one for a full buffer).
846 ///
847 /// # Errors
848 ///
849 /// If the channel capacity has been reached, i.e., the channel has `n`
850 /// buffered values where `n` is the argument passed to [`channel`], then an
851 /// error is returned.
852 ///
853 /// If the receive half of the channel is closed, either due to [`close`]
854 /// being called or the [`Receiver`] handle dropping, the function returns
855 /// an error. The error includes the value passed to `send`.
856 ///
857 /// [`send`]: Sender::send
858 /// [`channel`]: channel
859 /// [`close`]: Receiver::close
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// use tokio::sync::mpsc;
865 ///
866 /// #[tokio::main]
867 /// async fn main() {
868 /// // Create a channel with buffer size 1
869 /// let (tx1, mut rx) = mpsc::channel(1);
870 /// let tx2 = tx1.clone();
871 ///
872 /// tokio::spawn(async move {
873 /// tx1.send(1).await.unwrap();
874 /// tx1.send(2).await.unwrap();
875 /// // task waits until the receiver receives a value.
876 /// });
877 ///
878 /// tokio::spawn(async move {
879 /// // This will return an error and send
880 /// // no message if the buffer is full
881 /// let _ = tx2.try_send(3);
882 /// });
883 ///
884 /// let mut msg;
885 /// msg = rx.recv().await.unwrap();
886 /// println!("message {} received", msg);
887 ///
888 /// msg = rx.recv().await.unwrap();
889 /// println!("message {} received", msg);
890 ///
891 /// // Third message may have never been sent
892 /// match rx.recv().await {
893 /// Some(msg) => println!("message {} received", msg),
894 /// None => println!("the third message was never sent"),
895 /// }
896 /// }
897 /// ```
898 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
899 match self.chan.semaphore().semaphore.try_acquire(1) {
900 Ok(()) => {}
901 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
902 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
903 }
904
905 // Send the message
906 self.chan.send(message);
907 Ok(())
908 }
909
910 /// Sends a value, waiting until there is capacity, but only for a limited time.
911 ///
912 /// Shares the same success and error conditions as [`send`], adding one more
913 /// condition for an unsuccessful send, which is when the provided timeout has
914 /// elapsed, and there is no capacity available.
915 ///
916 /// [`send`]: Sender::send
917 ///
918 /// # Errors
919 ///
920 /// If the receive half of the channel is closed, either due to [`close`]
921 /// being called or the [`Receiver`] having been dropped,
922 /// the function returns an error. The error includes the value passed to `send`.
923 ///
924 /// [`close`]: Receiver::close
925 /// [`Receiver`]: Receiver
926 ///
927 /// # Panics
928 ///
929 /// This function panics if it is called outside the context of a Tokio
930 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
931 ///
932 /// # Examples
933 ///
934 /// In the following example, each call to `send_timeout` will block until the
935 /// previously sent value was received, unless the timeout has elapsed.
936 ///
937 /// ```rust
938 /// use tokio::sync::mpsc;
939 /// use tokio::time::{sleep, Duration};
940 ///
941 /// #[tokio::main]
942 /// async fn main() {
943 /// let (tx, mut rx) = mpsc::channel(1);
944 ///
945 /// tokio::spawn(async move {
946 /// for i in 0..10 {
947 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
948 /// println!("send error: #{:?}", e);
949 /// return;
950 /// }
951 /// }
952 /// });
953 ///
954 /// while let Some(i) = rx.recv().await {
955 /// println!("got = {}", i);
956 /// sleep(Duration::from_millis(200)).await;
957 /// }
958 /// }
959 /// ```
960 #[cfg(feature = "time")]
961 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
962 pub async fn send_timeout(
963 &self,
964 value: T,
965 timeout: Duration,
966 ) -> Result<(), SendTimeoutError<T>> {
967 let permit = match crate::time::timeout(timeout, self.reserve()).await {
968 Err(_) => {
969 return Err(SendTimeoutError::Timeout(value));
970 }
971 Ok(Err(_)) => {
972 return Err(SendTimeoutError::Closed(value));
973 }
974 Ok(Ok(permit)) => permit,
975 };
976
977 permit.send(value);
978 Ok(())
979 }
980
981 /// Blocking send to call outside of asynchronous contexts.
982 ///
983 /// This method is intended for use cases where you are sending from
984 /// synchronous code to asynchronous code, and will work even if the
985 /// receiver is not using [`blocking_recv`] to receive the message.
986 ///
987 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
988 ///
989 /// # Panics
990 ///
991 /// This function panics if called within an asynchronous execution
992 /// context.
993 ///
994 /// # Examples
995 ///
996 /// ```
997 /// use std::thread;
998 /// use tokio::runtime::Runtime;
999 /// use tokio::sync::mpsc;
1000 ///
1001 /// fn main() {
1002 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
1003 ///
1004 /// let sync_code = thread::spawn(move || {
1005 /// tx.blocking_send(10).unwrap();
1006 /// });
1007 ///
1008 /// Runtime::new().unwrap().block_on(async move {
1009 /// assert_eq!(Some(10), rx.recv().await);
1010 /// });
1011 /// sync_code.join().unwrap()
1012 /// }
1013 /// ```
1014 #[track_caller]
1015 #[cfg(feature = "sync")]
1016 #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1017 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1018 crate::future::block_on(self.send(value))
1019 }
1020
1021 /// Checks if the channel has been closed. This happens when the
1022 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1023 /// called.
1024 ///
1025 /// [`Receiver`]: crate::sync::mpsc::Receiver
1026 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1027 ///
1028 /// ```
1029 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1030 /// assert!(!tx.is_closed());
1031 ///
1032 /// let tx2 = tx.clone();
1033 /// assert!(!tx2.is_closed());
1034 ///
1035 /// drop(rx);
1036 /// assert!(tx.is_closed());
1037 /// assert!(tx2.is_closed());
1038 /// ```
1039 pub fn is_closed(&self) -> bool {
1040 self.chan.is_closed()
1041 }
1042
1043 /// Waits for channel capacity. Once capacity to send one message is
1044 /// available, it is reserved for the caller.
1045 ///
1046 /// If the channel is full, the function waits for the number of unreceived
1047 /// messages to become less than the channel capacity. Capacity to send one
1048 /// message is reserved for the caller. A [`Permit`] is returned to track
1049 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1050 /// reserved capacity.
1051 ///
1052 /// Dropping [`Permit`] without sending a message releases the capacity back
1053 /// to the channel.
1054 ///
1055 /// [`Permit`]: Permit
1056 /// [`send`]: Permit::send
1057 ///
1058 /// # Cancel safety
1059 ///
1060 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1061 /// complete in the order they were requested. Cancelling a call to
1062 /// `reserve` makes you lose your place in the queue.
1063 ///
1064 /// # Examples
1065 ///
1066 /// ```
1067 /// use tokio::sync::mpsc;
1068 ///
1069 /// #[tokio::main]
1070 /// async fn main() {
1071 /// let (tx, mut rx) = mpsc::channel(1);
1072 ///
1073 /// // Reserve capacity
1074 /// let permit = tx.reserve().await.unwrap();
1075 ///
1076 /// // Trying to send directly on the `tx` will fail due to no
1077 /// // available capacity.
1078 /// assert!(tx.try_send(123).is_err());
1079 ///
1080 /// // Sending on the permit succeeds
1081 /// permit.send(456);
1082 ///
1083 /// // The value sent on the permit is received
1084 /// assert_eq!(rx.recv().await.unwrap(), 456);
1085 /// }
1086 /// ```
1087 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1088 self.reserve_inner(1).await?;
1089 Ok(Permit { chan: &self.chan })
1090 }
1091
1092 /// Waits for channel capacity. Once capacity to send `n` messages is
1093 /// available, it is reserved for the caller.
1094 ///
1095 /// If the channel is full or if there are fewer than `n` permits available, the function waits
1096 /// for the number of unreceived messages to become `n` less than the channel capacity.
1097 /// Capacity to send `n` message is then reserved for the caller.
1098 ///
1099 /// A [`PermitIterator`] is returned to track the reserved capacity.
1100 /// You can call this [`Iterator`] until it is exhausted to
1101 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1102 /// [`try_reserve_many`] except it awaits for the slots to become available.
1103 ///
1104 /// If the channel is closed, the function returns a [`SendError`].
1105 ///
1106 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1107 /// permits back to the channel.
1108 ///
1109 /// [`PermitIterator`]: PermitIterator
1110 /// [`Permit`]: Permit
1111 /// [`send`]: Permit::send
1112 /// [`try_reserve_many`]: Sender::try_reserve_many
1113 ///
1114 /// # Cancel safety
1115 ///
1116 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1117 /// complete in the order they were requested. Cancelling a call to
1118 /// `reserve_many` makes you lose your place in the queue.
1119 ///
1120 /// # Examples
1121 ///
1122 /// ```
1123 /// use tokio::sync::mpsc;
1124 ///
1125 /// #[tokio::main]
1126 /// async fn main() {
1127 /// let (tx, mut rx) = mpsc::channel(2);
1128 ///
1129 /// // Reserve capacity
1130 /// let mut permit = tx.reserve_many(2).await.unwrap();
1131 ///
1132 /// // Trying to send directly on the `tx` will fail due to no
1133 /// // available capacity.
1134 /// assert!(tx.try_send(123).is_err());
1135 ///
1136 /// // Sending with the permit iterator succeeds
1137 /// permit.next().unwrap().send(456);
1138 /// permit.next().unwrap().send(457);
1139 ///
1140 /// // The iterator should now be exhausted
1141 /// assert!(permit.next().is_none());
1142 ///
1143 /// // The value sent on the permit is received
1144 /// assert_eq!(rx.recv().await.unwrap(), 456);
1145 /// assert_eq!(rx.recv().await.unwrap(), 457);
1146 /// }
1147 /// ```
1148 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1149 self.reserve_inner(n).await?;
1150 Ok(PermitIterator {
1151 chan: &self.chan,
1152 n,
1153 })
1154 }
1155
1156 /// Waits for channel capacity, moving the `Sender` and returning an owned
1157 /// permit. Once capacity to send one message is available, it is reserved
1158 /// for the caller.
1159 ///
1160 /// This moves the sender _by value_, and returns an owned permit that can
1161 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1162 /// this method may be used in cases where the permit must be valid for the
1163 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1164 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1165 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1166 /// moved, it can be cloned prior to calling `reserve_owned`.
1167 ///
1168 /// If the channel is full, the function waits for the number of unreceived
1169 /// messages to become less than the channel capacity. Capacity to send one
1170 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1171 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1172 /// consumes the reserved capacity.
1173 ///
1174 /// Dropping the [`OwnedPermit`] without sending a message releases the
1175 /// capacity back to the channel.
1176 ///
1177 /// # Cancel safety
1178 ///
1179 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1180 /// complete in the order they were requested. Cancelling a call to
1181 /// `reserve_owned` makes you lose your place in the queue.
1182 ///
1183 /// # Examples
1184 /// Sending a message using an [`OwnedPermit`]:
1185 /// ```
1186 /// use tokio::sync::mpsc;
1187 ///
1188 /// #[tokio::main]
1189 /// async fn main() {
1190 /// let (tx, mut rx) = mpsc::channel(1);
1191 ///
1192 /// // Reserve capacity, moving the sender.
1193 /// let permit = tx.reserve_owned().await.unwrap();
1194 ///
1195 /// // Send a message, consuming the permit and returning
1196 /// // the moved sender.
1197 /// let tx = permit.send(123);
1198 ///
1199 /// // The value sent on the permit is received.
1200 /// assert_eq!(rx.recv().await.unwrap(), 123);
1201 ///
1202 /// // The sender can now be used again.
1203 /// tx.send(456).await.unwrap();
1204 /// }
1205 /// ```
1206 ///
1207 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1208 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1209 ///
1210 /// ```
1211 /// use tokio::sync::mpsc;
1212 ///
1213 /// #[tokio::main]
1214 /// async fn main() {
1215 /// let (tx, mut rx) = mpsc::channel(1);
1216 ///
1217 /// // Clone the sender and reserve capacity.
1218 /// let permit = tx.clone().reserve_owned().await.unwrap();
1219 ///
1220 /// // Trying to send directly on the `tx` will fail due to no
1221 /// // available capacity.
1222 /// assert!(tx.try_send(123).is_err());
1223 ///
1224 /// // Sending on the permit succeeds.
1225 /// permit.send(456);
1226 ///
1227 /// // The value sent on the permit is received
1228 /// assert_eq!(rx.recv().await.unwrap(), 456);
1229 /// }
1230 /// ```
1231 ///
1232 /// [`Sender::reserve`]: Sender::reserve
1233 /// [`OwnedPermit`]: OwnedPermit
1234 /// [`send`]: OwnedPermit::send
1235 /// [`Arc::clone`]: std::sync::Arc::clone
1236 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1237 self.reserve_inner(1).await?;
1238 Ok(OwnedPermit {
1239 chan: Some(self.chan),
1240 })
1241 }
1242
1243 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1244 crate::trace::async_trace_leaf().await;
1245
1246 if n > self.max_capacity() {
1247 return Err(SendError(()));
1248 }
1249 match self.chan.semaphore().semaphore.acquire(n).await {
1250 Ok(()) => Ok(()),
1251 Err(_) => Err(SendError(())),
1252 }
1253 }
1254
1255 /// Tries to acquire a slot in the channel without waiting for the slot to become
1256 /// available.
1257 ///
1258 /// If the channel is full this function will return [`TrySendError`], otherwise
1259 /// if there is a slot available it will return a [`Permit`] that will then allow you
1260 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1261 /// [`reserve`] except it does not await for the slot to become available.
1262 ///
1263 /// Dropping [`Permit`] without sending a message releases the capacity back
1264 /// to the channel.
1265 ///
1266 /// [`Permit`]: Permit
1267 /// [`send`]: Permit::send
1268 /// [`reserve`]: Sender::reserve
1269 ///
1270 /// # Examples
1271 ///
1272 /// ```
1273 /// use tokio::sync::mpsc;
1274 ///
1275 /// #[tokio::main]
1276 /// async fn main() {
1277 /// let (tx, mut rx) = mpsc::channel(1);
1278 ///
1279 /// // Reserve capacity
1280 /// let permit = tx.try_reserve().unwrap();
1281 ///
1282 /// // Trying to send directly on the `tx` will fail due to no
1283 /// // available capacity.
1284 /// assert!(tx.try_send(123).is_err());
1285 ///
1286 /// // Trying to reserve an additional slot on the `tx` will
1287 /// // fail because there is no capacity.
1288 /// assert!(tx.try_reserve().is_err());
1289 ///
1290 /// // Sending on the permit succeeds
1291 /// permit.send(456);
1292 ///
1293 /// // The value sent on the permit is received
1294 /// assert_eq!(rx.recv().await.unwrap(), 456);
1295 ///
1296 /// }
1297 /// ```
1298 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1299 match self.chan.semaphore().semaphore.try_acquire(1) {
1300 Ok(()) => {}
1301 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1302 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1303 }
1304
1305 Ok(Permit { chan: &self.chan })
1306 }
1307
1308 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1309 /// available.
1310 ///
1311 /// A [`PermitIterator`] is returned to track the reserved capacity.
1312 /// You can call this [`Iterator`] until it is exhausted to
1313 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1314 /// [`reserve_many`] except it does not await for the slots to become available.
1315 ///
1316 /// If there are fewer than `n` permits available on the channel, then
1317 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1318 /// this function will return a [`TrySendError::Closed`].
1319 ///
1320 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1321 /// permits back to the channel.
1322 ///
1323 /// [`PermitIterator`]: PermitIterator
1324 /// [`send`]: Permit::send
1325 /// [`reserve_many`]: Sender::reserve_many
1326 ///
1327 /// # Examples
1328 ///
1329 /// ```
1330 /// use tokio::sync::mpsc;
1331 ///
1332 /// #[tokio::main]
1333 /// async fn main() {
1334 /// let (tx, mut rx) = mpsc::channel(2);
1335 ///
1336 /// // Reserve capacity
1337 /// let mut permit = tx.try_reserve_many(2).unwrap();
1338 ///
1339 /// // Trying to send directly on the `tx` will fail due to no
1340 /// // available capacity.
1341 /// assert!(tx.try_send(123).is_err());
1342 ///
1343 /// // Trying to reserve an additional slot on the `tx` will
1344 /// // fail because there is no capacity.
1345 /// assert!(tx.try_reserve().is_err());
1346 ///
1347 /// // Sending with the permit iterator succeeds
1348 /// permit.next().unwrap().send(456);
1349 /// permit.next().unwrap().send(457);
1350 ///
1351 /// // The iterator should now be exhausted
1352 /// assert!(permit.next().is_none());
1353 ///
1354 /// // The value sent on the permit is received
1355 /// assert_eq!(rx.recv().await.unwrap(), 456);
1356 /// assert_eq!(rx.recv().await.unwrap(), 457);
1357 ///
1358 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1359 /// let mut permit = tx.try_reserve_many(0).unwrap();
1360 /// assert!(permit.next().is_none());
1361 ///
1362 /// // Trying to call try_reserve_many with a number greater than the channel
1363 /// // capacity will return an error
1364 /// let permit = tx.try_reserve_many(3);
1365 /// assert!(permit.is_err());
1366 ///
1367 /// // Trying to call try_reserve_many on a closed channel will return an error
1368 /// drop(rx);
1369 /// let permit = tx.try_reserve_many(1);
1370 /// assert!(permit.is_err());
1371 ///
1372 /// let permit = tx.try_reserve_many(0);
1373 /// assert!(permit.is_err());
1374 /// }
1375 /// ```
1376 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1377 if n > self.max_capacity() {
1378 return Err(TrySendError::Full(()));
1379 }
1380
1381 match self.chan.semaphore().semaphore.try_acquire(n) {
1382 Ok(()) => {}
1383 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1384 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1385 }
1386
1387 Ok(PermitIterator {
1388 chan: &self.chan,
1389 n,
1390 })
1391 }
1392
1393 /// Tries to acquire a slot in the channel without waiting for the slot to become
1394 /// available, returning an owned permit.
1395 ///
1396 /// This moves the sender _by value_, and returns an owned permit that can
1397 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1398 /// this method may be used in cases where the permit must be valid for the
1399 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1400 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1401 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1402 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1403 ///
1404 /// If the channel is full this function will return a [`TrySendError`].
1405 /// Since the sender is taken by value, the `TrySendError` returned in this
1406 /// case contains the sender, so that it may be used again. Otherwise, if
1407 /// there is a slot available, this method will return an [`OwnedPermit`]
1408 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1409 /// This function is similar to [`reserve_owned`] except it does not await
1410 /// for the slot to become available.
1411 ///
1412 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1413 /// to the channel.
1414 ///
1415 /// [`OwnedPermit`]: OwnedPermit
1416 /// [`send`]: OwnedPermit::send
1417 /// [`reserve_owned`]: Sender::reserve_owned
1418 /// [`Arc::clone`]: std::sync::Arc::clone
1419 ///
1420 /// # Examples
1421 ///
1422 /// ```
1423 /// use tokio::sync::mpsc;
1424 ///
1425 /// #[tokio::main]
1426 /// async fn main() {
1427 /// let (tx, mut rx) = mpsc::channel(1);
1428 ///
1429 /// // Reserve capacity
1430 /// let permit = tx.clone().try_reserve_owned().unwrap();
1431 ///
1432 /// // Trying to send directly on the `tx` will fail due to no
1433 /// // available capacity.
1434 /// assert!(tx.try_send(123).is_err());
1435 ///
1436 /// // Trying to reserve an additional slot on the `tx` will
1437 /// // fail because there is no capacity.
1438 /// assert!(tx.try_reserve().is_err());
1439 ///
1440 /// // Sending on the permit succeeds
1441 /// permit.send(456);
1442 ///
1443 /// // The value sent on the permit is received
1444 /// assert_eq!(rx.recv().await.unwrap(), 456);
1445 ///
1446 /// }
1447 /// ```
1448 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1449 match self.chan.semaphore().semaphore.try_acquire(1) {
1450 Ok(()) => {}
1451 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1452 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1453 }
1454
1455 Ok(OwnedPermit {
1456 chan: Some(self.chan),
1457 })
1458 }
1459
1460 /// Returns `true` if senders belong to the same channel.
1461 ///
1462 /// # Examples
1463 ///
1464 /// ```
1465 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1466 /// let tx2 = tx.clone();
1467 /// assert!(tx.same_channel(&tx2));
1468 ///
1469 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1470 /// assert!(!tx3.same_channel(&tx2));
1471 /// ```
1472 pub fn same_channel(&self, other: &Self) -> bool {
1473 self.chan.same_channel(&other.chan)
1474 }
1475
1476 /// Returns the current capacity of the channel.
1477 ///
1478 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1479 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1480 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1481 /// specified when calling [`channel`]
1482 ///
1483 /// # Examples
1484 ///
1485 /// ```
1486 /// use tokio::sync::mpsc;
1487 ///
1488 /// #[tokio::main]
1489 /// async fn main() {
1490 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1491 ///
1492 /// assert_eq!(tx.capacity(), 5);
1493 ///
1494 /// // Making a reservation drops the capacity by one.
1495 /// let permit = tx.reserve().await.unwrap();
1496 /// assert_eq!(tx.capacity(), 4);
1497 ///
1498 /// // Sending and receiving a value increases the capacity by one.
1499 /// permit.send(());
1500 /// rx.recv().await.unwrap();
1501 /// assert_eq!(tx.capacity(), 5);
1502 /// }
1503 /// ```
1504 ///
1505 /// [`send`]: Sender::send
1506 /// [`reserve`]: Sender::reserve
1507 /// [`channel`]: channel
1508 /// [`max_capacity`]: Sender::max_capacity
1509 pub fn capacity(&self) -> usize {
1510 self.chan.semaphore().semaphore.available_permits()
1511 }
1512
1513 /// Converts the `Sender` to a [`WeakSender`] that does not count
1514 /// towards RAII semantics, i.e. if all `Sender` instances of the
1515 /// channel were dropped and only `WeakSender` instances remain,
1516 /// the channel is closed.
1517 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1518 pub fn downgrade(&self) -> WeakSender<T> {
1519 WeakSender {
1520 chan: self.chan.downgrade(),
1521 }
1522 }
1523
1524 /// Returns the maximum buffer capacity of the channel.
1525 ///
1526 /// The maximum capacity is the buffer capacity initially specified when calling
1527 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1528 /// available buffer capacity: as messages are sent and received, the
1529 /// value returned by [`capacity`] will go up or down, whereas the value
1530 /// returned by [`max_capacity`] will remain constant.
1531 ///
1532 /// # Examples
1533 ///
1534 /// ```
1535 /// use tokio::sync::mpsc;
1536 ///
1537 /// #[tokio::main]
1538 /// async fn main() {
1539 /// let (tx, _rx) = mpsc::channel::<()>(5);
1540 ///
1541 /// // both max capacity and capacity are the same at first
1542 /// assert_eq!(tx.max_capacity(), 5);
1543 /// assert_eq!(tx.capacity(), 5);
1544 ///
1545 /// // Making a reservation doesn't change the max capacity.
1546 /// let permit = tx.reserve().await.unwrap();
1547 /// assert_eq!(tx.max_capacity(), 5);
1548 /// // but drops the capacity by one
1549 /// assert_eq!(tx.capacity(), 4);
1550 /// }
1551 /// ```
1552 ///
1553 /// [`channel`]: channel
1554 /// [`max_capacity`]: Sender::max_capacity
1555 /// [`capacity`]: Sender::capacity
1556 pub fn max_capacity(&self) -> usize {
1557 self.chan.semaphore().bound
1558 }
1559
1560 /// Returns the number of [`Sender`] handles.
1561 pub fn strong_count(&self) -> usize {
1562 self.chan.strong_count()
1563 }
1564
1565 /// Returns the number of [`WeakSender`] handles.
1566 pub fn weak_count(&self) -> usize {
1567 self.chan.weak_count()
1568 }
1569}
1570
1571impl<T> Clone for Sender<T> {
1572 fn clone(&self) -> Self {
1573 Sender {
1574 chan: self.chan.clone(),
1575 }
1576 }
1577}
1578
1579impl<T> fmt::Debug for Sender<T> {
1580 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1581 fmt.debug_struct("Sender")
1582 .field("chan", &self.chan)
1583 .finish()
1584 }
1585}
1586
1587impl<T> Clone for WeakSender<T> {
1588 fn clone(&self) -> Self {
1589 self.chan.increment_weak_count();
1590
1591 WeakSender {
1592 chan: self.chan.clone(),
1593 }
1594 }
1595}
1596
1597impl<T> Drop for WeakSender<T> {
1598 fn drop(&mut self) {
1599 self.chan.decrement_weak_count();
1600 }
1601}
1602
1603impl<T> WeakSender<T> {
1604 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1605 /// if there are other `Sender` instances alive and the channel wasn't
1606 /// previously dropped, otherwise `None` is returned.
1607 pub fn upgrade(&self) -> Option<Sender<T>> {
1608 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1609 }
1610
1611 /// Returns the number of [`Sender`] handles.
1612 pub fn strong_count(&self) -> usize {
1613 self.chan.strong_count()
1614 }
1615
1616 /// Returns the number of [`WeakSender`] handles.
1617 pub fn weak_count(&self) -> usize {
1618 self.chan.weak_count()
1619 }
1620}
1621
1622impl<T> fmt::Debug for WeakSender<T> {
1623 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1624 fmt.debug_struct("WeakSender").finish()
1625 }
1626}
1627
1628// ===== impl Permit =====
1629
1630impl<T> Permit<'_, T> {
1631 /// Sends a value using the reserved capacity.
1632 ///
1633 /// Capacity for the message has already been reserved. The message is sent
1634 /// to the receiver and the permit is consumed. The operation will succeed
1635 /// even if the receiver half has been closed. See [`Receiver::close`] for
1636 /// more details on performing a clean shutdown.
1637 ///
1638 /// [`Receiver::close`]: Receiver::close
1639 ///
1640 /// # Examples
1641 ///
1642 /// ```
1643 /// use tokio::sync::mpsc;
1644 ///
1645 /// #[tokio::main]
1646 /// async fn main() {
1647 /// let (tx, mut rx) = mpsc::channel(1);
1648 ///
1649 /// // Reserve capacity
1650 /// let permit = tx.reserve().await.unwrap();
1651 ///
1652 /// // Trying to send directly on the `tx` will fail due to no
1653 /// // available capacity.
1654 /// assert!(tx.try_send(123).is_err());
1655 ///
1656 /// // Send a message on the permit
1657 /// permit.send(456);
1658 ///
1659 /// // The value sent on the permit is received
1660 /// assert_eq!(rx.recv().await.unwrap(), 456);
1661 /// }
1662 /// ```
1663 pub fn send(self, value: T) {
1664 use std::mem;
1665
1666 self.chan.send(value);
1667
1668 // Avoid the drop logic
1669 mem::forget(self);
1670 }
1671}
1672
1673impl<T> Drop for Permit<'_, T> {
1674 fn drop(&mut self) {
1675 use chan::Semaphore;
1676
1677 let semaphore = self.chan.semaphore();
1678
1679 // Add the permit back to the semaphore
1680 semaphore.add_permit();
1681
1682 // If this is the last sender for this channel, wake the receiver so
1683 // that it can be notified that the channel is closed.
1684 if semaphore.is_closed() && semaphore.is_idle() {
1685 self.chan.wake_rx();
1686 }
1687 }
1688}
1689
1690impl<T> fmt::Debug for Permit<'_, T> {
1691 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1692 fmt.debug_struct("Permit")
1693 .field("chan", &self.chan)
1694 .finish()
1695 }
1696}
1697
1698// ===== impl PermitIterator =====
1699
1700impl<'a, T> Iterator for PermitIterator<'a, T> {
1701 type Item = Permit<'a, T>;
1702
1703 fn next(&mut self) -> Option<Self::Item> {
1704 if self.n == 0 {
1705 return None;
1706 }
1707
1708 self.n -= 1;
1709 Some(Permit { chan: self.chan })
1710 }
1711
1712 fn size_hint(&self) -> (usize, Option<usize>) {
1713 let n = self.n;
1714 (n, Some(n))
1715 }
1716}
1717impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1718impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1719
1720impl<T> Drop for PermitIterator<'_, T> {
1721 fn drop(&mut self) {
1722 use chan::Semaphore;
1723
1724 if self.n == 0 {
1725 return;
1726 }
1727
1728 let semaphore = self.chan.semaphore();
1729
1730 // Add the remaining permits back to the semaphore
1731 semaphore.add_permits(self.n);
1732
1733 // If this is the last sender for this channel, wake the receiver so
1734 // that it can be notified that the channel is closed.
1735 if semaphore.is_closed() && semaphore.is_idle() {
1736 self.chan.wake_rx();
1737 }
1738 }
1739}
1740
1741impl<T> fmt::Debug for PermitIterator<'_, T> {
1742 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1743 fmt.debug_struct("PermitIterator")
1744 .field("chan", &self.chan)
1745 .field("capacity", &self.n)
1746 .finish()
1747 }
1748}
1749
1750// ===== impl Permit =====
1751
1752impl<T> OwnedPermit<T> {
1753 /// Sends a value using the reserved capacity.
1754 ///
1755 /// Capacity for the message has already been reserved. The message is sent
1756 /// to the receiver and the permit is consumed. The operation will succeed
1757 /// even if the receiver half has been closed. See [`Receiver::close`] for
1758 /// more details on performing a clean shutdown.
1759 ///
1760 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1761 /// the `OwnedPermit` was reserved.
1762 ///
1763 /// [`Receiver::close`]: Receiver::close
1764 ///
1765 /// # Examples
1766 ///
1767 /// ```
1768 /// use tokio::sync::mpsc;
1769 ///
1770 /// #[tokio::main]
1771 /// async fn main() {
1772 /// let (tx, mut rx) = mpsc::channel(1);
1773 ///
1774 /// // Reserve capacity
1775 /// let permit = tx.reserve_owned().await.unwrap();
1776 ///
1777 /// // Send a message on the permit, returning the sender.
1778 /// let tx = permit.send(456);
1779 ///
1780 /// // The value sent on the permit is received
1781 /// assert_eq!(rx.recv().await.unwrap(), 456);
1782 ///
1783 /// // We may now reuse `tx` to send another message.
1784 /// tx.send(789).await.unwrap();
1785 /// }
1786 /// ```
1787 pub fn send(mut self, value: T) -> Sender<T> {
1788 let chan = self.chan.take().unwrap_or_else(|| {
1789 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1790 });
1791 chan.send(value);
1792
1793 Sender { chan }
1794 }
1795
1796 /// Releases the reserved capacity *without* sending a message, returning the
1797 /// [`Sender`].
1798 ///
1799 /// # Examples
1800 ///
1801 /// ```
1802 /// use tokio::sync::mpsc;
1803 ///
1804 /// #[tokio::main]
1805 /// async fn main() {
1806 /// let (tx, rx) = mpsc::channel(1);
1807 ///
1808 /// // Clone the sender and reserve capacity
1809 /// let permit = tx.clone().reserve_owned().await.unwrap();
1810 ///
1811 /// // Trying to send on the original `tx` will fail, since the `permit`
1812 /// // has reserved all the available capacity.
1813 /// assert!(tx.try_send(123).is_err());
1814 ///
1815 /// // Release the permit without sending a message, returning the clone
1816 /// // of the sender.
1817 /// let tx2 = permit.release();
1818 ///
1819 /// // We may now reuse `tx` to send another message.
1820 /// tx.send(789).await.unwrap();
1821 /// # drop(rx); drop(tx2);
1822 /// }
1823 /// ```
1824 ///
1825 /// [`Sender`]: Sender
1826 pub fn release(mut self) -> Sender<T> {
1827 use chan::Semaphore;
1828
1829 let chan = self.chan.take().unwrap_or_else(|| {
1830 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1831 });
1832
1833 // Add the permit back to the semaphore
1834 chan.semaphore().add_permit();
1835 Sender { chan }
1836 }
1837}
1838
1839impl<T> Drop for OwnedPermit<T> {
1840 fn drop(&mut self) {
1841 use chan::Semaphore;
1842
1843 // Are we still holding onto the sender?
1844 if let Some(chan) = self.chan.take() {
1845 let semaphore = chan.semaphore();
1846
1847 // Add the permit back to the semaphore
1848 semaphore.add_permit();
1849
1850 // If this `OwnedPermit` is holding the last sender for this
1851 // channel, wake the receiver so that it can be notified that the
1852 // channel is closed.
1853 if semaphore.is_closed() && semaphore.is_idle() {
1854 chan.wake_rx();
1855 }
1856 }
1857
1858 // Otherwise, do nothing.
1859 }
1860}
1861
1862impl<T> fmt::Debug for OwnedPermit<T> {
1863 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1864 fmt.debug_struct("OwnedPermit")
1865 .field("chan", &self.chan)
1866 .finish()
1867 }
1868}