futures_channel/mpsc/
mod.rs

1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
4//! Similarly to the `std`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages
//! from being sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use futures_core::stream::{FusedStream, Stream};
82use futures_core::task::__internal::AtomicWaker;
83use futures_core::task::{Context, Poll, Waker};
84use std::fmt;
85use std::pin::Pin;
86use std::sync::atomic::AtomicUsize;
87use std::sync::atomic::Ordering::SeqCst;
88use std::sync::{Arc, Mutex};
89use std::thread;
90
91use crate::mpsc::queue::Queue;
92
93mod queue;
94#[cfg(feature = "sink")]
95mod sink_impl;
96
// Internals of an `UnboundedSender` handle. Cloning is tracked through
// `UnboundedInner::num_senders` (see the `Clone` impl below).
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101
// Internals of a bounded `Sender` handle. In addition to the shared state it
// carries this sender's park handle, used to implement backpressure.
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
115
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`, so the sender
// halves can be `Unpin` regardless of whether `T` is.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
119
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
// The inner value is `None` after `disconnect` has been called on this handle.
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
// The inner value is `None` after `disconnect` has been called on this handle.
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129
// Compile-time assertion that the sender handle is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
132
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
pub struct Receiver<T> {
    // Shared channel state; presumably set to `None` once this receiver half
    // releases the channel (the relevant methods are outside this chunk).
    inner: Option<Arc<BoundedInner<T>>>,
}
139
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
pub struct UnboundedReceiver<T> {
    // Shared channel state; presumably set to `None` once this receiver half
    // releases the channel (the relevant methods are outside this chunk).
    inner: Option<Arc<UnboundedInner<T>>>,
}
146
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so
// the receiver can be `Unpin` regardless of whether `T` is.
impl<T> Unpin for UnboundedReceiver<T> {}
149
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Distinguishes a full channel from a disconnected one.
    kind: SendErrorKind,
}
155
/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // The underlying cause of the failure (full or disconnected).
    err: SendError,
    // The message that failed to send; recoverable via `into_inner`.
    val: T,
}
162
// The two ways a send can fail: the channel is at capacity, or the receiving
// half has been closed or dropped.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    Full,
    Disconnected,
}
168
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private marker field: prevents construction outside this module.
    _priv: (),
}
173
174impl fmt::Display for SendError {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        if self.is_full() {
177            write!(f, "send failed because channel is full")
178        } else {
179            write!(f, "send failed because receiver is gone")
180        }
181    }
182}
183
// `SendError` carries no underlying cause, so the default trait methods suffice.
impl std::error::Error for SendError {}
185
186impl SendError {
187    /// Returns `true` if this error is a result of the channel being full.
188    pub fn is_full(&self) -> bool {
189        match self.kind {
190            SendErrorKind::Full => true,
191            _ => false,
192        }
193    }
194
195    /// Returns `true` if this error is a result of the receiver being dropped.
196    pub fn is_disconnected(&self) -> bool {
197        match self.kind {
198            SendErrorKind::Disconnected => true,
199            _ => false,
200        }
201    }
202}
203
204impl<T> fmt::Debug for TrySendError<T> {
205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
207    }
208}
209
210impl<T> fmt::Display for TrySendError<T> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        if self.is_full() {
213            write!(f, "send failed because channel is full")
214        } else {
215            write!(f, "send failed because receiver is gone")
216        }
217    }
218}
219
// NOTE(review): the `Any` bound implies `T: 'static`, presumably so the error
// can be used as a `Box<dyn Error>` — confirm intent before relaxing it.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221
222impl<T> TrySendError<T> {
223    /// Returns `true` if this error is a result of the channel being full.
224    pub fn is_full(&self) -> bool {
225        self.err.is_full()
226    }
227
228    /// Returns `true` if this error is a result of the receiver being dropped.
229    pub fn is_disconnected(&self) -> bool {
230        self.err.is_disconnected()
231    }
232
233    /// Returns the message that was attempted to be sent but failed.
234    pub fn into_inner(self) -> T {
235        self.val
236    }
237
238    /// Drops the message and converts into a `SendError`.
239    pub fn into_send_error(self) -> SendError {
240        self.err
241    }
242}
243
244impl fmt::Debug for TryRecvError {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        f.debug_tuple("TryRecvError").finish()
247    }
248}
249
250impl fmt::Display for TryRecvError {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        write!(f, "receiver channel is empty")
253    }
254}
255
// No underlying cause to expose; default `Error` methods are sufficient.
impl std::error::Error for TryRecvError {}
257
// State shared by all `UnboundedSender` handles and the `UnboundedReceiver`.
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    // Encoded/decoded via `encode_state`/`decode_state` using `OPEN_MASK`.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
272
// State shared by all bounded `Sender` handles and the `Receiver`.
struct BoundedInner<T> {
    // Max buffer size of the channel. The effective capacity is
    // `buffer + num_senders`: each sender holds one guaranteed slot on top of
    // the shared `buffer` slots (see the module-level comment).
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
293
// Struct representation of `Inner::state`: the open flag is packed into the
// top bit (`OPEN_MASK`) and the message count fills the remaining bits.
#[derive(Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
303
// The `is_open` flag is stored in the left-most (most significant) bit of
// `Inner::state`.
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is
// `usize::max_value() >> 1`, i.e. all the bits below the open flag.
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317
// Sent to the consumer to wake up blocked producers
struct SenderTask {
    // Waker of the blocked sender task, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender is waiting for channel capacity.
    is_parked: bool,
}
323
324impl SenderTask {
325    fn new() -> Self {
326        Self { task: None, is_parked: false }
327    }
328
329    fn notify(&mut self) {
330        self.is_parked = false;
331
332        if let Some(task) = self.task.take() {
333            task.wake();
334        }
335    }
336}
337
338/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339///
340/// Being bounded, this channel provides backpressure to ensure that the sender
341/// outpaces the receiver by only a limited amount. The channel's capacity is
342/// equal to `buffer + num-senders`. In other words, each sender gets a
343/// guaranteed slot in the channel capacity, and on top of that there are
344/// `buffer` "first come, first serve" slots available to all senders.
345///
346/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
347/// implements `Sink`.
348pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
349    // Check that the requested buffer size does not exceed the maximum buffer
350    // size permitted by the system.
351    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
352
353    let inner = Arc::new(BoundedInner {
354        buffer,
355        state: AtomicUsize::new(INIT_STATE),
356        message_queue: Queue::new(),
357        parked_queue: Queue::new(),
358        num_senders: AtomicUsize::new(1),
359        recv_task: AtomicWaker::new(),
360    });
361
362    let tx = BoundedSenderInner {
363        inner: inner.clone(),
364        sender_task: Arc::new(Mutex::new(SenderTask::new())),
365        maybe_parked: false,
366    };
367
368    let rx = Receiver { inner: Some(inner) };
369
370    (Sender(Some(tx)), rx)
371}
372
373/// Creates an unbounded mpsc channel for communicating between asynchronous
374/// tasks.
375///
376/// A `send` on this channel will always succeed as long as the receive half has
377/// not been closed. If the receiver falls behind, messages will be arbitrarily
378/// buffered.
379///
380/// **Note** that the amount of available system memory is an implicit bound to
381/// the channel. Using an `unbounded` channel has the ability of causing the
382/// process to run out of memory. In this case, the process will be aborted.
383pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
384    let inner = Arc::new(UnboundedInner {
385        state: AtomicUsize::new(INIT_STATE),
386        message_queue: Queue::new(),
387        num_senders: AtomicUsize::new(1),
388        recv_task: AtomicWaker::new(),
389    });
390
391    let tx = UnboundedSenderInner { inner: inner.clone() };
392
393    let rx = UnboundedReceiver { inner: Some(inner) };
394
395    (UnboundedSender(Some(tx)), rx)
396}
397
398/*
399 *
400 * ===== impl Sender =====
401 *
402 */
403
impl<T> UnboundedSenderInner<T> {
    // Non-blocking readiness check: an unbounded channel always has capacity,
    // so the only failure mode is the receiver having closed the channel.
    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if state.is_open {
            Poll::Ready(Ok(()))
        } else {
            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
        }
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the receiver has closed the channel.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        // CAS loop: re-read and retry when another sender updates the state
        // concurrently.
        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                    exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender sends to this receiver.
    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, inner)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is not referenced and should be only used for hashing!
    fn ptr(&self) -> *const UnboundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }
}
487
impl<T> BoundedSenderInner<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send_b(msg)
    }

    // Do the send without failing.
    // Can be called only by bounded sender.
    //
    // Increments the message count, parks this sender if the channel is over
    // its configured buffer, then enqueues the message and wakes the receiver.
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => {
                return Err(TrySendError {
                    err: SendError { kind: SendErrorKind::Disconnected },
                    val: msg,
                })
            }
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // However, when `do_send` is called while dropping the `Sender`,
        // `task::current()` can't be called safely. In this case, in order to
        // maintain internal consistency, a blank message is pushed onto the
        // parked task queue.
        if park_self {
            self.park();
        }

        self.queue_push_and_signal(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the receiver has closed the channel.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        // CAS loop: re-read and retry when another sender updates the state
        // concurrently.
        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                    exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    // Park this sender: clear its waker, mark it parked, and hand its task
    // handle to the receiver so it can be notified when capacity frees up.
    fn park(&mut self) {
        {
            let mut sender = self.sender_task.lock().unwrap();
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        self.maybe_parked = state.is_open;
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
        }

        self.poll_unparked(Some(cx)).map(Ok)
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender sends to this receiver.
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, receiver)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is not referenced and should be only used for hashing!
    fn ptr(&self) -> *const BoundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }

    // Resolves whether this sender is currently parked; `Ready` means it has
    // capacity to send. If a context is supplied, its waker is registered so
    // the task is notified when the sender is unparked.
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
        // First check the `maybe_parked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_parked {
            // Get a lock on the task handle
            let mut task = self.sender_task.lock().unwrap();

            if !task.is_parked {
                self.maybe_parked = false;
                return Poll::Ready(());
            }

            // At this point, an unpark request is pending, so there will be an
            // unpark sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the task in case the `Sender` has been moved to another
            // task
            task.task = cx.map(|cx| cx.waker().clone());

            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
682
683impl<T> Sender<T> {
684    /// Attempts to send a message on this `Sender`, returning the message
685    /// if there was an error.
686    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
687        if let Some(inner) = &mut self.0 {
688            inner.try_send(msg)
689        } else {
690            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
691        }
692    }
693
694    /// Send a message on the channel.
695    ///
696    /// This function should only be called after
697    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
698    /// ready to receive a message.
699    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
700        self.try_send(msg).map_err(|e| e.err)
701    }
702
703    /// Polls the channel to determine if there is guaranteed capacity to send
704    /// at least one item without waiting.
705    ///
706    /// # Return value
707    ///
708    /// This method returns:
709    ///
710    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
711    /// - `Poll::Pending` if the channel may not have
712    ///   capacity, in which case the current task is queued to be notified once
713    ///   capacity is available;
714    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
715    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
716        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
717        inner.poll_ready(cx)
718    }
719
720    /// Returns whether this channel is closed without needing a context.
721    pub fn is_closed(&self) -> bool {
722        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
723    }
724
725    /// Closes this channel from the sender side, preventing any new messages.
726    pub fn close_channel(&mut self) {
727        if let Some(inner) = &mut self.0 {
728            inner.close_channel();
729        }
730    }
731
732    /// Disconnects this sender from the channel, closing it if there are no more senders left.
733    pub fn disconnect(&mut self) {
734        self.0 = None;
735    }
736
737    /// Returns whether the senders send to the same receiver.
738    pub fn same_receiver(&self, other: &Self) -> bool {
739        match (&self.0, &other.0) {
740            (Some(inner), Some(other)) => inner.same_receiver(other),
741            _ => false,
742        }
743    }
744
745    /// Returns whether the sender send to this receiver.
746    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
747        match (&self.0, &receiver.inner) {
748            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
749            _ => false,
750        }
751    }
752
753    /// Hashes the receiver into the provided hasher
754    pub fn hash_receiver<H>(&self, hasher: &mut H)
755    where
756        H: std::hash::Hasher,
757    {
758        use std::hash::Hash;
759
760        let ptr = self.0.as_ref().map(|inner| inner.ptr());
761        ptr.hash(hasher);
762    }
763}
764
765impl<T> UnboundedSender<T> {
766    /// Check if the channel is ready to receive a message.
767    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
768        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
769        inner.poll_ready_nb()
770    }
771
772    /// Returns whether this channel is closed without needing a context.
773    pub fn is_closed(&self) -> bool {
774        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
775    }
776
777    /// Closes this channel from the sender side, preventing any new messages.
778    pub fn close_channel(&self) {
779        if let Some(inner) = &self.0 {
780            inner.close_channel();
781        }
782    }
783
784    /// Disconnects this sender from the channel, closing it if there are no more senders left.
785    pub fn disconnect(&mut self) {
786        self.0 = None;
787    }
788
789    // Do the send without parking current task.
790    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
791        if let Some(inner) = &self.0 {
792            if inner.inc_num_messages().is_some() {
793                inner.queue_push_and_signal(msg);
794                return Ok(());
795            }
796        }
797
798        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
799    }
800
801    /// Send a message on the channel.
802    ///
803    /// This method should only be called after `poll_ready` has been used to
804    /// verify that the channel is ready to receive a message.
805    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
806        self.do_send_nb(msg).map_err(|e| e.err)
807    }
808
809    /// Sends a message along this channel.
810    ///
811    /// This is an unbounded sender, so this function differs from `Sink::send`
812    /// by ensuring the return type reflects that the channel is always ready to
813    /// receive messages.
814    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
815        self.do_send_nb(msg)
816    }
817
818    /// Returns whether the senders send to the same receiver.
819    pub fn same_receiver(&self, other: &Self) -> bool {
820        match (&self.0, &other.0) {
821            (Some(inner), Some(other)) => inner.same_receiver(other),
822            _ => false,
823        }
824    }
825
826    /// Returns whether the sender send to this receiver.
827    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
828        match (&self.0, &receiver.inner) {
829            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
830            _ => false,
831        }
832    }
833
834    /// Hashes the receiver into the provided hasher
835    pub fn hash_receiver<H>(&self, hasher: &mut H)
836    where
837        H: std::hash::Hasher,
838    {
839        use std::hash::Hash;
840
841        let ptr = self.0.as_ref().map(|inner| inner.ptr());
842        ptr.hash(hasher);
843    }
844
845    /// Return the number of messages in the queue or 0 if channel is disconnected.
846    pub fn len(&self) -> usize {
847        if let Some(sender) = &self.0 {
848            decode_state(sender.inner.state.load(SeqCst)).num_messages
849        } else {
850            0
851        }
852    }
853
854    /// Return false is channel has no queued messages, true otherwise.
855    pub fn is_empty(&self) -> bool {
856        self.len() == 0
857    }
858}
859
860impl<T> Clone for Sender<T> {
861    fn clone(&self) -> Self {
862        Self(self.0.clone())
863    }
864}
865
866impl<T> Clone for UnboundedSender<T> {
867    fn clone(&self) -> Self {
868        Self(self.0.clone())
869    }
870}
871
impl<T> Clone for UnboundedSenderInner<T> {
    fn clone(&self) -> Self {
        // This atomic op isn't guarding any other memory; all we need is a
        // consistent ordering on this single counter. A relaxed ordering
        // would in principle suffice, but the implementation conservatively
        // uses `SeqCst` throughout.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == MAX_BUFFER {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < MAX_BUFFER);

            // Attempt to bump the sender count by one. On contention, retry
            // with the freshly observed value returned by the CAS.
            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    return Self { inner: self.inner.clone() };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}
899
impl<T> Clone for BoundedSenderInner<T> {
    fn clone(&self) -> Self {
        // This atomic op isn't guarding any other memory; all we need is a
        // consistent ordering on this single counter. A relaxed ordering
        // would in principle suffice, but the implementation conservatively
        // uses `SeqCst` throughout.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail.
            // The bound depends on the buffer size so that total queued
            // messages stay within MAX_CAPACITY (see `max_senders`).
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < self.inner.max_senders());

            // Attempt to bump the sender count by one. On contention, retry
            // with the freshly observed value returned by the CAS.
            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    // Each clone gets its own parked-task slot and starts out
                    // unparked.
                    return Self {
                        inner: self.inner.clone(),
                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
                        maybe_parked: false,
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}
931
932impl<T> Drop for UnboundedSenderInner<T> {
933    fn drop(&mut self) {
934        // Ordering between variables don't matter here
935        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
936
937        if prev == 1 {
938            self.close_channel();
939        }
940    }
941}
942
943impl<T> Drop for BoundedSenderInner<T> {
944    fn drop(&mut self) {
945        // Ordering between variables don't matter here
946        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
947
948        if prev == 1 {
949            self.close_channel();
950        }
951    }
952}
953
954impl<T> fmt::Debug for Sender<T> {
955    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956        f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
957    }
958}
959
960impl<T> fmt::Debug for UnboundedSender<T> {
961    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
962        f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
963    }
964}
965
966/*
967 *
968 * ===== impl Receiver =====
969 *
970 */
971
impl<T> Receiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        if let Some(inner) = &mut self.inner {
            inner.set_closed();

            // Wake up any threads waiting as they'll see that we've closed the
            // channel and will continue on their merry way.
            // NOTE(review): `pop_spin` is presumably only sound from the single
            // consumer side; `&mut self` should guarantee exclusivity here —
            // confirm against the queue implementation's safety contract.
            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
                task.lock().unwrap().notify();
            }
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    ///
    /// This function returns:
    /// * `Ok(Some(t))` when message is fetched
    /// * `Ok(None)` when channel is closed and no messages left in the queue
    /// * `Err(e)` when there are no messages available, but channel is not yet closed
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Poll::Ready(msg) => Ok(msg),
            Poll::Pending => Err(TryRecvError { _priv: () }),
        }
    }

    // Pops the next message off the queue and updates the channel bookkeeping.
    //
    // Returns `Ready(Some(msg))` on success, `Ready(None)` once the channel is
    // closed and fully drained (also dropping `inner`), and `Pending` when no
    // message is currently available.
    fn next_message(&mut self) -> Poll<Option<T>> {
        // A missing `inner` means the stream already terminated.
        let inner = match self.inner.as_mut() {
            None => return Poll::Ready(None),
            Some(inner) => inner,
        };
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // If there are any parked task handles in the parked queue,
                // pop one and unpark it.
                self.unpark_one();

                // Decrement number of messages
                self.dec_num_messages();

                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_closed() {
                    // If closed flag is set AND there are no pending messages
                    // it means end of stream
                    self.inner = None;
                    Poll::Ready(None)
                } else {
                    // If queue is open, we need to return Pending
                    // to be woken up when new messages arrive.
                    // If queue is closed but num_messages is non-zero,
                    // it means that senders updated the state,
                    // but didn't put message to queue yet,
                    // so we need to park until sender unparks the task
                    // after queueing the message.
                    Poll::Pending
                }
            }
        }
    }

    // Unpark a single task handle if there is one pending in the parked queue
    fn unpark_one(&mut self) {
        if let Some(inner) = &mut self.inner {
            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
                task.lock().unwrap().notify();
            }
        }
    }

    // Decrements the message-count half of the packed state word.
    fn dec_num_messages(&self) {
        if let Some(inner) = &self.inner {
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
            // unless there's underflow, and we know there's no underflow
            // because number of messages at this point is always > 0.
            inner.state.fetch_sub(1, SeqCst);
        }
    }
}
1062
// The receiver does not ever take a Pin to the inner T, so it is safe to
// implement `Unpin` unconditionally, regardless of whether `T` is `Unpin`.
impl<T> Unpin for Receiver<T> {}
1065
1066impl<T> FusedStream for Receiver<T> {
1067    fn is_terminated(&self) -> bool {
1068        self.inner.is_none()
1069    }
1070}
1071
impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                // A `None` message is end-of-stream; drop the inner state so
                // subsequent polls terminate immediately.
                if msg.is_none() {
                    self.inner = None;
                }
                Poll::Ready(msg)
            }
            Poll::Pending => {
                // There are no messages to read, in this case, park.
                // `inner` is still `Some` here: `next_message` only clears it
                // when returning `Ready(None)`.
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Derive the hint from the packed state word; a terminated stream
        // reports exactly zero remaining items.
        if let Some(inner) = &self.inner {
            decode_state(inner.state.load(SeqCst)).size_hint()
        } else {
            (0, Some(0))
        }
    }
}
1103
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages, ensuring every queued
        // `T` is dropped and any parked senders are woken by `close`.
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        // `Pending` after `close` means a sender has bumped
                        // the message count but not yet pushed the message.
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}
1133
1134impl<T> fmt::Debug for Receiver<T> {
1135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136        let closed = if let Some(ref inner) = self.inner {
1137            decode_state(inner.state.load(SeqCst)).is_closed()
1138        } else {
1139            false
1140        };
1141
1142        f.debug_struct("Receiver").field("closed", &closed).finish()
1143    }
1144}
1145
impl<T> UnboundedReceiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        // Unlike the bounded receiver there is no parked-sender queue to
        // drain here: unbounded sends never block on capacity.
        if let Some(inner) = &mut self.inner {
            inner.set_closed();
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    ///
    /// This function returns:
    /// * `Ok(Some(t))` when message is fetched
    /// * `Ok(None)` when channel is closed and no messages left in the queue
    /// * `Err(e)` when there are no messages available, but channel is not yet closed
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Poll::Ready(msg) => Ok(msg),
            Poll::Pending => Err(TryRecvError { _priv: () }),
        }
    }

    // Pops the next message off the queue and updates the channel bookkeeping.
    //
    // Returns `Ready(Some(msg))` on success, `Ready(None)` once the channel is
    // closed and fully drained (also dropping `inner`), and `Pending` when no
    // message is currently available.
    fn next_message(&mut self) -> Poll<Option<T>> {
        // A missing `inner` means the stream already terminated.
        let inner = match self.inner.as_mut() {
            None => return Poll::Ready(None),
            Some(inner) => inner,
        };
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // Decrement number of messages
                self.dec_num_messages();

                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_closed() {
                    // If closed flag is set AND there are no pending messages
                    // it means end of stream
                    self.inner = None;
                    Poll::Ready(None)
                } else {
                    // If queue is open, we need to return Pending
                    // to be woken up when new messages arrive.
                    // If queue is closed but num_messages is non-zero,
                    // it means that senders updated the state,
                    // but didn't put message to queue yet,
                    // so we need to park until sender unparks the task
                    // after queueing the message.
                    Poll::Pending
                }
            }
        }
    }

    // Decrements the message-count half of the packed state word.
    fn dec_num_messages(&self) {
        if let Some(inner) = &self.inner {
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
            // unless there's underflow, and we know there's no underflow
            // because number of messages at this point is always > 0.
            inner.state.fetch_sub(1, SeqCst);
        }
    }
}
1217
1218impl<T> FusedStream for UnboundedReceiver<T> {
1219    fn is_terminated(&self) -> bool {
1220        self.inner.is_none()
1221    }
1222}
1223
impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                // A `None` message is end-of-stream; drop the inner state so
                // subsequent polls terminate immediately.
                if msg.is_none() {
                    self.inner = None;
                }
                Poll::Ready(msg)
            }
            Poll::Pending => {
                // There are no messages to read, in this case, park.
                // `inner` is still `Some` here: `next_message` only clears it
                // when returning `Ready(None)`.
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Derive the hint from the packed state word; a terminated stream
        // reports exactly zero remaining items.
        if let Some(inner) = &self.inner {
            decode_state(inner.state.load(SeqCst)).size_hint()
        } else {
            (0, Some(0))
        }
    }
}
1255
impl<T> Drop for UnboundedReceiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages, ensuring every queued
        // `T` is dropped.
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        // `Pending` after `close` means a sender has bumped
                        // the message count but not yet pushed the message.
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}
1285
1286impl<T> fmt::Debug for UnboundedReceiver<T> {
1287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288        let closed = if let Some(ref inner) = self.inner {
1289            decode_state(inner.state.load(SeqCst)).is_closed()
1290        } else {
1291            false
1292        };
1293
1294        f.debug_struct("Receiver").field("closed", &closed).finish()
1295    }
1296}
1297
1298/*
1299 *
1300 * ===== impl Inner =====
1301 *
1302 */
1303
1304impl<T> UnboundedInner<T> {
1305    // Clear `open` flag in the state, keep `num_messages` intact.
1306    fn set_closed(&self) {
1307        let curr = self.state.load(SeqCst);
1308        if !decode_state(curr).is_open {
1309            return;
1310        }
1311
1312        self.state.fetch_and(!OPEN_MASK, SeqCst);
1313    }
1314}
1315
1316impl<T> BoundedInner<T> {
1317    // The return value is such that the total number of messages that can be
1318    // enqueued into the channel will never exceed MAX_CAPACITY
1319    fn max_senders(&self) -> usize {
1320        MAX_CAPACITY - self.buffer
1321    }
1322
1323    // Clear `open` flag in the state, keep `num_messages` intact.
1324    fn set_closed(&self) {
1325        let curr = self.state.load(SeqCst);
1326        if !decode_state(curr).is_open {
1327            return;
1328        }
1329
1330        self.state.fetch_and(!OPEN_MASK, SeqCst);
1331    }
1332}
1333
// SAFETY: the shared inner state is accessed only through its atomics and the
// message queue, so it can be sent/shared across threads whenever the carried
// `T` is itself `Send`. NOTE(review): this relies on the queue's internal
// synchronization — confirm against the queue implementation's contract.
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

// SAFETY: same reasoning as for `UnboundedInner` above.
unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1339
1340impl State {
1341    fn is_closed(&self) -> bool {
1342        !self.is_open && self.num_messages == 0
1343    }
1344
1345    fn size_hint(&self) -> (usize, Option<usize>) {
1346        if self.is_open {
1347            (self.num_messages, None)
1348        } else {
1349            (self.num_messages, Some(self.num_messages))
1350        }
1351    }
1352}
1353
1354/*
1355 *
1356 * ===== Helpers =====
1357 *
1358 */
1359
1360fn decode_state(num: usize) -> State {
1361    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1362}
1363
1364fn encode_state(state: &State) -> usize {
1365    let mut num = state.num_messages;
1366
1367    if state.is_open {
1368        num |= OPEN_MASK;
1369    }
1370
1371    num
1372}