tokio/sync/mpsc/unbounded.rs
1use crate::loom::sync::{atomic::AtomicUsize, Arc};
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the [`unbounded_channel`] function.
11pub struct UnboundedSender<T> {
12 chan: chan::Tx<T, Semaphore>,
13}
14
15/// An unbounded sender that does not prevent the channel from being closed.
16///
17/// If all [`UnboundedSender`] instances of a channel were dropped and only
18/// `WeakUnboundedSender` instances remain, the channel is closed.
19///
20/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
21/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
22/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
23///
24/// [`UnboundedSender`]: UnboundedSender
25/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
26///
27/// # Examples
28///
29/// ```
30/// use tokio::sync::mpsc::unbounded_channel;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// let (tx, _rx) = unbounded_channel::<i32>();
35/// let tx_weak = tx.downgrade();
36///
37/// // Upgrading will succeed because `tx` still exists.
38/// assert!(tx_weak.upgrade().is_some());
39///
40/// // If we drop `tx`, then it will fail.
41/// drop(tx);
42/// assert!(tx_weak.clone().upgrade().is_none());
43/// }
44/// ```
45pub struct WeakUnboundedSender<T> {
46 chan: Arc<chan::Chan<T, Semaphore>>,
47}
48
49impl<T> Clone for UnboundedSender<T> {
50 fn clone(&self) -> Self {
51 UnboundedSender {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57impl<T> fmt::Debug for UnboundedSender<T> {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("UnboundedSender")
60 .field("chan", &self.chan)
61 .finish()
62 }
63}
64
65/// Receive values from the associated `UnboundedSender`.
66///
67/// Instances are created by the [`unbounded_channel`] function.
68///
69/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
70///
71/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
72pub struct UnboundedReceiver<T> {
73 /// The channel receiver
74 chan: chan::Rx<T, Semaphore>,
75}
76
77impl<T> fmt::Debug for UnboundedReceiver<T> {
78 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
79 fmt.debug_struct("UnboundedReceiver")
80 .field("chan", &self.chan)
81 .finish()
82 }
83}
84
85/// Creates an unbounded mpsc channel for communicating between asynchronous
86/// tasks without backpressure.
87///
88/// A `send` on this channel will always succeed as long as the receive half has
89/// not been closed. If the receiver falls behind, messages will be arbitrarily
90/// buffered.
91///
92/// **Note** that the amount of available system memory is an implicit bound to
93/// the channel. Using an `unbounded` channel has the ability of causing the
94/// process to run out of memory. In this case, the process will be aborted.
95pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
96 let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
97
98 let tx = UnboundedSender::new(tx);
99 let rx = UnboundedReceiver::new(rx);
100
101 (tx, rx)
102}
103
104/// No capacity
105#[derive(Debug)]
106pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107
108impl<T> UnboundedReceiver<T> {
109 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
110 UnboundedReceiver { chan }
111 }
112
113 /// Receives the next value for this receiver.
114 ///
115 /// This method returns `None` if the channel has been closed and there are
116 /// no remaining messages in the channel's buffer. This indicates that no
117 /// further values can ever be received from this `Receiver`. The channel is
118 /// closed when all senders have been dropped, or when [`close`] is called.
119 ///
120 /// If there are no messages in the channel's buffer, but the channel has
121 /// not yet been closed, this method will sleep until a message is sent or
122 /// the channel is closed.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. If `recv` is used as the event in a
127 /// [`tokio::select!`](crate::select) statement and some other branch
128 /// completes first, it is guaranteed that no messages were received on this
129 /// channel.
130 ///
131 /// [`close`]: Self::close
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::mpsc;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = mpsc::unbounded_channel();
141 ///
142 /// tokio::spawn(async move {
143 /// tx.send("hello").unwrap();
144 /// });
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(None, rx.recv().await);
148 /// }
149 /// ```
150 ///
151 /// Values are buffered:
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::unbounded_channel();
159 ///
160 /// tx.send("hello").unwrap();
161 /// tx.send("world").unwrap();
162 ///
163 /// assert_eq!(Some("hello"), rx.recv().await);
164 /// assert_eq!(Some("world"), rx.recv().await);
165 /// }
166 /// ```
167 pub async fn recv(&mut self) -> Option<T> {
168 use crate::future::poll_fn;
169
170 poll_fn(|cx| self.poll_recv(cx)).await
171 }
172
173 /// Receives the next values for this receiver and extends `buffer`.
174 ///
175 /// This method extends `buffer` by no more than a fixed number of values
176 /// as specified by `limit`. If `limit` is zero, the function returns
177 /// immediately with `0`. The return value is the number of values added to
178 /// `buffer`.
179 ///
180 /// For `limit > 0`, if there are no messages in the channel's queue,
181 /// but the channel has not yet been closed, this method will sleep
182 /// until a message is sent or the channel is closed.
183 ///
184 /// For non-zero values of `limit`, this method will never return `0` unless
185 /// the channel has been closed and there are no remaining messages in the
186 /// channel's queue. This indicates that no further values can ever be
187 /// received from this `Receiver`. The channel is closed when all senders
188 /// have been dropped, or when [`close`] is called.
189 ///
190 /// The capacity of `buffer` is increased as needed.
191 ///
192 /// # Cancel safety
193 ///
194 /// This method is cancel safe. If `recv_many` is used as the event in a
195 /// [`tokio::select!`](crate::select) statement and some other branch
196 /// completes first, it is guaranteed that no messages were received on this
197 /// channel.
198 ///
199 /// [`close`]: Self::close
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::sync::mpsc;
205 ///
206 /// #[tokio::main]
207 /// async fn main() {
208 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
209 /// let limit = 2;
210 /// let (tx, mut rx) = mpsc::unbounded_channel();
211 /// let tx2 = tx.clone();
212 /// tx2.send("first").unwrap();
213 /// tx2.send("second").unwrap();
214 /// tx2.send("third").unwrap();
215 ///
216 /// // Call `recv_many` to receive up to `limit` (2) values.
217 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
218 /// assert_eq!(vec!["first", "second"], buffer);
219 ///
220 /// // If the buffer is full, the next call to `recv_many`
221 /// // reserves additional capacity.
222 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
223 ///
224 /// tokio::spawn(async move {
225 /// tx.send("fourth").unwrap();
226 /// });
227 ///
228 /// // 'tx' is dropped, but `recv_many`
229 /// // is guaranteed not to return 0 as the channel
230 /// // is not yet closed.
231 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
232 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
233 ///
234 /// // Once the last sender is dropped, the channel is
235 /// // closed and `recv_many` returns 0, capacity unchanged.
236 /// drop(tx2);
237 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
238 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
239 /// }
240 /// ```
241 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
242 use crate::future::poll_fn;
243 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
244 }
245
246 /// Tries to receive the next value for this receiver.
247 ///
248 /// This method returns the [`Empty`] error if the channel is currently
249 /// empty, but there are still outstanding [senders] or [permits].
250 ///
251 /// This method returns the [`Disconnected`] error if the channel is
252 /// currently empty, and there are no outstanding [senders] or [permits].
253 ///
254 /// Unlike the [`poll_recv`] method, this method will never return an
255 /// [`Empty`] error spuriously.
256 ///
257 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
258 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
259 /// [`poll_recv`]: Self::poll_recv
260 /// [senders]: crate::sync::mpsc::Sender
261 /// [permits]: crate::sync::mpsc::Permit
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use tokio::sync::mpsc;
267 /// use tokio::sync::mpsc::error::TryRecvError;
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let (tx, mut rx) = mpsc::unbounded_channel();
272 ///
273 /// tx.send("hello").unwrap();
274 ///
275 /// assert_eq!(Ok("hello"), rx.try_recv());
276 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
277 ///
278 /// tx.send("hello").unwrap();
279 /// // Drop the last sender, closing the channel.
280 /// drop(tx);
281 ///
282 /// assert_eq!(Ok("hello"), rx.try_recv());
283 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
284 /// }
285 /// ```
286 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
287 self.chan.try_recv()
288 }
289
290 /// Blocking receive to call outside of asynchronous contexts.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if called within an asynchronous execution
295 /// context.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use std::thread;
301 /// use tokio::sync::mpsc;
302 ///
303 /// #[tokio::main]
304 /// async fn main() {
305 /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
306 ///
307 /// let sync_code = thread::spawn(move || {
308 /// assert_eq!(Some(10), rx.blocking_recv());
309 /// });
310 ///
311 /// let _ = tx.send(10);
312 /// sync_code.join().unwrap();
313 /// }
314 /// ```
315 #[track_caller]
316 #[cfg(feature = "sync")]
317 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
318 pub fn blocking_recv(&mut self) -> Option<T> {
319 crate::future::block_on(self.recv())
320 }
321
322 /// Closes the receiving half of a channel, without dropping it.
323 ///
324 /// This prevents any further messages from being sent on the channel while
325 /// still enabling the receiver to drain messages that are buffered.
326 ///
327 /// To guarantee that no messages are dropped, after calling `close()`,
328 /// `recv()` must be called until `None` is returned.
329 pub fn close(&mut self) {
330 self.chan.close();
331 }
332
333 /// Checks if a channel is closed.
334 ///
335 /// This method returns `true` if the channel has been closed. The channel is closed
336 /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
337 ///
338 /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
339 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
340 ///
341 /// # Examples
342 /// ```
343 /// use tokio::sync::mpsc;
344 ///
345 /// #[tokio::main]
346 /// async fn main() {
347 /// let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
348 /// assert!(!rx.is_closed());
349 ///
350 /// rx.close();
351 ///
352 /// assert!(rx.is_closed());
353 /// }
354 /// ```
355 pub fn is_closed(&self) -> bool {
356 self.chan.is_closed()
357 }
358
359 /// Checks if a channel is empty.
360 ///
361 /// This method returns `true` if the channel has no messages.
362 ///
363 /// # Examples
364 /// ```
365 /// use tokio::sync::mpsc;
366 ///
367 /// #[tokio::main]
368 /// async fn main() {
369 /// let (tx, rx) = mpsc::unbounded_channel();
370 /// assert!(rx.is_empty());
371 ///
372 /// tx.send(0).unwrap();
373 /// assert!(!rx.is_empty());
374 /// }
375 ///
376 /// ```
377 pub fn is_empty(&self) -> bool {
378 self.chan.is_empty()
379 }
380
381 /// Returns the number of messages in the channel.
382 ///
383 /// # Examples
384 /// ```
385 /// use tokio::sync::mpsc;
386 ///
387 /// #[tokio::main]
388 /// async fn main() {
389 /// let (tx, rx) = mpsc::unbounded_channel();
390 /// assert_eq!(0, rx.len());
391 ///
392 /// tx.send(0).unwrap();
393 /// assert_eq!(1, rx.len());
394 /// }
395 /// ```
396 pub fn len(&self) -> usize {
397 self.chan.len()
398 }
399
400 /// Polls to receive the next message on this channel.
401 ///
402 /// This method returns:
403 ///
404 /// * `Poll::Pending` if no messages are available but the channel is not
405 /// closed, or if a spurious failure happens.
406 /// * `Poll::Ready(Some(message))` if a message is available.
407 /// * `Poll::Ready(None)` if the channel has been closed and all messages
408 /// sent before it was closed have been received.
409 ///
410 /// When the method returns `Poll::Pending`, the `Waker` in the provided
411 /// `Context` is scheduled to receive a wakeup when a message is sent on any
412 /// receiver, or when the channel is closed. Note that on multiple calls to
413 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
414 /// passed to the most recent call is scheduled to receive a wakeup.
415 ///
416 /// If this method returns `Poll::Pending` due to a spurious failure, then
417 /// the `Waker` will be notified when the situation causing the spurious
418 /// failure has been resolved. Note that receiving such a wakeup does not
419 /// guarantee that the next call will succeed — it could fail with another
420 /// spurious failure.
421 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
422 self.chan.recv(cx)
423 }
424
425 /// Polls to receive multiple messages on this channel, extending the provided buffer.
426 ///
427 /// This method returns:
428 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
429 /// spurious failure happens.
430 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
431 /// stored in `buffer`. This can be less than, or equal to, `limit`.
432 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
433 ///
434 /// When the method returns `Poll::Pending`, the `Waker` in the provided
435 /// `Context` is scheduled to receive a wakeup when a message is sent on any
436 /// receiver, or when the channel is closed. Note that on multiple calls to
437 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
438 /// passed to the most recent call is scheduled to receive a wakeup.
439 ///
440 /// Note that this method does not guarantee that exactly `limit` messages
441 /// are received. Rather, if at least one message is available, it returns
442 /// as many messages as it can up to the given limit. This method returns
443 /// zero only if the channel is closed (or if `limit` is zero).
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use std::task::{Context, Poll};
449 /// use std::pin::Pin;
450 /// use tokio::sync::mpsc;
451 /// use futures::Future;
452 ///
453 /// struct MyReceiverFuture<'a> {
454 /// receiver: mpsc::UnboundedReceiver<i32>,
455 /// buffer: &'a mut Vec<i32>,
456 /// limit: usize,
457 /// }
458 ///
459 /// impl<'a> Future for MyReceiverFuture<'a> {
460 /// type Output = usize; // Number of messages received
461 ///
462 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
463 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
464 ///
465 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
466 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
467 /// Poll::Pending => Poll::Pending,
468 /// Poll::Ready(count) => Poll::Ready(count),
469 /// }
470 /// }
471 /// }
472 ///
473 /// #[tokio::main]
474 /// async fn main() {
475 /// let (tx, rx) = mpsc::unbounded_channel::<i32>();
476 /// let mut buffer = Vec::new();
477 ///
478 /// let my_receiver_future = MyReceiverFuture {
479 /// receiver: rx,
480 /// buffer: &mut buffer,
481 /// limit: 3,
482 /// };
483 ///
484 /// for i in 0..10 {
485 /// tx.send(i).expect("Unable to send integer");
486 /// }
487 ///
488 /// let count = my_receiver_future.await;
489 /// assert_eq!(count, 3);
490 /// assert_eq!(buffer, vec![0,1,2])
491 /// }
492 /// ```
493 pub fn poll_recv_many(
494 &mut self,
495 cx: &mut Context<'_>,
496 buffer: &mut Vec<T>,
497 limit: usize,
498 ) -> Poll<usize> {
499 self.chan.recv_many(cx, buffer, limit)
500 }
501}
502
503impl<T> UnboundedSender<T> {
504 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
505 UnboundedSender { chan }
506 }
507
508 /// Attempts to send a message on this `UnboundedSender` without blocking.
509 ///
510 /// This method is not marked async because sending a message to an unbounded channel
511 /// never requires any form of waiting. Because of this, the `send` method can be
512 /// used in both synchronous and asynchronous code without problems.
513 ///
514 /// If the receive half of the channel is closed, either due to [`close`]
515 /// being called or the [`UnboundedReceiver`] having been dropped, this
516 /// function returns an error. The error includes the value passed to `send`.
517 ///
518 /// [`close`]: UnboundedReceiver::close
519 /// [`UnboundedReceiver`]: UnboundedReceiver
520 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
521 if !self.inc_num_messages() {
522 return Err(SendError(message));
523 }
524
525 self.chan.send(message);
526 Ok(())
527 }
528
529 fn inc_num_messages(&self) -> bool {
530 use std::process;
531 use std::sync::atomic::Ordering::{AcqRel, Acquire};
532
533 let mut curr = self.chan.semaphore().0.load(Acquire);
534
535 loop {
536 if curr & 1 == 1 {
537 return false;
538 }
539
540 if curr == usize::MAX ^ 1 {
541 // Overflowed the ref count. There is no safe way to recover, so
542 // abort the process. In practice, this should never happen.
543 process::abort()
544 }
545
546 match self
547 .chan
548 .semaphore()
549 .0
550 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
551 {
552 Ok(_) => return true,
553 Err(actual) => {
554 curr = actual;
555 }
556 }
557 }
558 }
559
560 /// Completes when the receiver has dropped.
561 ///
562 /// This allows the producers to get notified when interest in the produced
563 /// values is canceled and immediately stop doing work.
564 ///
565 /// # Cancel safety
566 ///
567 /// This method is cancel safe. Once the channel is closed, it stays closed
568 /// forever and all future calls to `closed` will return immediately.
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// use tokio::sync::mpsc;
574 ///
575 /// #[tokio::main]
576 /// async fn main() {
577 /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
578 /// let tx2 = tx1.clone();
579 /// let tx3 = tx1.clone();
580 /// let tx4 = tx1.clone();
581 /// let tx5 = tx1.clone();
582 /// tokio::spawn(async move {
583 /// drop(rx);
584 /// });
585 ///
586 /// futures::join!(
587 /// tx1.closed(),
588 /// tx2.closed(),
589 /// tx3.closed(),
590 /// tx4.closed(),
591 /// tx5.closed()
592 /// );
593 //// println!("Receiver dropped");
594 /// }
595 /// ```
596 pub async fn closed(&self) {
597 self.chan.closed().await;
598 }
599
600 /// Checks if the channel has been closed. This happens when the
601 /// [`UnboundedReceiver`] is dropped, or when the
602 /// [`UnboundedReceiver::close`] method is called.
603 ///
604 /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
605 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
606 ///
607 /// ```
608 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
609 /// assert!(!tx.is_closed());
610 ///
611 /// let tx2 = tx.clone();
612 /// assert!(!tx2.is_closed());
613 ///
614 /// drop(rx);
615 /// assert!(tx.is_closed());
616 /// assert!(tx2.is_closed());
617 /// ```
618 pub fn is_closed(&self) -> bool {
619 self.chan.is_closed()
620 }
621
622 /// Returns `true` if senders belong to the same channel.
623 ///
624 /// # Examples
625 ///
626 /// ```
627 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
628 /// let tx2 = tx.clone();
629 /// assert!(tx.same_channel(&tx2));
630 ///
631 /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
632 /// assert!(!tx3.same_channel(&tx2));
633 /// ```
634 pub fn same_channel(&self, other: &Self) -> bool {
635 self.chan.same_channel(&other.chan)
636 }
637
638 /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
639 /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
640 /// channel were dropped and only `WeakUnboundedSender` instances remain,
641 /// the channel is closed.
642 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
643 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
644 WeakUnboundedSender {
645 chan: self.chan.downgrade(),
646 }
647 }
648
649 /// Returns the number of [`UnboundedSender`] handles.
650 pub fn strong_count(&self) -> usize {
651 self.chan.strong_count()
652 }
653
654 /// Returns the number of [`WeakUnboundedSender`] handles.
655 pub fn weak_count(&self) -> usize {
656 self.chan.weak_count()
657 }
658}
659
660impl<T> Clone for WeakUnboundedSender<T> {
661 fn clone(&self) -> Self {
662 self.chan.increment_weak_count();
663
664 WeakUnboundedSender {
665 chan: self.chan.clone(),
666 }
667 }
668}
669
670impl<T> Drop for WeakUnboundedSender<T> {
671 fn drop(&mut self) {
672 self.chan.decrement_weak_count();
673 }
674}
675
676impl<T> WeakUnboundedSender<T> {
677 /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
678 /// This will return `Some` if there are other `Sender` instances alive and
679 /// the channel wasn't previously dropped, otherwise `None` is returned.
680 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
681 chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
682 }
683
684 /// Returns the number of [`UnboundedSender`] handles.
685 pub fn strong_count(&self) -> usize {
686 self.chan.strong_count()
687 }
688
689 /// Returns the number of [`WeakUnboundedSender`] handles.
690 pub fn weak_count(&self) -> usize {
691 self.chan.weak_count()
692 }
693}
694
695impl<T> fmt::Debug for WeakUnboundedSender<T> {
696 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
697 fmt.debug_struct("WeakUnboundedSender").finish()
698 }
699}