async_channel/lib.rs
1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![forbid(unsafe_code)]
30#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
31
32use std::error;
33use std::fmt;
34use std::future::Future;
35use std::pin::Pin;
36use std::process;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Arc;
39use std::task::{Context, Poll};
40use std::usize;
41
42use concurrent_queue::{ConcurrentQueue, PopError, PushError};
43use event_listener::{Event, EventListener};
44use futures_core::stream::Stream;
45
46struct Channel<T> {
47 /// Inner message queue.
48 queue: ConcurrentQueue<T>,
49
50 /// Send operations waiting while the channel is full.
51 send_ops: Event,
52
53 /// Receive operations waiting while the channel is empty and not closed.
54 recv_ops: Event,
55
56 /// Stream operations while the channel is empty and not closed.
57 stream_ops: Event,
58
59 /// The number of currently active `Sender`s.
60 sender_count: AtomicUsize,
61
62 /// The number of currently active `Receivers`s.
63 receiver_count: AtomicUsize,
64}
65
66impl<T> Channel<T> {
67 /// Closes the channel and notifies all blocked operations.
68 ///
69 /// Returns `true` if this call has closed the channel and it was not closed already.
70 fn close(&self) -> bool {
71 if self.queue.close() {
72 // Notify all send operations.
73 self.send_ops.notify(usize::MAX);
74
75 // Notify all receive and stream operations.
76 self.recv_ops.notify(usize::MAX);
77 self.stream_ops.notify(usize::MAX);
78
79 true
80 } else {
81 false
82 }
83 }
84}
85
86/// Creates a bounded channel.
87///
88/// The created channel has space to hold at most `cap` messages at a time.
89///
90/// # Panics
91///
92/// Capacity must be a positive number. If `cap` is zero, this function will panic.
93///
94/// # Examples
95///
96/// ```
97/// # futures_lite::future::block_on(async {
98/// use async_channel::{bounded, TryRecvError, TrySendError};
99///
100/// let (s, r) = bounded(1);
101///
102/// assert_eq!(s.send(10).await, Ok(()));
103/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
104///
105/// assert_eq!(r.recv().await, Ok(10));
106/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107/// # });
108/// ```
109pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
110 assert!(cap > 0, "capacity cannot be zero");
111
112 let channel = Arc::new(Channel {
113 queue: ConcurrentQueue::bounded(cap),
114 send_ops: Event::new(),
115 recv_ops: Event::new(),
116 stream_ops: Event::new(),
117 sender_count: AtomicUsize::new(1),
118 receiver_count: AtomicUsize::new(1),
119 });
120
121 let s = Sender {
122 channel: channel.clone(),
123 };
124 let r = Receiver {
125 channel,
126 listener: None,
127 };
128 (s, r)
129}
130
131/// Creates an unbounded channel.
132///
133/// The created channel can hold an unlimited number of messages.
134///
135/// # Examples
136///
137/// ```
138/// # futures_lite::future::block_on(async {
139/// use async_channel::{unbounded, TryRecvError};
140///
141/// let (s, r) = unbounded();
142///
143/// assert_eq!(s.send(10).await, Ok(()));
144/// assert_eq!(s.send(20).await, Ok(()));
145///
146/// assert_eq!(r.recv().await, Ok(10));
147/// assert_eq!(r.recv().await, Ok(20));
148/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
149/// # });
150/// ```
151pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
152 let channel = Arc::new(Channel {
153 queue: ConcurrentQueue::unbounded(),
154 send_ops: Event::new(),
155 recv_ops: Event::new(),
156 stream_ops: Event::new(),
157 sender_count: AtomicUsize::new(1),
158 receiver_count: AtomicUsize::new(1),
159 });
160
161 let s = Sender {
162 channel: channel.clone(),
163 };
164 let r = Receiver {
165 channel,
166 listener: None,
167 };
168 (s, r)
169}
170
171/// The sending side of a channel.
172///
173/// Senders can be cloned and shared among threads. When all senders associated with a channel are
174/// dropped, the channel becomes closed.
175///
176/// The channel can also be closed manually by calling [`Sender::close()`].
177pub struct Sender<T> {
178 /// Inner channel state.
179 channel: Arc<Channel<T>>,
180}
181
182impl<T> Sender<T> {
183 /// Attempts to send a message into the channel.
184 ///
185 /// If the channel is full or closed, this method returns an error.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// use async_channel::{bounded, TrySendError};
191 ///
192 /// let (s, r) = bounded(1);
193 ///
194 /// assert_eq!(s.try_send(1), Ok(()));
195 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
196 ///
197 /// drop(r);
198 /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
199 /// ```
200 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201 match self.channel.queue.push(msg) {
202 Ok(()) => {
203 // Notify a blocked receive operation. If the notified operation gets canceled,
204 // it will notify another blocked receive operation.
205 self.channel.recv_ops.notify_additional(1);
206
207 // Notify all blocked streams.
208 self.channel.stream_ops.notify(usize::MAX);
209
210 Ok(())
211 }
212 Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
213 Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
214 }
215 }
216
217 /// Sends a message into the channel.
218 ///
219 /// If the channel is full, this method waits until there is space for a message.
220 ///
221 /// If the channel is closed, this method returns an error.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// # futures_lite::future::block_on(async {
227 /// use async_channel::{unbounded, SendError};
228 ///
229 /// let (s, r) = unbounded();
230 ///
231 /// assert_eq!(s.send(1).await, Ok(()));
232 /// drop(r);
233 /// assert_eq!(s.send(2).await, Err(SendError(2)));
234 /// # });
235 /// ```
236 pub fn send(&self, msg: T) -> Send<'_, T> {
237 Send {
238 sender: self,
239 listener: None,
240 msg: Some(msg),
241 }
242 }
243
244 /// Sends a message into this channel using the blocking strategy.
245 ///
246 /// If the channel is full, this method will block until there is room.
247 /// If the channel is closed, this method returns an error.
248 ///
249 /// # Blocking
250 ///
251 /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
252 /// this method will block the current thread until the message is sent.
253 ///
254 /// This method should not be used in an asynchronous context. It is intended
255 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
256 /// Calling this method in an asynchronous context may result in deadlocks.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use async_channel::{unbounded, SendError};
262 ///
263 /// let (s, r) = unbounded();
264 ///
265 /// assert_eq!(s.send_blocking(1), Ok(()));
266 /// drop(r);
267 /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
268 /// ```
269 pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
270 self.send(msg).wait()
271 }
272
273 /// Closes the channel.
274 ///
275 /// Returns `true` if this call has closed the channel and it was not closed already.
276 ///
277 /// The remaining messages can still be received.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// # futures_lite::future::block_on(async {
283 /// use async_channel::{unbounded, RecvError};
284 ///
285 /// let (s, r) = unbounded();
286 /// assert_eq!(s.send(1).await, Ok(()));
287 /// assert!(s.close());
288 ///
289 /// assert_eq!(r.recv().await, Ok(1));
290 /// assert_eq!(r.recv().await, Err(RecvError));
291 /// # });
292 /// ```
293 pub fn close(&self) -> bool {
294 self.channel.close()
295 }
296
297 /// Returns `true` if the channel is closed.
298 ///
299 /// # Examples
300 ///
301 /// ```
302 /// # futures_lite::future::block_on(async {
303 /// use async_channel::{unbounded, RecvError};
304 ///
305 /// let (s, r) = unbounded::<()>();
306 /// assert!(!s.is_closed());
307 ///
308 /// drop(r);
309 /// assert!(s.is_closed());
310 /// # });
311 /// ```
312 pub fn is_closed(&self) -> bool {
313 self.channel.queue.is_closed()
314 }
315
316 /// Returns `true` if the channel is empty.
317 ///
318 /// # Examples
319 ///
320 /// ```
321 /// # futures_lite::future::block_on(async {
322 /// use async_channel::unbounded;
323 ///
324 /// let (s, r) = unbounded();
325 ///
326 /// assert!(s.is_empty());
327 /// s.send(1).await;
328 /// assert!(!s.is_empty());
329 /// # });
330 /// ```
331 pub fn is_empty(&self) -> bool {
332 self.channel.queue.is_empty()
333 }
334
335 /// Returns `true` if the channel is full.
336 ///
337 /// Unbounded channels are never full.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # futures_lite::future::block_on(async {
343 /// use async_channel::bounded;
344 ///
345 /// let (s, r) = bounded(1);
346 ///
347 /// assert!(!s.is_full());
348 /// s.send(1).await;
349 /// assert!(s.is_full());
350 /// # });
351 /// ```
352 pub fn is_full(&self) -> bool {
353 self.channel.queue.is_full()
354 }
355
356 /// Returns the number of messages in the channel.
357 ///
358 /// # Examples
359 ///
360 /// ```
361 /// # futures_lite::future::block_on(async {
362 /// use async_channel::unbounded;
363 ///
364 /// let (s, r) = unbounded();
365 /// assert_eq!(s.len(), 0);
366 ///
367 /// s.send(1).await;
368 /// s.send(2).await;
369 /// assert_eq!(s.len(), 2);
370 /// # });
371 /// ```
372 pub fn len(&self) -> usize {
373 self.channel.queue.len()
374 }
375
376 /// Returns the channel capacity if it's bounded.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use async_channel::{bounded, unbounded};
382 ///
383 /// let (s, r) = bounded::<i32>(5);
384 /// assert_eq!(s.capacity(), Some(5));
385 ///
386 /// let (s, r) = unbounded::<i32>();
387 /// assert_eq!(s.capacity(), None);
388 /// ```
389 pub fn capacity(&self) -> Option<usize> {
390 self.channel.queue.capacity()
391 }
392
393 /// Returns the number of receivers for the channel.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// # futures_lite::future::block_on(async {
399 /// use async_channel::unbounded;
400 ///
401 /// let (s, r) = unbounded::<()>();
402 /// assert_eq!(s.receiver_count(), 1);
403 ///
404 /// let r2 = r.clone();
405 /// assert_eq!(s.receiver_count(), 2);
406 /// # });
407 /// ```
408 pub fn receiver_count(&self) -> usize {
409 self.channel.receiver_count.load(Ordering::SeqCst)
410 }
411
412 /// Returns the number of senders for the channel.
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// # futures_lite::future::block_on(async {
418 /// use async_channel::unbounded;
419 ///
420 /// let (s, r) = unbounded::<()>();
421 /// assert_eq!(s.sender_count(), 1);
422 ///
423 /// let s2 = s.clone();
424 /// assert_eq!(s.sender_count(), 2);
425 /// # });
426 /// ```
427 pub fn sender_count(&self) -> usize {
428 self.channel.sender_count.load(Ordering::SeqCst)
429 }
430
431 /// Downgrade the sender to a weak reference.
432 pub fn downgrade(&self) -> WeakSender<T> {
433 WeakSender {
434 channel: self.channel.clone(),
435 }
436 }
437}
438
439impl<T> Drop for Sender<T> {
440 fn drop(&mut self) {
441 // Decrement the sender count and close the channel if it drops down to zero.
442 if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
443 self.channel.close();
444 }
445 }
446}
447
448impl<T> fmt::Debug for Sender<T> {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 write!(f, "Sender {{ .. }}")
451 }
452}
453
454impl<T> Clone for Sender<T> {
455 fn clone(&self) -> Sender<T> {
456 let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
457
458 // Make sure the count never overflows, even if lots of sender clones are leaked.
459 if count > usize::MAX / 2 {
460 process::abort();
461 }
462
463 Sender {
464 channel: self.channel.clone(),
465 }
466 }
467}
468
469/// The receiving side of a channel.
470///
471/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
472/// are dropped, the channel becomes closed.
473///
474/// The channel can also be closed manually by calling [`Receiver::close()`].
475///
476/// Receivers implement the [`Stream`] trait.
477pub struct Receiver<T> {
478 /// Inner channel state.
479 channel: Arc<Channel<T>>,
480
481 /// Listens for a send or close event to unblock this stream.
482 listener: Option<EventListener>,
483}
484
485impl<T> Receiver<T> {
486 /// Attempts to receive a message from the channel.
487 ///
488 /// If the channel is empty, or empty and closed, this method returns an error.
489 ///
490 /// # Examples
491 ///
492 /// ```
493 /// # futures_lite::future::block_on(async {
494 /// use async_channel::{unbounded, TryRecvError};
495 ///
496 /// let (s, r) = unbounded();
497 /// assert_eq!(s.send(1).await, Ok(()));
498 ///
499 /// assert_eq!(r.try_recv(), Ok(1));
500 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
501 ///
502 /// drop(s);
503 /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
504 /// # });
505 /// ```
506 pub fn try_recv(&self) -> Result<T, TryRecvError> {
507 match self.channel.queue.pop() {
508 Ok(msg) => {
509 // Notify a blocked send operation. If the notified operation gets canceled, it
510 // will notify another blocked send operation.
511 self.channel.send_ops.notify_additional(1);
512
513 Ok(msg)
514 }
515 Err(PopError::Empty) => Err(TryRecvError::Empty),
516 Err(PopError::Closed) => Err(TryRecvError::Closed),
517 }
518 }
519
520 /// Receives a message from the channel.
521 ///
522 /// If the channel is empty, this method waits until there is a message.
523 ///
524 /// If the channel is closed, this method receives a message or returns an error if there are
525 /// no more messages.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// # futures_lite::future::block_on(async {
531 /// use async_channel::{unbounded, RecvError};
532 ///
533 /// let (s, r) = unbounded();
534 ///
535 /// assert_eq!(s.send(1).await, Ok(()));
536 /// drop(s);
537 ///
538 /// assert_eq!(r.recv().await, Ok(1));
539 /// assert_eq!(r.recv().await, Err(RecvError));
540 /// # });
541 /// ```
542 pub fn recv(&self) -> Recv<'_, T> {
543 Recv {
544 receiver: self,
545 listener: None,
546 }
547 }
548
549 /// Receives a message from the channel using the blocking strategy.
550 ///
551 /// If the channel is empty, this method waits until there is a message.
552 /// If the channel is closed, this method receives a message or returns an error if there are
553 /// no more messages.
554 ///
555 /// # Blocking
556 ///
557 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
558 /// this method will block the current thread until the message is sent.
559 ///
560 /// This method should not be used in an asynchronous context. It is intended
561 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
562 /// Calling this method in an asynchronous context may result in deadlocks.
563 ///
564 /// # Examples
565 ///
566 /// ```
567 /// use async_channel::{unbounded, RecvError};
568 ///
569 /// let (s, r) = unbounded();
570 ///
571 /// assert_eq!(s.send_blocking(1), Ok(()));
572 /// drop(s);
573 ///
574 /// assert_eq!(r.recv_blocking(), Ok(1));
575 /// assert_eq!(r.recv_blocking(), Err(RecvError));
576 /// ```
577 pub fn recv_blocking(&self) -> Result<T, RecvError> {
578 self.recv().wait()
579 }
580
581 /// Closes the channel.
582 ///
583 /// Returns `true` if this call has closed the channel and it was not closed already.
584 ///
585 /// The remaining messages can still be received.
586 ///
587 /// # Examples
588 ///
589 /// ```
590 /// # futures_lite::future::block_on(async {
591 /// use async_channel::{unbounded, RecvError};
592 ///
593 /// let (s, r) = unbounded();
594 /// assert_eq!(s.send(1).await, Ok(()));
595 ///
596 /// assert!(r.close());
597 /// assert_eq!(r.recv().await, Ok(1));
598 /// assert_eq!(r.recv().await, Err(RecvError));
599 /// # });
600 /// ```
601 pub fn close(&self) -> bool {
602 self.channel.close()
603 }
604
605 /// Returns `true` if the channel is closed.
606 ///
607 /// # Examples
608 ///
609 /// ```
610 /// # futures_lite::future::block_on(async {
611 /// use async_channel::{unbounded, RecvError};
612 ///
613 /// let (s, r) = unbounded::<()>();
614 /// assert!(!r.is_closed());
615 ///
616 /// drop(s);
617 /// assert!(r.is_closed());
618 /// # });
619 /// ```
620 pub fn is_closed(&self) -> bool {
621 self.channel.queue.is_closed()
622 }
623
624 /// Returns `true` if the channel is empty.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// # futures_lite::future::block_on(async {
630 /// use async_channel::unbounded;
631 ///
632 /// let (s, r) = unbounded();
633 ///
634 /// assert!(s.is_empty());
635 /// s.send(1).await;
636 /// assert!(!s.is_empty());
637 /// # });
638 /// ```
639 pub fn is_empty(&self) -> bool {
640 self.channel.queue.is_empty()
641 }
642
643 /// Returns `true` if the channel is full.
644 ///
645 /// Unbounded channels are never full.
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// # futures_lite::future::block_on(async {
651 /// use async_channel::bounded;
652 ///
653 /// let (s, r) = bounded(1);
654 ///
655 /// assert!(!r.is_full());
656 /// s.send(1).await;
657 /// assert!(r.is_full());
658 /// # });
659 /// ```
660 pub fn is_full(&self) -> bool {
661 self.channel.queue.is_full()
662 }
663
664 /// Returns the number of messages in the channel.
665 ///
666 /// # Examples
667 ///
668 /// ```
669 /// # futures_lite::future::block_on(async {
670 /// use async_channel::unbounded;
671 ///
672 /// let (s, r) = unbounded();
673 /// assert_eq!(r.len(), 0);
674 ///
675 /// s.send(1).await;
676 /// s.send(2).await;
677 /// assert_eq!(r.len(), 2);
678 /// # });
679 /// ```
680 pub fn len(&self) -> usize {
681 self.channel.queue.len()
682 }
683
684 /// Returns the channel capacity if it's bounded.
685 ///
686 /// # Examples
687 ///
688 /// ```
689 /// use async_channel::{bounded, unbounded};
690 ///
691 /// let (s, r) = bounded::<i32>(5);
692 /// assert_eq!(r.capacity(), Some(5));
693 ///
694 /// let (s, r) = unbounded::<i32>();
695 /// assert_eq!(r.capacity(), None);
696 /// ```
697 pub fn capacity(&self) -> Option<usize> {
698 self.channel.queue.capacity()
699 }
700
701 /// Returns the number of receivers for the channel.
702 ///
703 /// # Examples
704 ///
705 /// ```
706 /// # futures_lite::future::block_on(async {
707 /// use async_channel::unbounded;
708 ///
709 /// let (s, r) = unbounded::<()>();
710 /// assert_eq!(r.receiver_count(), 1);
711 ///
712 /// let r2 = r.clone();
713 /// assert_eq!(r.receiver_count(), 2);
714 /// # });
715 /// ```
716 pub fn receiver_count(&self) -> usize {
717 self.channel.receiver_count.load(Ordering::SeqCst)
718 }
719
720 /// Returns the number of senders for the channel.
721 ///
722 /// # Examples
723 ///
724 /// ```
725 /// # futures_lite::future::block_on(async {
726 /// use async_channel::unbounded;
727 ///
728 /// let (s, r) = unbounded::<()>();
729 /// assert_eq!(r.sender_count(), 1);
730 ///
731 /// let s2 = s.clone();
732 /// assert_eq!(r.sender_count(), 2);
733 /// # });
734 /// ```
735 pub fn sender_count(&self) -> usize {
736 self.channel.sender_count.load(Ordering::SeqCst)
737 }
738
739 /// Downgrade the receiver to a weak reference.
740 pub fn downgrade(&self) -> WeakReceiver<T> {
741 WeakReceiver {
742 channel: self.channel.clone(),
743 }
744 }
745}
746
747impl<T> Drop for Receiver<T> {
748 fn drop(&mut self) {
749 // Decrement the receiver count and close the channel if it drops down to zero.
750 if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
751 self.channel.close();
752 }
753 }
754}
755
756impl<T> fmt::Debug for Receiver<T> {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 write!(f, "Receiver {{ .. }}")
759 }
760}
761
762impl<T> Clone for Receiver<T> {
763 fn clone(&self) -> Receiver<T> {
764 let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
765
766 // Make sure the count never overflows, even if lots of receiver clones are leaked.
767 if count > usize::MAX / 2 {
768 process::abort();
769 }
770
771 Receiver {
772 channel: self.channel.clone(),
773 listener: None,
774 }
775 }
776}
777
778impl<T> Stream for Receiver<T> {
779 type Item = T;
780
781 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
782 loop {
783 // If this stream is listening for events, first wait for a notification.
784 if let Some(listener) = self.listener.as_mut() {
785 futures_core::ready!(Pin::new(listener).poll(cx));
786 self.listener = None;
787 }
788
789 loop {
790 // Attempt to receive a message.
791 match self.try_recv() {
792 Ok(msg) => {
793 // The stream is not blocked on an event - drop the listener.
794 self.listener = None;
795 return Poll::Ready(Some(msg));
796 }
797 Err(TryRecvError::Closed) => {
798 // The stream is not blocked on an event - drop the listener.
799 self.listener = None;
800 return Poll::Ready(None);
801 }
802 Err(TryRecvError::Empty) => {}
803 }
804
805 // Receiving failed - now start listening for notifications or wait for one.
806 match self.listener.as_mut() {
807 None => {
808 // Create a listener and try sending the message again.
809 self.listener = Some(self.channel.stream_ops.listen());
810 }
811 Some(_) => {
812 // Go back to the outer loop to poll the listener.
813 break;
814 }
815 }
816 }
817 }
818 }
819}
820
821impl<T> futures_core::stream::FusedStream for Receiver<T> {
822 fn is_terminated(&self) -> bool {
823 self.channel.queue.is_closed() && self.channel.queue.is_empty()
824 }
825}
826
827/// A [`Sender`] that prevents the channel from not being closed.
828///
829/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
830/// to be upgraded into a [`Sender`] through the `upgrade` method.
831#[derive(Clone)]
832pub struct WeakSender<T> {
833 channel: Arc<Channel<T>>,
834}
835
836impl<T> WeakSender<T> {
837 /// Upgrade the [`WeakSender`] into a [`Sender`].
838 pub fn upgrade(&self) -> Option<Sender<T>> {
839 if self.channel.queue.is_closed() {
840 None
841 } else {
842 let old_count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
843 if old_count == 0 {
844 // Channel was closed while we were incrementing the count.
845 self.channel.sender_count.store(0, Ordering::Release);
846 None
847 } else if old_count > usize::MAX / 2 {
848 // Make sure the count never overflows, even if lots of sender clones are leaked.
849 process::abort();
850 } else {
851 Some(Sender {
852 channel: self.channel.clone(),
853 })
854 }
855 }
856 }
857}
858
859impl<T> fmt::Debug for WeakSender<T> {
860 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861 write!(f, "WeakSender {{ .. }}")
862 }
863}
864
865/// A [`Receiver`] that prevents the channel from not being closed.
866///
867/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
868/// to be upgraded into a [`Receiver`] through the `upgrade` method.
869#[derive(Clone)]
870pub struct WeakReceiver<T> {
871 channel: Arc<Channel<T>>,
872}
873
874impl<T> WeakReceiver<T> {
875 /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
876 pub fn upgrade(&self) -> Option<Receiver<T>> {
877 if self.channel.queue.is_closed() {
878 None
879 } else {
880 let old_count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
881 if old_count == 0 {
882 // Channel was closed while we were incrementing the count.
883 self.channel.receiver_count.store(0, Ordering::Release);
884 None
885 } else if old_count > usize::MAX / 2 {
886 // Make sure the count never overflows, even if lots of receiver clones are leaked.
887 process::abort();
888 } else {
889 Some(Receiver {
890 channel: self.channel.clone(),
891 listener: None,
892 })
893 }
894 }
895 }
896}
897
898impl<T> fmt::Debug for WeakReceiver<T> {
899 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
900 write!(f, "WeakReceiver {{ .. }}")
901 }
902}
903
904/// An error returned from [`Sender::send()`].
905///
906/// Received because the channel is closed.
907#[derive(PartialEq, Eq, Clone, Copy)]
908pub struct SendError<T>(pub T);
909
910impl<T> SendError<T> {
911 /// Unwraps the message that couldn't be sent.
912 pub fn into_inner(self) -> T {
913 self.0
914 }
915}
916
917impl<T> error::Error for SendError<T> {}
918
919impl<T> fmt::Debug for SendError<T> {
920 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921 write!(f, "SendError(..)")
922 }
923}
924
925impl<T> fmt::Display for SendError<T> {
926 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
927 write!(f, "sending into a closed channel")
928 }
929}
930
931/// An error returned from [`Sender::try_send()`].
932#[derive(PartialEq, Eq, Clone, Copy)]
933pub enum TrySendError<T> {
934 /// The channel is full but not closed.
935 Full(T),
936
937 /// The channel is closed.
938 Closed(T),
939}
940
941impl<T> TrySendError<T> {
942 /// Unwraps the message that couldn't be sent.
943 pub fn into_inner(self) -> T {
944 match self {
945 TrySendError::Full(t) => t,
946 TrySendError::Closed(t) => t,
947 }
948 }
949
950 /// Returns `true` if the channel is full but not closed.
951 pub fn is_full(&self) -> bool {
952 match self {
953 TrySendError::Full(_) => true,
954 TrySendError::Closed(_) => false,
955 }
956 }
957
958 /// Returns `true` if the channel is closed.
959 pub fn is_closed(&self) -> bool {
960 match self {
961 TrySendError::Full(_) => false,
962 TrySendError::Closed(_) => true,
963 }
964 }
965}
966
967impl<T> error::Error for TrySendError<T> {}
968
969impl<T> fmt::Debug for TrySendError<T> {
970 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
971 match *self {
972 TrySendError::Full(..) => write!(f, "Full(..)"),
973 TrySendError::Closed(..) => write!(f, "Closed(..)"),
974 }
975 }
976}
977
978impl<T> fmt::Display for TrySendError<T> {
979 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
980 match *self {
981 TrySendError::Full(..) => write!(f, "sending into a full channel"),
982 TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
983 }
984 }
985}
986
987/// An error returned from [`Receiver::recv()`].
988///
989/// Received because the channel is empty and closed.
990#[derive(PartialEq, Eq, Clone, Copy, Debug)]
991pub struct RecvError;
992
993impl error::Error for RecvError {}
994
995impl fmt::Display for RecvError {
996 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997 write!(f, "receiving from an empty and closed channel")
998 }
999}
1000
1001/// An error returned from [`Receiver::try_recv()`].
1002#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1003pub enum TryRecvError {
1004 /// The channel is empty but not closed.
1005 Empty,
1006
1007 /// The channel is empty and closed.
1008 Closed,
1009}
1010
1011impl TryRecvError {
1012 /// Returns `true` if the channel is empty but not closed.
1013 pub fn is_empty(&self) -> bool {
1014 match self {
1015 TryRecvError::Empty => true,
1016 TryRecvError::Closed => false,
1017 }
1018 }
1019
1020 /// Returns `true` if the channel is empty and closed.
1021 pub fn is_closed(&self) -> bool {
1022 match self {
1023 TryRecvError::Empty => false,
1024 TryRecvError::Closed => true,
1025 }
1026 }
1027}
1028
1029impl error::Error for TryRecvError {}
1030
1031impl fmt::Display for TryRecvError {
1032 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1033 match *self {
1034 TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1035 TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1036 }
1037 }
1038}
1039
1040/// A future returned by [`Sender::send()`].
1041#[derive(Debug)]
1042#[must_use = "futures do nothing unless you `.await` or poll them"]
1043pub struct Send<'a, T> {
1044 sender: &'a Sender<T>,
1045 listener: Option<EventListener>,
1046 msg: Option<T>,
1047}
1048
1049impl<'a, T> Send<'a, T> {
1050 /// Run this future with the given `Strategy`.
1051 fn run_with_strategy<S: Strategy>(
1052 &mut self,
1053 cx: &mut S::Context,
1054 ) -> Poll<Result<(), SendError<T>>> {
1055 loop {
1056 let msg = self.msg.take().unwrap();
1057 // Attempt to send a message.
1058 match self.sender.try_send(msg) {
1059 Ok(()) => return Poll::Ready(Ok(())),
1060 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1061 Err(TrySendError::Full(m)) => self.msg = Some(m),
1062 }
1063
1064 // Sending failed - now start listening for notifications or wait for one.
1065 match self.listener.take() {
1066 None => {
1067 // Start listening and then try sending again.
1068 self.listener = Some(self.sender.channel.send_ops.listen());
1069 }
1070 Some(l) => {
1071 // Poll using the given strategy
1072 if let Err(l) = S::poll(l, cx) {
1073 self.listener = Some(l);
1074 return Poll::Pending;
1075 }
1076 }
1077 }
1078 }
1079 }
1080
1081 /// Run using the blocking strategy.
1082 fn wait(mut self) -> Result<(), SendError<T>> {
1083 match self.run_with_strategy::<Blocking>(&mut ()) {
1084 Poll::Ready(res) => res,
1085 Poll::Pending => unreachable!(),
1086 }
1087 }
1088}
1089
1090impl<'a, T> Unpin for Send<'a, T> {}
1091
1092impl<'a, T> Future for Send<'a, T> {
1093 type Output = Result<(), SendError<T>>;
1094
1095 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1096 self.run_with_strategy::<NonBlocking<'_>>(cx)
1097 }
1098}
1099
1100/// A future returned by [`Receiver::recv()`].
1101#[derive(Debug)]
1102#[must_use = "futures do nothing unless you `.await` or poll them"]
1103pub struct Recv<'a, T> {
1104 receiver: &'a Receiver<T>,
1105 listener: Option<EventListener>,
1106}
1107
1108impl<'a, T> Unpin for Recv<'a, T> {}
1109
1110impl<'a, T> Recv<'a, T> {
1111 /// Run this future with the given `Strategy`.
1112 fn run_with_strategy<S: Strategy>(
1113 &mut self,
1114 cx: &mut S::Context,
1115 ) -> Poll<Result<T, RecvError>> {
1116 loop {
1117 // Attempt to receive a message.
1118 match self.receiver.try_recv() {
1119 Ok(msg) => return Poll::Ready(Ok(msg)),
1120 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1121 Err(TryRecvError::Empty) => {}
1122 }
1123
1124 // Receiving failed - now start listening for notifications or wait for one.
1125 match self.listener.take() {
1126 None => {
1127 // Start listening and then try receiving again.
1128 self.listener = Some(self.receiver.channel.recv_ops.listen());
1129 }
1130 Some(l) => {
1131 // Poll using the given strategy.
1132 if let Err(l) = S::poll(l, cx) {
1133 self.listener = Some(l);
1134 return Poll::Pending;
1135 }
1136 }
1137 }
1138 }
1139 }
1140
1141 /// Run with the blocking strategy.
1142 fn wait(mut self) -> Result<T, RecvError> {
1143 match self.run_with_strategy::<Blocking>(&mut ()) {
1144 Poll::Ready(res) => res,
1145 Poll::Pending => unreachable!(),
1146 }
1147 }
1148}
1149
1150impl<'a, T> Future for Recv<'a, T> {
1151 type Output = Result<T, RecvError>;
1152
1153 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1154 self.run_with_strategy::<NonBlocking<'_>>(cx)
1155 }
1156}
1157
1158/// A strategy used to poll an `EventListener`.
1159trait Strategy {
1160 /// Context needed to be provided to the `poll` method.
1161 type Context;
1162
1163 /// Polls the given `EventListener`.
1164 ///
1165 /// Returns the `EventListener` back if it was not completed; otherwise,
1166 /// returns `Ok(())`.
1167 fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
1168}
1169
1170/// Non-blocking strategy for use in asynchronous code.
1171struct NonBlocking<'a>(&'a mut ());
1172
1173impl<'a> Strategy for NonBlocking<'a> {
1174 type Context = Context<'a>;
1175
1176 fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
1177 match Pin::new(&mut evl).poll(cx) {
1178 Poll::Ready(()) => Ok(()),
1179 Poll::Pending => Err(evl),
1180 }
1181 }
1182}
1183
1184/// Blocking strategy for use in synchronous code.
1185struct Blocking;
1186
1187impl Strategy for Blocking {
1188 type Context = ();
1189
1190 fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
1191 evl.wait();
1192 Ok(())
1193 }
1194}