async_channel/
lib.rs

1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![forbid(unsafe_code)]
30#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
31
32use std::error;
33use std::fmt;
34use std::future::Future;
35use std::pin::Pin;
36use std::process;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Arc;
39use std::task::{Context, Poll};
40use std::usize;
41
42use concurrent_queue::{ConcurrentQueue, PopError, PushError};
43use event_listener::{Event, EventListener};
44use futures_core::stream::Stream;
45
46struct Channel<T> {
47    /// Inner message queue.
48    queue: ConcurrentQueue<T>,
49
50    /// Send operations waiting while the channel is full.
51    send_ops: Event,
52
53    /// Receive operations waiting while the channel is empty and not closed.
54    recv_ops: Event,
55
56    /// Stream operations while the channel is empty and not closed.
57    stream_ops: Event,
58
59    /// The number of currently active `Sender`s.
60    sender_count: AtomicUsize,
61
62    /// The number of currently active `Receivers`s.
63    receiver_count: AtomicUsize,
64}
65
66impl<T> Channel<T> {
67    /// Closes the channel and notifies all blocked operations.
68    ///
69    /// Returns `true` if this call has closed the channel and it was not closed already.
70    fn close(&self) -> bool {
71        if self.queue.close() {
72            // Notify all send operations.
73            self.send_ops.notify(usize::MAX);
74
75            // Notify all receive and stream operations.
76            self.recv_ops.notify(usize::MAX);
77            self.stream_ops.notify(usize::MAX);
78
79            true
80        } else {
81            false
82        }
83    }
84}
85
86/// Creates a bounded channel.
87///
88/// The created channel has space to hold at most `cap` messages at a time.
89///
90/// # Panics
91///
92/// Capacity must be a positive number. If `cap` is zero, this function will panic.
93///
94/// # Examples
95///
96/// ```
97/// # futures_lite::future::block_on(async {
98/// use async_channel::{bounded, TryRecvError, TrySendError};
99///
100/// let (s, r) = bounded(1);
101///
102/// assert_eq!(s.send(10).await, Ok(()));
103/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
104///
105/// assert_eq!(r.recv().await, Ok(10));
106/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
107/// # });
108/// ```
109pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
110    assert!(cap > 0, "capacity cannot be zero");
111
112    let channel = Arc::new(Channel {
113        queue: ConcurrentQueue::bounded(cap),
114        send_ops: Event::new(),
115        recv_ops: Event::new(),
116        stream_ops: Event::new(),
117        sender_count: AtomicUsize::new(1),
118        receiver_count: AtomicUsize::new(1),
119    });
120
121    let s = Sender {
122        channel: channel.clone(),
123    };
124    let r = Receiver {
125        channel,
126        listener: None,
127    };
128    (s, r)
129}
130
131/// Creates an unbounded channel.
132///
133/// The created channel can hold an unlimited number of messages.
134///
135/// # Examples
136///
137/// ```
138/// # futures_lite::future::block_on(async {
139/// use async_channel::{unbounded, TryRecvError};
140///
141/// let (s, r) = unbounded();
142///
143/// assert_eq!(s.send(10).await, Ok(()));
144/// assert_eq!(s.send(20).await, Ok(()));
145///
146/// assert_eq!(r.recv().await, Ok(10));
147/// assert_eq!(r.recv().await, Ok(20));
148/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
149/// # });
150/// ```
151pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
152    let channel = Arc::new(Channel {
153        queue: ConcurrentQueue::unbounded(),
154        send_ops: Event::new(),
155        recv_ops: Event::new(),
156        stream_ops: Event::new(),
157        sender_count: AtomicUsize::new(1),
158        receiver_count: AtomicUsize::new(1),
159    });
160
161    let s = Sender {
162        channel: channel.clone(),
163    };
164    let r = Receiver {
165        channel,
166        listener: None,
167    };
168    (s, r)
169}
170
171/// The sending side of a channel.
172///
173/// Senders can be cloned and shared among threads. When all senders associated with a channel are
174/// dropped, the channel becomes closed.
175///
176/// The channel can also be closed manually by calling [`Sender::close()`].
177pub struct Sender<T> {
178    /// Inner channel state.
179    channel: Arc<Channel<T>>,
180}
181
182impl<T> Sender<T> {
183    /// Attempts to send a message into the channel.
184    ///
185    /// If the channel is full or closed, this method returns an error.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use async_channel::{bounded, TrySendError};
191    ///
192    /// let (s, r) = bounded(1);
193    ///
194    /// assert_eq!(s.try_send(1), Ok(()));
195    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
196    ///
197    /// drop(r);
198    /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
199    /// ```
200    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201        match self.channel.queue.push(msg) {
202            Ok(()) => {
203                // Notify a blocked receive operation. If the notified operation gets canceled,
204                // it will notify another blocked receive operation.
205                self.channel.recv_ops.notify_additional(1);
206
207                // Notify all blocked streams.
208                self.channel.stream_ops.notify(usize::MAX);
209
210                Ok(())
211            }
212            Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
213            Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
214        }
215    }
216
217    /// Sends a message into the channel.
218    ///
219    /// If the channel is full, this method waits until there is space for a message.
220    ///
221    /// If the channel is closed, this method returns an error.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # futures_lite::future::block_on(async {
227    /// use async_channel::{unbounded, SendError};
228    ///
229    /// let (s, r) = unbounded();
230    ///
231    /// assert_eq!(s.send(1).await, Ok(()));
232    /// drop(r);
233    /// assert_eq!(s.send(2).await, Err(SendError(2)));
234    /// # });
235    /// ```
236    pub fn send(&self, msg: T) -> Send<'_, T> {
237        Send {
238            sender: self,
239            listener: None,
240            msg: Some(msg),
241        }
242    }
243
244    /// Sends a message into this channel using the blocking strategy.
245    ///
246    /// If the channel is full, this method will block until there is room.
247    /// If the channel is closed, this method returns an error.
248    ///
249    /// # Blocking
250    ///
251    /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
252    /// this method will block the current thread until the message is sent.
253    ///
254    /// This method should not be used in an asynchronous context. It is intended
255    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
256    /// Calling this method in an asynchronous context may result in deadlocks.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use async_channel::{unbounded, SendError};
262    ///
263    /// let (s, r) = unbounded();
264    ///
265    /// assert_eq!(s.send_blocking(1), Ok(()));
266    /// drop(r);
267    /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
268    /// ```
269    pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
270        self.send(msg).wait()
271    }
272
273    /// Closes the channel.
274    ///
275    /// Returns `true` if this call has closed the channel and it was not closed already.
276    ///
277    /// The remaining messages can still be received.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// # futures_lite::future::block_on(async {
283    /// use async_channel::{unbounded, RecvError};
284    ///
285    /// let (s, r) = unbounded();
286    /// assert_eq!(s.send(1).await, Ok(()));
287    /// assert!(s.close());
288    ///
289    /// assert_eq!(r.recv().await, Ok(1));
290    /// assert_eq!(r.recv().await, Err(RecvError));
291    /// # });
292    /// ```
293    pub fn close(&self) -> bool {
294        self.channel.close()
295    }
296
297    /// Returns `true` if the channel is closed.
298    ///
299    /// # Examples
300    ///
301    /// ```
302    /// # futures_lite::future::block_on(async {
303    /// use async_channel::{unbounded, RecvError};
304    ///
305    /// let (s, r) = unbounded::<()>();
306    /// assert!(!s.is_closed());
307    ///
308    /// drop(r);
309    /// assert!(s.is_closed());
310    /// # });
311    /// ```
312    pub fn is_closed(&self) -> bool {
313        self.channel.queue.is_closed()
314    }
315
316    /// Returns `true` if the channel is empty.
317    ///
318    /// # Examples
319    ///
320    /// ```
321    /// # futures_lite::future::block_on(async {
322    /// use async_channel::unbounded;
323    ///
324    /// let (s, r) = unbounded();
325    ///
326    /// assert!(s.is_empty());
327    /// s.send(1).await;
328    /// assert!(!s.is_empty());
329    /// # });
330    /// ```
331    pub fn is_empty(&self) -> bool {
332        self.channel.queue.is_empty()
333    }
334
335    /// Returns `true` if the channel is full.
336    ///
337    /// Unbounded channels are never full.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # futures_lite::future::block_on(async {
343    /// use async_channel::bounded;
344    ///
345    /// let (s, r) = bounded(1);
346    ///
347    /// assert!(!s.is_full());
348    /// s.send(1).await;
349    /// assert!(s.is_full());
350    /// # });
351    /// ```
352    pub fn is_full(&self) -> bool {
353        self.channel.queue.is_full()
354    }
355
356    /// Returns the number of messages in the channel.
357    ///
358    /// # Examples
359    ///
360    /// ```
361    /// # futures_lite::future::block_on(async {
362    /// use async_channel::unbounded;
363    ///
364    /// let (s, r) = unbounded();
365    /// assert_eq!(s.len(), 0);
366    ///
367    /// s.send(1).await;
368    /// s.send(2).await;
369    /// assert_eq!(s.len(), 2);
370    /// # });
371    /// ```
372    pub fn len(&self) -> usize {
373        self.channel.queue.len()
374    }
375
376    /// Returns the channel capacity if it's bounded.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use async_channel::{bounded, unbounded};
382    ///
383    /// let (s, r) = bounded::<i32>(5);
384    /// assert_eq!(s.capacity(), Some(5));
385    ///
386    /// let (s, r) = unbounded::<i32>();
387    /// assert_eq!(s.capacity(), None);
388    /// ```
389    pub fn capacity(&self) -> Option<usize> {
390        self.channel.queue.capacity()
391    }
392
393    /// Returns the number of receivers for the channel.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// # futures_lite::future::block_on(async {
399    /// use async_channel::unbounded;
400    ///
401    /// let (s, r) = unbounded::<()>();
402    /// assert_eq!(s.receiver_count(), 1);
403    ///
404    /// let r2 = r.clone();
405    /// assert_eq!(s.receiver_count(), 2);
406    /// # });
407    /// ```
408    pub fn receiver_count(&self) -> usize {
409        self.channel.receiver_count.load(Ordering::SeqCst)
410    }
411
412    /// Returns the number of senders for the channel.
413    ///
414    /// # Examples
415    ///
416    /// ```
417    /// # futures_lite::future::block_on(async {
418    /// use async_channel::unbounded;
419    ///
420    /// let (s, r) = unbounded::<()>();
421    /// assert_eq!(s.sender_count(), 1);
422    ///
423    /// let s2 = s.clone();
424    /// assert_eq!(s.sender_count(), 2);
425    /// # });
426    /// ```
427    pub fn sender_count(&self) -> usize {
428        self.channel.sender_count.load(Ordering::SeqCst)
429    }
430
431    /// Downgrade the sender to a weak reference.
432    pub fn downgrade(&self) -> WeakSender<T> {
433        WeakSender {
434            channel: self.channel.clone(),
435        }
436    }
437}
438
439impl<T> Drop for Sender<T> {
440    fn drop(&mut self) {
441        // Decrement the sender count and close the channel if it drops down to zero.
442        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
443            self.channel.close();
444        }
445    }
446}
447
448impl<T> fmt::Debug for Sender<T> {
449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450        write!(f, "Sender {{ .. }}")
451    }
452}
453
454impl<T> Clone for Sender<T> {
455    fn clone(&self) -> Sender<T> {
456        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
457
458        // Make sure the count never overflows, even if lots of sender clones are leaked.
459        if count > usize::MAX / 2 {
460            process::abort();
461        }
462
463        Sender {
464            channel: self.channel.clone(),
465        }
466    }
467}
468
469/// The receiving side of a channel.
470///
471/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
472/// are dropped, the channel becomes closed.
473///
474/// The channel can also be closed manually by calling [`Receiver::close()`].
475///
476/// Receivers implement the [`Stream`] trait.
477pub struct Receiver<T> {
478    /// Inner channel state.
479    channel: Arc<Channel<T>>,
480
481    /// Listens for a send or close event to unblock this stream.
482    listener: Option<EventListener>,
483}
484
485impl<T> Receiver<T> {
486    /// Attempts to receive a message from the channel.
487    ///
488    /// If the channel is empty, or empty and closed, this method returns an error.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// # futures_lite::future::block_on(async {
494    /// use async_channel::{unbounded, TryRecvError};
495    ///
496    /// let (s, r) = unbounded();
497    /// assert_eq!(s.send(1).await, Ok(()));
498    ///
499    /// assert_eq!(r.try_recv(), Ok(1));
500    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
501    ///
502    /// drop(s);
503    /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
504    /// # });
505    /// ```
506    pub fn try_recv(&self) -> Result<T, TryRecvError> {
507        match self.channel.queue.pop() {
508            Ok(msg) => {
509                // Notify a blocked send operation. If the notified operation gets canceled, it
510                // will notify another blocked send operation.
511                self.channel.send_ops.notify_additional(1);
512
513                Ok(msg)
514            }
515            Err(PopError::Empty) => Err(TryRecvError::Empty),
516            Err(PopError::Closed) => Err(TryRecvError::Closed),
517        }
518    }
519
520    /// Receives a message from the channel.
521    ///
522    /// If the channel is empty, this method waits until there is a message.
523    ///
524    /// If the channel is closed, this method receives a message or returns an error if there are
525    /// no more messages.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// # futures_lite::future::block_on(async {
531    /// use async_channel::{unbounded, RecvError};
532    ///
533    /// let (s, r) = unbounded();
534    ///
535    /// assert_eq!(s.send(1).await, Ok(()));
536    /// drop(s);
537    ///
538    /// assert_eq!(r.recv().await, Ok(1));
539    /// assert_eq!(r.recv().await, Err(RecvError));
540    /// # });
541    /// ```
542    pub fn recv(&self) -> Recv<'_, T> {
543        Recv {
544            receiver: self,
545            listener: None,
546        }
547    }
548
549    /// Receives a message from the channel using the blocking strategy.
550    ///
551    /// If the channel is empty, this method waits until there is a message.
552    /// If the channel is closed, this method receives a message or returns an error if there are
553    /// no more messages.
554    ///
555    /// # Blocking
556    ///
557    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
558    /// this method will block the current thread until the message is sent.
559    ///
560    /// This method should not be used in an asynchronous context. It is intended
561    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
562    /// Calling this method in an asynchronous context may result in deadlocks.
563    ///
564    /// # Examples
565    ///
566    /// ```
567    /// use async_channel::{unbounded, RecvError};
568    ///
569    /// let (s, r) = unbounded();
570    ///
571    /// assert_eq!(s.send_blocking(1), Ok(()));
572    /// drop(s);
573    ///
574    /// assert_eq!(r.recv_blocking(), Ok(1));
575    /// assert_eq!(r.recv_blocking(), Err(RecvError));
576    /// ```
577    pub fn recv_blocking(&self) -> Result<T, RecvError> {
578        self.recv().wait()
579    }
580
581    /// Closes the channel.
582    ///
583    /// Returns `true` if this call has closed the channel and it was not closed already.
584    ///
585    /// The remaining messages can still be received.
586    ///
587    /// # Examples
588    ///
589    /// ```
590    /// # futures_lite::future::block_on(async {
591    /// use async_channel::{unbounded, RecvError};
592    ///
593    /// let (s, r) = unbounded();
594    /// assert_eq!(s.send(1).await, Ok(()));
595    ///
596    /// assert!(r.close());
597    /// assert_eq!(r.recv().await, Ok(1));
598    /// assert_eq!(r.recv().await, Err(RecvError));
599    /// # });
600    /// ```
601    pub fn close(&self) -> bool {
602        self.channel.close()
603    }
604
605    /// Returns `true` if the channel is closed.
606    ///
607    /// # Examples
608    ///
609    /// ```
610    /// # futures_lite::future::block_on(async {
611    /// use async_channel::{unbounded, RecvError};
612    ///
613    /// let (s, r) = unbounded::<()>();
614    /// assert!(!r.is_closed());
615    ///
616    /// drop(s);
617    /// assert!(r.is_closed());
618    /// # });
619    /// ```
620    pub fn is_closed(&self) -> bool {
621        self.channel.queue.is_closed()
622    }
623
624    /// Returns `true` if the channel is empty.
625    ///
626    /// # Examples
627    ///
628    /// ```
629    /// # futures_lite::future::block_on(async {
630    /// use async_channel::unbounded;
631    ///
632    /// let (s, r) = unbounded();
633    ///
634    /// assert!(s.is_empty());
635    /// s.send(1).await;
636    /// assert!(!s.is_empty());
637    /// # });
638    /// ```
639    pub fn is_empty(&self) -> bool {
640        self.channel.queue.is_empty()
641    }
642
643    /// Returns `true` if the channel is full.
644    ///
645    /// Unbounded channels are never full.
646    ///
647    /// # Examples
648    ///
649    /// ```
650    /// # futures_lite::future::block_on(async {
651    /// use async_channel::bounded;
652    ///
653    /// let (s, r) = bounded(1);
654    ///
655    /// assert!(!r.is_full());
656    /// s.send(1).await;
657    /// assert!(r.is_full());
658    /// # });
659    /// ```
660    pub fn is_full(&self) -> bool {
661        self.channel.queue.is_full()
662    }
663
664    /// Returns the number of messages in the channel.
665    ///
666    /// # Examples
667    ///
668    /// ```
669    /// # futures_lite::future::block_on(async {
670    /// use async_channel::unbounded;
671    ///
672    /// let (s, r) = unbounded();
673    /// assert_eq!(r.len(), 0);
674    ///
675    /// s.send(1).await;
676    /// s.send(2).await;
677    /// assert_eq!(r.len(), 2);
678    /// # });
679    /// ```
680    pub fn len(&self) -> usize {
681        self.channel.queue.len()
682    }
683
684    /// Returns the channel capacity if it's bounded.
685    ///
686    /// # Examples
687    ///
688    /// ```
689    /// use async_channel::{bounded, unbounded};
690    ///
691    /// let (s, r) = bounded::<i32>(5);
692    /// assert_eq!(r.capacity(), Some(5));
693    ///
694    /// let (s, r) = unbounded::<i32>();
695    /// assert_eq!(r.capacity(), None);
696    /// ```
697    pub fn capacity(&self) -> Option<usize> {
698        self.channel.queue.capacity()
699    }
700
701    /// Returns the number of receivers for the channel.
702    ///
703    /// # Examples
704    ///
705    /// ```
706    /// # futures_lite::future::block_on(async {
707    /// use async_channel::unbounded;
708    ///
709    /// let (s, r) = unbounded::<()>();
710    /// assert_eq!(r.receiver_count(), 1);
711    ///
712    /// let r2 = r.clone();
713    /// assert_eq!(r.receiver_count(), 2);
714    /// # });
715    /// ```
716    pub fn receiver_count(&self) -> usize {
717        self.channel.receiver_count.load(Ordering::SeqCst)
718    }
719
720    /// Returns the number of senders for the channel.
721    ///
722    /// # Examples
723    ///
724    /// ```
725    /// # futures_lite::future::block_on(async {
726    /// use async_channel::unbounded;
727    ///
728    /// let (s, r) = unbounded::<()>();
729    /// assert_eq!(r.sender_count(), 1);
730    ///
731    /// let s2 = s.clone();
732    /// assert_eq!(r.sender_count(), 2);
733    /// # });
734    /// ```
735    pub fn sender_count(&self) -> usize {
736        self.channel.sender_count.load(Ordering::SeqCst)
737    }
738
739    /// Downgrade the receiver to a weak reference.
740    pub fn downgrade(&self) -> WeakReceiver<T> {
741        WeakReceiver {
742            channel: self.channel.clone(),
743        }
744    }
745}
746
747impl<T> Drop for Receiver<T> {
748    fn drop(&mut self) {
749        // Decrement the receiver count and close the channel if it drops down to zero.
750        if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
751            self.channel.close();
752        }
753    }
754}
755
756impl<T> fmt::Debug for Receiver<T> {
757    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758        write!(f, "Receiver {{ .. }}")
759    }
760}
761
762impl<T> Clone for Receiver<T> {
763    fn clone(&self) -> Receiver<T> {
764        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
765
766        // Make sure the count never overflows, even if lots of receiver clones are leaked.
767        if count > usize::MAX / 2 {
768            process::abort();
769        }
770
771        Receiver {
772            channel: self.channel.clone(),
773            listener: None,
774        }
775    }
776}
777
778impl<T> Stream for Receiver<T> {
779    type Item = T;
780
781    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
782        loop {
783            // If this stream is listening for events, first wait for a notification.
784            if let Some(listener) = self.listener.as_mut() {
785                futures_core::ready!(Pin::new(listener).poll(cx));
786                self.listener = None;
787            }
788
789            loop {
790                // Attempt to receive a message.
791                match self.try_recv() {
792                    Ok(msg) => {
793                        // The stream is not blocked on an event - drop the listener.
794                        self.listener = None;
795                        return Poll::Ready(Some(msg));
796                    }
797                    Err(TryRecvError::Closed) => {
798                        // The stream is not blocked on an event - drop the listener.
799                        self.listener = None;
800                        return Poll::Ready(None);
801                    }
802                    Err(TryRecvError::Empty) => {}
803                }
804
805                // Receiving failed - now start listening for notifications or wait for one.
806                match self.listener.as_mut() {
807                    None => {
808                        // Create a listener and try sending the message again.
809                        self.listener = Some(self.channel.stream_ops.listen());
810                    }
811                    Some(_) => {
812                        // Go back to the outer loop to poll the listener.
813                        break;
814                    }
815                }
816            }
817        }
818    }
819}
820
821impl<T> futures_core::stream::FusedStream for Receiver<T> {
822    fn is_terminated(&self) -> bool {
823        self.channel.queue.is_closed() && self.channel.queue.is_empty()
824    }
825}
826
827/// A [`Sender`] that prevents the channel from not being closed.
828///
829/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
830/// to be upgraded into a [`Sender`] through the `upgrade` method.
831#[derive(Clone)]
832pub struct WeakSender<T> {
833    channel: Arc<Channel<T>>,
834}
835
836impl<T> WeakSender<T> {
837    /// Upgrade the [`WeakSender`] into a [`Sender`].
838    pub fn upgrade(&self) -> Option<Sender<T>> {
839        if self.channel.queue.is_closed() {
840            None
841        } else {
842            let old_count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
843            if old_count == 0 {
844                // Channel was closed while we were incrementing the count.
845                self.channel.sender_count.store(0, Ordering::Release);
846                None
847            } else if old_count > usize::MAX / 2 {
848                // Make sure the count never overflows, even if lots of sender clones are leaked.
849                process::abort();
850            } else {
851                Some(Sender {
852                    channel: self.channel.clone(),
853                })
854            }
855        }
856    }
857}
858
859impl<T> fmt::Debug for WeakSender<T> {
860    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
861        write!(f, "WeakSender {{ .. }}")
862    }
863}
864
865/// A [`Receiver`] that prevents the channel from not being closed.
866///
867/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
868/// to be upgraded into a [`Receiver`] through the `upgrade` method.
869#[derive(Clone)]
870pub struct WeakReceiver<T> {
871    channel: Arc<Channel<T>>,
872}
873
874impl<T> WeakReceiver<T> {
875    /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
876    pub fn upgrade(&self) -> Option<Receiver<T>> {
877        if self.channel.queue.is_closed() {
878            None
879        } else {
880            let old_count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
881            if old_count == 0 {
882                // Channel was closed while we were incrementing the count.
883                self.channel.receiver_count.store(0, Ordering::Release);
884                None
885            } else if old_count > usize::MAX / 2 {
886                // Make sure the count never overflows, even if lots of receiver clones are leaked.
887                process::abort();
888            } else {
889                Some(Receiver {
890                    channel: self.channel.clone(),
891                    listener: None,
892                })
893            }
894        }
895    }
896}
897
898impl<T> fmt::Debug for WeakReceiver<T> {
899    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
900        write!(f, "WeakReceiver {{ .. }}")
901    }
902}
903
904/// An error returned from [`Sender::send()`].
905///
906/// Received because the channel is closed.
907#[derive(PartialEq, Eq, Clone, Copy)]
908pub struct SendError<T>(pub T);
909
910impl<T> SendError<T> {
911    /// Unwraps the message that couldn't be sent.
912    pub fn into_inner(self) -> T {
913        self.0
914    }
915}
916
917impl<T> error::Error for SendError<T> {}
918
919impl<T> fmt::Debug for SendError<T> {
920    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921        write!(f, "SendError(..)")
922    }
923}
924
925impl<T> fmt::Display for SendError<T> {
926    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
927        write!(f, "sending into a closed channel")
928    }
929}
930
931/// An error returned from [`Sender::try_send()`].
932#[derive(PartialEq, Eq, Clone, Copy)]
933pub enum TrySendError<T> {
934    /// The channel is full but not closed.
935    Full(T),
936
937    /// The channel is closed.
938    Closed(T),
939}
940
941impl<T> TrySendError<T> {
942    /// Unwraps the message that couldn't be sent.
943    pub fn into_inner(self) -> T {
944        match self {
945            TrySendError::Full(t) => t,
946            TrySendError::Closed(t) => t,
947        }
948    }
949
950    /// Returns `true` if the channel is full but not closed.
951    pub fn is_full(&self) -> bool {
952        match self {
953            TrySendError::Full(_) => true,
954            TrySendError::Closed(_) => false,
955        }
956    }
957
958    /// Returns `true` if the channel is closed.
959    pub fn is_closed(&self) -> bool {
960        match self {
961            TrySendError::Full(_) => false,
962            TrySendError::Closed(_) => true,
963        }
964    }
965}
966
967impl<T> error::Error for TrySendError<T> {}
968
969impl<T> fmt::Debug for TrySendError<T> {
970    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
971        match *self {
972            TrySendError::Full(..) => write!(f, "Full(..)"),
973            TrySendError::Closed(..) => write!(f, "Closed(..)"),
974        }
975    }
976}
977
978impl<T> fmt::Display for TrySendError<T> {
979    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
980        match *self {
981            TrySendError::Full(..) => write!(f, "sending into a full channel"),
982            TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
983        }
984    }
985}
986
987/// An error returned from [`Receiver::recv()`].
988///
989/// Received because the channel is empty and closed.
990#[derive(PartialEq, Eq, Clone, Copy, Debug)]
991pub struct RecvError;
992
993impl error::Error for RecvError {}
994
995impl fmt::Display for RecvError {
996    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997        write!(f, "receiving from an empty and closed channel")
998    }
999}
1000
1001/// An error returned from [`Receiver::try_recv()`].
1002#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1003pub enum TryRecvError {
1004    /// The channel is empty but not closed.
1005    Empty,
1006
1007    /// The channel is empty and closed.
1008    Closed,
1009}
1010
1011impl TryRecvError {
1012    /// Returns `true` if the channel is empty but not closed.
1013    pub fn is_empty(&self) -> bool {
1014        match self {
1015            TryRecvError::Empty => true,
1016            TryRecvError::Closed => false,
1017        }
1018    }
1019
1020    /// Returns `true` if the channel is empty and closed.
1021    pub fn is_closed(&self) -> bool {
1022        match self {
1023            TryRecvError::Empty => false,
1024            TryRecvError::Closed => true,
1025        }
1026    }
1027}
1028
1029impl error::Error for TryRecvError {}
1030
1031impl fmt::Display for TryRecvError {
1032    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1033        match *self {
1034            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1035            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1036        }
1037    }
1038}
1039
1040/// A future returned by [`Sender::send()`].
1041#[derive(Debug)]
1042#[must_use = "futures do nothing unless you `.await` or poll them"]
1043pub struct Send<'a, T> {
1044    sender: &'a Sender<T>,
1045    listener: Option<EventListener>,
1046    msg: Option<T>,
1047}
1048
1049impl<'a, T> Send<'a, T> {
1050    /// Run this future with the given `Strategy`.
1051    fn run_with_strategy<S: Strategy>(
1052        &mut self,
1053        cx: &mut S::Context,
1054    ) -> Poll<Result<(), SendError<T>>> {
1055        loop {
1056            let msg = self.msg.take().unwrap();
1057            // Attempt to send a message.
1058            match self.sender.try_send(msg) {
1059                Ok(()) => return Poll::Ready(Ok(())),
1060                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1061                Err(TrySendError::Full(m)) => self.msg = Some(m),
1062            }
1063
1064            // Sending failed - now start listening for notifications or wait for one.
1065            match self.listener.take() {
1066                None => {
1067                    // Start listening and then try sending again.
1068                    self.listener = Some(self.sender.channel.send_ops.listen());
1069                }
1070                Some(l) => {
1071                    // Poll using the given strategy
1072                    if let Err(l) = S::poll(l, cx) {
1073                        self.listener = Some(l);
1074                        return Poll::Pending;
1075                    }
1076                }
1077            }
1078        }
1079    }
1080
1081    /// Run using the blocking strategy.
1082    fn wait(mut self) -> Result<(), SendError<T>> {
1083        match self.run_with_strategy::<Blocking>(&mut ()) {
1084            Poll::Ready(res) => res,
1085            Poll::Pending => unreachable!(),
1086        }
1087    }
1088}
1089
1090impl<'a, T> Unpin for Send<'a, T> {}
1091
1092impl<'a, T> Future for Send<'a, T> {
1093    type Output = Result<(), SendError<T>>;
1094
1095    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1096        self.run_with_strategy::<NonBlocking<'_>>(cx)
1097    }
1098}
1099
1100/// A future returned by [`Receiver::recv()`].
1101#[derive(Debug)]
1102#[must_use = "futures do nothing unless you `.await` or poll them"]
1103pub struct Recv<'a, T> {
1104    receiver: &'a Receiver<T>,
1105    listener: Option<EventListener>,
1106}
1107
1108impl<'a, T> Unpin for Recv<'a, T> {}
1109
1110impl<'a, T> Recv<'a, T> {
1111    /// Run this future with the given `Strategy`.
1112    fn run_with_strategy<S: Strategy>(
1113        &mut self,
1114        cx: &mut S::Context,
1115    ) -> Poll<Result<T, RecvError>> {
1116        loop {
1117            // Attempt to receive a message.
1118            match self.receiver.try_recv() {
1119                Ok(msg) => return Poll::Ready(Ok(msg)),
1120                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1121                Err(TryRecvError::Empty) => {}
1122            }
1123
1124            // Receiving failed - now start listening for notifications or wait for one.
1125            match self.listener.take() {
1126                None => {
1127                    // Start listening and then try receiving again.
1128                    self.listener = Some(self.receiver.channel.recv_ops.listen());
1129                }
1130                Some(l) => {
1131                    // Poll using the given strategy.
1132                    if let Err(l) = S::poll(l, cx) {
1133                        self.listener = Some(l);
1134                        return Poll::Pending;
1135                    }
1136                }
1137            }
1138        }
1139    }
1140
1141    /// Run with the blocking strategy.
1142    fn wait(mut self) -> Result<T, RecvError> {
1143        match self.run_with_strategy::<Blocking>(&mut ()) {
1144            Poll::Ready(res) => res,
1145            Poll::Pending => unreachable!(),
1146        }
1147    }
1148}
1149
1150impl<'a, T> Future for Recv<'a, T> {
1151    type Output = Result<T, RecvError>;
1152
1153    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1154        self.run_with_strategy::<NonBlocking<'_>>(cx)
1155    }
1156}
1157
1158/// A strategy used to poll an `EventListener`.
1159trait Strategy {
1160    /// Context needed to be provided to the `poll` method.
1161    type Context;
1162
1163    /// Polls the given `EventListener`.
1164    ///
1165    /// Returns the `EventListener` back if it was not completed; otherwise,
1166    /// returns `Ok(())`.
1167    fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
1168}
1169
1170/// Non-blocking strategy for use in asynchronous code.
1171struct NonBlocking<'a>(&'a mut ());
1172
1173impl<'a> Strategy for NonBlocking<'a> {
1174    type Context = Context<'a>;
1175
1176    fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
1177        match Pin::new(&mut evl).poll(cx) {
1178            Poll::Ready(()) => Ok(()),
1179            Poll::Pending => Err(evl),
1180        }
1181    }
1182}
1183
1184/// Blocking strategy for use in synchronous code.
1185struct Blocking;
1186
1187impl Strategy for Blocking {
1188    type Context = ();
1189
1190    fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
1191        evl.wait();
1192        Ok(())
1193    }
1194}