tokio/sync/
broadcast.rs

1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//!     let (tx, mut rx1) = broadcast::channel(16);
79//!     let mut rx2 = tx.subscribe();
80//!
81//!     tokio::spawn(async move {
82//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84//!     });
85//!
86//!     tokio::spawn(async move {
87//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89//!     });
90//!
91//!     tx.send(10).unwrap();
92//!     tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//!     let (tx, mut rx) = broadcast::channel(2);
104//!
105//!     tx.send(10).unwrap();
106//!     tx.send(20).unwrap();
107//!     tx.send(30).unwrap();
108//!
109//!     // The receiver lagged behind
110//!     assert!(rx.recv().await.is_err());
111//!
112//!     // At this point, we can abort or continue with lost messages
113//!
114//!     assert_eq!(20, rx.recv().await.unwrap());
115//!     assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
123use crate::util::WakeList;
124
125use std::fmt;
126use std::future::Future;
127use std::marker::PhantomPinned;
128use std::pin::Pin;
129use std::ptr::NonNull;
130use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
131use std::task::{Context, Poll, Waker};
132
133/// Sending-half of the [`broadcast`] channel.
134///
135/// May be used from many threads. Messages can be sent with
136/// [`send`][Sender::send].
137///
138/// # Examples
139///
140/// ```
141/// use tokio::sync::broadcast;
142///
143/// #[tokio::main]
144/// async fn main() {
145///     let (tx, mut rx1) = broadcast::channel(16);
146///     let mut rx2 = tx.subscribe();
147///
148///     tokio::spawn(async move {
149///         assert_eq!(rx1.recv().await.unwrap(), 10);
150///         assert_eq!(rx1.recv().await.unwrap(), 20);
151///     });
152///
153///     tokio::spawn(async move {
154///         assert_eq!(rx2.recv().await.unwrap(), 10);
155///         assert_eq!(rx2.recv().await.unwrap(), 20);
156///     });
157///
158///     tx.send(10).unwrap();
159///     tx.send(20).unwrap();
160/// }
161/// ```
162///
163/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with all receivers and senders of this channel.
    shared: Arc<Shared<T>>,
}
167
168/// Receiving-half of the [`broadcast`] channel.
169///
170/// Must not be used concurrently. Messages may be retrieved using
171/// [`recv`][Receiver::recv].
172///
173/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
174/// wrapper.
175///
176/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
177///
178/// # Examples
179///
180/// ```
181/// use tokio::sync::broadcast;
182///
183/// #[tokio::main]
184/// async fn main() {
185///     let (tx, mut rx1) = broadcast::channel(16);
186///     let mut rx2 = tx.subscribe();
187///
188///     tokio::spawn(async move {
189///         assert_eq!(rx1.recv().await.unwrap(), 10);
190///         assert_eq!(rx1.recv().await.unwrap(), 20);
191///     });
192///
193///     tokio::spawn(async move {
194///         assert_eq!(rx2.recv().await.unwrap(), 10);
195///         assert_eq!(rx2.recv().await.unwrap(), 20);
196///     });
197///
198///     tx.send(10).unwrap();
199///     tx.send(20).unwrap();
200/// }
201/// ```
202///
203/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Position of the next value this receiver will read.
    ///
    /// The receiver returned by `channel` starts at `0`; receivers built by
    /// `new_receiver` start at the tail position observed at creation time.
    next: u64,
}
211
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending fails only when there is no active receiver, i.e. when nobody
    /// could ever observe the message. The rejected message is carried inside
    /// the error so that the caller can get it back.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The payload is deliberately not printed; `T` need not be `Display`.
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind and missed messages. Receiving
        /// again yields the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                RecvError::Lagged(skipped) => write!(f, "channel lagged by {}", skipped),
                RecvError::Closed => f.write_str("channel closed"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now. At least one [`Sender`] handle is
        /// still alive, so a value may arrive later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind and missed messages. Receiving
        /// again yields the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                TryRecvError::Lagged(skipped) => write!(f, "channel lagged by {}", skipped),
                TryRecvError::Closed => f.write_str("channel closed"),
                TryRecvError::Empty => f.write_str("channel empty"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
300
301use self::error::{RecvError, SendError, TryRecvError};
302
/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel, each behind its own read/write lock.
    ///
    /// The number of slots is the requested capacity rounded up to a power of
    /// two (see `Sender::new_with_receiver_count`).
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask a position -> index.
    ///
    /// Set to `buffer.len() - 1`; AND-ing a position with it yields the slot
    /// index for that position.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles.
    ///
    /// Starts at 1, is incremented by `Sender::clone` and decremented by
    /// `Sender::drop`; the drop that takes it from 1 closes the channel.
    num_tx: AtomicUsize,
}
317
/// Write-side state of the queue, protected by the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. `send` advances it by one (wrapping) for
    /// every value it stores.
    pos: u64,

    /// Number of active receivers. `send` copies this into the slot's `rem`
    /// counter and fails when it is zero.
    rx_cnt: usize,

    /// True if the channel is closed. Set by `Sender::close_channel`, which
    /// runs when the last `Sender` handle is dropped.
    closed: bool,

    /// Receivers waiting for a value. `Shared::notify_rx` drains this list
    /// and wakes every entry.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
332
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// `send` sets this to the receiver count at the time of sending. When
    /// this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// Slots are created with `index - capacity` (wrapping) here — presumably
    /// so that an untouched slot never matches a position a receiver asks
    /// for; confirm against the receive path.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
352
/// An entry in the wait queue.
struct Waiter {
    /// True if queued.
    ///
    /// `Shared::notify_rx` asserts this is set when it pops the waiter and
    /// then clears it with `Release` ordering, after taking the waker.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel.
    ///
    /// `Shared::notify_rx` takes this while holding the tail lock.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
367
368impl Waiter {
369    fn new() -> Self {
370        Self {
371            queued: AtomicBool::new(false),
372            waker: None,
373            pointers: linked_list::Pointers::new(),
374            _p: PhantomPinned,
375        }
376    }
377}
378
// Generates `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field.
// NOTE(review): the macro presumably projects the field without creating an
// intermediate reference to the whole `Waiter` — confirm against the
// definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
386
/// Wrapper around the read guard of a single slot.
///
/// NOTE(review): presumably this is what a receiver holds while cloning the
/// value out of the slot, and what decrements `Slot::rem` when dropped — the
/// code that uses it is outside this part of the file; confirm there.
struct RecvGuard<'a, T> {
    /// Read lock on the slot being received from.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
390
/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Kept in an `UnsafeCell` because the entry is also reached through raw
    /// pointers from the wait list (see `Shared::notify_rx`).
    waiter: UnsafeCell<Waiter>,
}

// `Recv` is marked `Send` and `Sync` by hand whenever `T: Send`.
// NOTE(review): the only waiter accesses visible here (`Shared::notify_rx`)
// touch `waker` with the tail lock held and `queued` atomically; confirm that
// the accesses in `Recv`'s own methods follow the same discipline.
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
402
/// Upper limit on the number of simultaneously active receivers; creating a
/// receiver panics once the count has reached this value.
///
/// The limit is a quarter of `usize::MAX`, leaving the two highest bits
/// unused (reserved as space to lock).
const MAX_RECEIVERS: usize = usize::MAX / 4;
405
406/// Create a bounded, multi-producer, multi-consumer channel where each sent
407/// value is broadcasted to all active receivers.
408///
409/// **Note:** The actual capacity may be greater than the provided `capacity`.
410///
411/// All data sent on [`Sender`] will become available on every active
412/// [`Receiver`] in the same order as it was sent.
413///
414/// The `Sender` can be cloned to `send` to the same channel from multiple
415/// points in the process or it can be used concurrently from an `Arc`. New
416/// `Receiver` handles are created by calling [`Sender::subscribe`].
417///
418/// If all [`Receiver`] handles are dropped, the `send` method will return a
419/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
420/// method will return a [`RecvError`].
421///
422/// [`Sender`]: crate::sync::broadcast::Sender
423/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
424/// [`Receiver`]: crate::sync::broadcast::Receiver
425/// [`recv`]: crate::sync::broadcast::Receiver::recv
426/// [`SendError`]: crate::sync::broadcast::error::SendError
427/// [`RecvError`]: crate::sync::broadcast::error::RecvError
428///
429/// # Examples
430///
431/// ```
432/// use tokio::sync::broadcast;
433///
434/// #[tokio::main]
435/// async fn main() {
436///     let (tx, mut rx1) = broadcast::channel(16);
437///     let mut rx2 = tx.subscribe();
438///
439///     tokio::spawn(async move {
440///         assert_eq!(rx1.recv().await.unwrap(), 10);
441///         assert_eq!(rx1.recv().await.unwrap(), 20);
442///     });
443///
444///     tokio::spawn(async move {
445///         assert_eq!(rx2.recv().await.unwrap(), 10);
446///         assert_eq!(rx2.recv().await.unwrap(), 20);
447///     });
448///
449///     tx.send(10).unwrap();
450///     tx.send(20).unwrap();
451/// }
452/// ```
453///
454/// # Panics
455///
456/// This will panic if `capacity` is equal to `0` or larger
457/// than `usize::MAX / 2`.
458#[track_caller]
459pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
460    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
461    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
462    let rx = Receiver {
463        shared: tx.shared.clone(),
464        next: 0,
465    };
466    (tx, rx)
467}
468
// Both channel halves are `Send` and `Sync` as long as `T` is `Send`.
// NOTE(review): the impls are written by hand, presumably because `Slot`
// stores the value in an `UnsafeCell`; the argument for `Sync` without
// `T: Sync` is not visible in this part of the file — confirm.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
474
475impl<T> Sender<T> {
476    /// Creates the sending-half of the [`broadcast`] channel.
477    ///
478    /// See the documentation of [`broadcast::channel`] for more information on this method.
479    ///
480    /// [`broadcast`]: crate::sync::broadcast
481    /// [`broadcast::channel`]: crate::sync::broadcast::channel
482    #[track_caller]
483    pub fn new(capacity: usize) -> Self {
484        // SAFETY: We don't create extra receivers, so there are 0.
485        unsafe { Self::new_with_receiver_count(0, capacity) }
486    }
487
    /// Creates the sending-half of the [`broadcast`](self) channel with the provided
    /// receiver count.
    ///
    /// See the documentation of [`broadcast::channel`](self::channel) for the conditions
    /// under which this function panics.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the number of receivers recorded for this `Sender` is
    /// correct before the channel is used. This function does not create any receivers
    /// itself; it only stores `receiver_count` as the initial count.
499    #[track_caller]
500    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
501        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
502        assert!(
503            capacity <= usize::MAX >> 1,
504            "broadcast channel capacity exceeded `usize::MAX / 2`"
505        );
506
507        // Round to a power of two
508        capacity = capacity.next_power_of_two();
509
510        let mut buffer = Vec::with_capacity(capacity);
511
512        for i in 0..capacity {
513            buffer.push(RwLock::new(Slot {
514                rem: AtomicUsize::new(0),
515                pos: (i as u64).wrapping_sub(capacity as u64),
516                val: UnsafeCell::new(None),
517            }));
518        }
519
520        let shared = Arc::new(Shared {
521            buffer: buffer.into_boxed_slice(),
522            mask: capacity - 1,
523            tail: Mutex::new(Tail {
524                pos: 0,
525                rx_cnt: receiver_count,
526                closed: false,
527                waiters: LinkedList::new(),
528            }),
529            num_tx: AtomicUsize::new(1),
530        });
531
532        Sender { shared }
533    }
534
535    /// Attempts to send a value to all active [`Receiver`] handles, returning
536    /// it back if it could not be sent.
537    ///
538    /// A successful send occurs when there is at least one active [`Receiver`]
539    /// handle. An unsuccessful send would be one where all associated
540    /// [`Receiver`] handles have already been dropped.
541    ///
542    /// # Return
543    ///
544    /// On success, the number of subscribed [`Receiver`] handles is returned.
545    /// This does not mean that this number of receivers will see the message as
546    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
547    /// the message.
548    ///
549    /// # Note
550    ///
551    /// A return value of `Ok` **does not** mean that the sent value will be
552    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
553    /// handles may be dropped before receiving the sent message.
554    ///
555    /// A return value of `Err` **does not** mean that future calls to `send`
556    /// will fail. New [`Receiver`] handles may be created by calling
557    /// [`subscribe`].
558    ///
559    /// [`Receiver`]: crate::sync::broadcast::Receiver
560    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
561    ///
562    /// # Examples
563    ///
564    /// ```
565    /// use tokio::sync::broadcast;
566    ///
567    /// #[tokio::main]
568    /// async fn main() {
569    ///     let (tx, mut rx1) = broadcast::channel(16);
570    ///     let mut rx2 = tx.subscribe();
571    ///
572    ///     tokio::spawn(async move {
573    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
574    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
575    ///     });
576    ///
577    ///     tokio::spawn(async move {
578    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
579    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
580    ///     });
581    ///
582    ///     tx.send(10).unwrap();
583    ///     tx.send(20).unwrap();
584    /// }
585    /// ```
586    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
587        let mut tail = self.shared.tail.lock();
588
589        if tail.rx_cnt == 0 {
590            return Err(SendError(value));
591        }
592
593        // Position to write into
594        let pos = tail.pos;
595        let rem = tail.rx_cnt;
596        let idx = (pos & self.shared.mask as u64) as usize;
597
598        // Update the tail position
599        tail.pos = tail.pos.wrapping_add(1);
600
601        // Get the slot
602        let mut slot = self.shared.buffer[idx].write().unwrap();
603
604        // Track the position
605        slot.pos = pos;
606
607        // Set remaining receivers
608        slot.rem.with_mut(|v| *v = rem);
609
610        // Write the value
611        slot.val = UnsafeCell::new(Some(value));
612
613        // Release the slot lock before notifying the receivers.
614        drop(slot);
615
616        // Notify and release the mutex. This must happen after the slot lock is
617        // released, otherwise the writer lock bit could be cleared while another
618        // thread is in the critical section.
619        self.shared.notify_rx(tail);
620
621        Ok(rem)
622    }
623
624    /// Creates a new [`Receiver`] handle that will receive values sent **after**
625    /// this call to `subscribe`.
626    ///
627    /// # Examples
628    ///
629    /// ```
630    /// use tokio::sync::broadcast;
631    ///
632    /// #[tokio::main]
633    /// async fn main() {
634    ///     let (tx, _rx) = broadcast::channel(16);
635    ///
636    ///     // Will not be seen
637    ///     tx.send(10).unwrap();
638    ///
639    ///     let mut rx = tx.subscribe();
640    ///
641    ///     tx.send(20).unwrap();
642    ///
643    ///     let value = rx.recv().await.unwrap();
644    ///     assert_eq!(20, value);
645    /// }
646    /// ```
647    pub fn subscribe(&self) -> Receiver<T> {
648        let shared = self.shared.clone();
649        new_receiver(shared)
650    }
651
652    /// Returns the number of queued values.
653    ///
654    /// A value is queued until it has either been seen by all receivers that were alive at the time
655    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
656    /// queue's capacity.
657    ///
658    /// # Note
659    ///
660    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
661    /// have been evicted from the queue before being seen by all receivers.
662    ///
663    /// # Examples
664    ///
665    /// ```
666    /// use tokio::sync::broadcast;
667    ///
668    /// #[tokio::main]
669    /// async fn main() {
670    ///     let (tx, mut rx1) = broadcast::channel(16);
671    ///     let mut rx2 = tx.subscribe();
672    ///
673    ///     tx.send(10).unwrap();
674    ///     tx.send(20).unwrap();
675    ///     tx.send(30).unwrap();
676    ///
677    ///     assert_eq!(tx.len(), 3);
678    ///
679    ///     rx1.recv().await.unwrap();
680    ///
681    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
682    ///     assert_eq!(tx.len(), 3);
683    ///
684    ///     rx2.recv().await.unwrap();
685    ///
686    ///     assert_eq!(tx.len(), 2);
687    /// }
688    /// ```
689    pub fn len(&self) -> usize {
690        let tail = self.shared.tail.lock();
691
692        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
693        let mut low = 0;
694        let mut high = self.shared.buffer.len();
695        while low < high {
696            let mid = low + (high - low) / 2;
697            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
698            if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
699                low = mid + 1;
700            } else {
701                high = mid;
702            }
703        }
704
705        self.shared.buffer.len() - low
706    }
707
708    /// Returns true if there are no queued values.
709    ///
710    /// # Examples
711    ///
712    /// ```
713    /// use tokio::sync::broadcast;
714    ///
715    /// #[tokio::main]
716    /// async fn main() {
717    ///     let (tx, mut rx1) = broadcast::channel(16);
718    ///     let mut rx2 = tx.subscribe();
719    ///
720    ///     assert!(tx.is_empty());
721    ///
722    ///     tx.send(10).unwrap();
723    ///
724    ///     assert!(!tx.is_empty());
725    ///
726    ///     rx1.recv().await.unwrap();
727    ///
728    ///     // The queue is still not empty since rx2 hasn't seen the value.
729    ///     assert!(!tx.is_empty());
730    ///
731    ///     rx2.recv().await.unwrap();
732    ///
733    ///     assert!(tx.is_empty());
734    /// }
735    /// ```
736    pub fn is_empty(&self) -> bool {
737        let tail = self.shared.tail.lock();
738
739        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
740        self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
741    }
742
743    /// Returns the number of active receivers.
744    ///
745    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
746    /// [`subscribe`]. These are the handles that will receive values sent on
747    /// this [`Sender`].
748    ///
749    /// # Note
750    ///
751    /// It is not guaranteed that a sent message will reach this number of
752    /// receivers. Active receivers may never call [`recv`] again before
753    /// dropping.
754    ///
755    /// [`recv`]: crate::sync::broadcast::Receiver::recv
756    /// [`Receiver`]: crate::sync::broadcast::Receiver
757    /// [`Sender`]: crate::sync::broadcast::Sender
758    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
759    /// [`channel`]: crate::sync::broadcast::channel
760    ///
761    /// # Examples
762    ///
763    /// ```
764    /// use tokio::sync::broadcast;
765    ///
766    /// #[tokio::main]
767    /// async fn main() {
768    ///     let (tx, _rx1) = broadcast::channel(16);
769    ///
770    ///     assert_eq!(1, tx.receiver_count());
771    ///
772    ///     let mut _rx2 = tx.subscribe();
773    ///
774    ///     assert_eq!(2, tx.receiver_count());
775    ///
776    ///     tx.send(10).unwrap();
777    /// }
778    /// ```
779    pub fn receiver_count(&self) -> usize {
780        let tail = self.shared.tail.lock();
781        tail.rx_cnt
782    }
783
784    /// Returns `true` if senders belong to the same channel.
785    ///
786    /// # Examples
787    ///
788    /// ```
789    /// use tokio::sync::broadcast;
790    ///
791    /// #[tokio::main]
792    /// async fn main() {
793    ///     let (tx, _rx) = broadcast::channel::<()>(16);
794    ///     let tx2 = tx.clone();
795    ///
796    ///     assert!(tx.same_channel(&tx2));
797    ///
798    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
799    ///
800    ///     assert!(!tx3.same_channel(&tx2));
801    /// }
802    /// ```
803    pub fn same_channel(&self, other: &Self) -> bool {
804        Arc::ptr_eq(&self.shared, &other.shared)
805    }
806
807    fn close_channel(&self) {
808        let mut tail = self.shared.tail.lock();
809        tail.closed = true;
810
811        self.shared.notify_rx(tail);
812    }
813}
814
815/// Create a new `Receiver` which reads starting from the tail.
816fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
817    let mut tail = shared.tail.lock();
818
819    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
820
821    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
822
823    let next = tail.pos;
824
825    drop(tail);
826
827    Receiver { shared, next }
828}
829
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
    /// The waiters taken from `Tail::waiters`, linked around a guard node.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once a pop has found the list empty, so that `drop` can skip
    /// taking the tail lock.
    is_empty: bool,
    /// Channel state whose `tail` mutex protects `list`.
    shared: &'a Shared<T>,
}
838
839impl<'a, T> Drop for WaitersList<'a, T> {
840    fn drop(&mut self) {
841        // If the list is not empty, we unlink all waiters from it.
842        // We do not wake the waiters to avoid double panics.
843        if !self.is_empty {
844            let _lock_guard = self.shared.tail.lock();
845            while self.list.pop_back().is_some() {}
846        }
847    }
848}
849
850impl<'a, T> WaitersList<'a, T> {
851    fn new(
852        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
853        guard: Pin<&'a Waiter>,
854        shared: &'a Shared<T>,
855    ) -> Self {
856        let guard_ptr = NonNull::from(guard.get_ref());
857        let list = unguarded_list.into_guarded(guard_ptr);
858        WaitersList {
859            list,
860            is_empty: false,
861            shared,
862        }
863    }
864
865    /// Removes the last element from the guarded list. Modifying this list
866    /// requires an exclusive access to the main list in `Notify`.
867    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
868        let result = self.list.pop_back();
869        if result.is_none() {
870            // Save information about emptiness to avoid waiting for lock
871            // in the destructor.
872            self.is_empty = true;
873        }
874        result
875    }
876}
877
impl<T> Shared<T> {
    /// Wakes every receiver that is currently in `tail.waiters`.
    ///
    /// Takes ownership of the tail lock guard and releases it before
    /// returning. Wakers are collected in batches: whenever the batch is
    /// full, the lock is released, the batch is woken, and the lock is taken
    /// again, so no waker is ever invoked with the tail lock held.
    ///
    /// Called by `Sender::send` after a value was stored and by
    /// `Sender::close_channel` after the channel was marked closed.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `tail` lock.
        //   The `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Fill the current batch while there is room and waiters remain.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        // Every waiter has been popped; the last (possibly
                        // partial) batch is woken after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        wakers.wake_all();
    }
}
943
944impl<T> Clone for Sender<T> {
945    fn clone(&self) -> Sender<T> {
946        let shared = self.shared.clone();
947        shared.num_tx.fetch_add(1, SeqCst);
948
949        Sender { shared }
950    }
951}
952
953impl<T> Drop for Sender<T> {
954    fn drop(&mut self) {
955        if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
956            self.close_channel();
957        }
958    }
959}
960
961impl<T> Receiver<T> {
962    /// Returns the number of messages that were sent into the channel and that
963    /// this [`Receiver`] has yet to receive.
964    ///
965    /// If the returned value from `len` is larger than the next largest power of 2
966    /// of the capacity of the channel any call to [`recv`] will return an
967    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
968    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
969    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
970    /// values larger than 16.
971    ///
972    /// [`Receiver`]: crate::sync::broadcast::Receiver
973    /// [`recv`]: crate::sync::broadcast::Receiver::recv
974    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
975    ///
976    /// # Examples
977    ///
978    /// ```
979    /// use tokio::sync::broadcast;
980    ///
981    /// #[tokio::main]
982    /// async fn main() {
983    ///     let (tx, mut rx1) = broadcast::channel(16);
984    ///
985    ///     tx.send(10).unwrap();
986    ///     tx.send(20).unwrap();
987    ///
988    ///     assert_eq!(rx1.len(), 2);
989    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
990    ///     assert_eq!(rx1.len(), 1);
991    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
992    ///     assert_eq!(rx1.len(), 0);
993    /// }
994    /// ```
995    pub fn len(&self) -> usize {
996        let next_send_pos = self.shared.tail.lock().pos;
997        (next_send_pos - self.next) as usize
998    }
999
1000    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1001    /// has yet to receive.
1002    ///
1003    /// [`Receiver]: create::sync::broadcast::Receiver
1004    ///
1005    /// # Examples
1006    ///
1007    /// ```
1008    /// use tokio::sync::broadcast;
1009    ///
1010    /// #[tokio::main]
1011    /// async fn main() {
1012    ///     let (tx, mut rx1) = broadcast::channel(16);
1013    ///
1014    ///     assert!(rx1.is_empty());
1015    ///
1016    ///     tx.send(10).unwrap();
1017    ///     tx.send(20).unwrap();
1018    ///
1019    ///     assert!(!rx1.is_empty());
1020    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1021    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1022    ///     assert!(rx1.is_empty());
1023    /// }
1024    /// ```
1025    pub fn is_empty(&self) -> bool {
1026        self.len() == 0
1027    }
1028
1029    /// Returns `true` if receivers belong to the same channel.
1030    ///
1031    /// # Examples
1032    ///
1033    /// ```
1034    /// use tokio::sync::broadcast;
1035    ///
1036    /// #[tokio::main]
1037    /// async fn main() {
1038    ///     let (tx, rx) = broadcast::channel::<()>(16);
1039    ///     let rx2 = tx.subscribe();
1040    ///
1041    ///     assert!(rx.same_channel(&rx2));
1042    ///
1043    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1044    ///
1045    ///     assert!(!rx3.same_channel(&rx2));
1046    /// }
1047    /// ```
1048    pub fn same_channel(&self, other: &Self) -> bool {
1049        Arc::ptr_eq(&self.shared, &other.shared)
1050    }
1051
    /// Locks the next value if there is one.
    ///
    /// Looks at the slot that this receiver's cursor (`self.next`) maps to and
    /// returns one of:
    ///
    /// * `Ok(RecvGuard)`: a value is available. The guard wraps the slot's
    ///   read lock and the cursor has been advanced by one.
    /// * `Err(TryRecvError::Empty)`: nothing newer has been sent. If `waiter`
    ///   is `Some`, the waker has been stored in the waiter node (unless the
    ///   stored one already wakes the same task) and the node has been pushed
    ///   onto `tail.waiters` (unless it was already queued).
    /// * `Err(TryRecvError::Closed)`: nothing newer has been sent and
    ///   `tail.closed` is set.
    /// * `Err(TryRecvError::Lagged(missed))`: the slot has been overwritten.
    ///   The cursor has been moved to `tail.pos - buffer.len()` (wrapping) and
    ///   `missed` is the number of positions that were skipped.
    ///
    /// `waiter` is only consulted on the `Empty` path; pass `None` to check
    /// for a value without registering for a wake-up.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Map the cursor onto a buffer index using the channel's mask.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read().unwrap();

        if slot.pos != self.next {
            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            // Holds a replaced waker so that it can be dropped only after the
            // locks have been released.
            let mut old_waker = None;

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read().unwrap();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                // Position this slot will hold once it is written again.
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // At this point the channel is empty for *this* receiver. If
                    // it's been closed, then that's what we return, otherwise we
                    // set a waker and return empty.
                    if tail.closed {
                        return Err(TryRecvError::Closed);
                    }

                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        old_waker = std::mem::replace(
                                            &mut (*ptr).waker,
                                            Some(waker.clone()),
                                        );
                                    }
                                }

                                // If the waiter is not already queued, enqueue it.
                                // `Relaxed` order suffices: we have synchronized with
                                // all writers through the tail lock that we hold.
                                if !(*ptr).queued.load(Relaxed) {
                                    // `Relaxed` order suffices: all the readers will
                                    // synchronize with this write through the tail lock.
                                    (*ptr).queued.store(true, Relaxed);
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    // Drop the old waker after releasing the locks.
                    drop(slot);
                    drop(tail);
                    drop(old_waker);

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);

                // Number of positions between the current cursor and the
                // oldest retained position.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds the expected position: consume it by advancing the
        // cursor and hand the still-held read lock to the caller.
        self.next = self.next.wrapping_add(1);

        Ok(RecvGuard { slot })
    }
1162}
1163
1164impl<T: Clone> Receiver<T> {
1165    /// Re-subscribes to the channel starting from the current tail element.
1166    ///
1167    /// This [`Receiver`] handle will receive a clone of all values sent
1168    /// **after** it has resubscribed. This will not include elements that are
1169    /// in the queue of the current receiver. Consider the following example.
1170    ///
1171    /// # Examples
1172    ///
1173    /// ```
1174    /// use tokio::sync::broadcast;
1175    ///
1176    /// #[tokio::main]
1177    /// async fn main() {
1178    ///   let (tx, mut rx) = broadcast::channel(2);
1179    ///
1180    ///   tx.send(1).unwrap();
1181    ///   let mut rx2 = rx.resubscribe();
1182    ///   tx.send(2).unwrap();
1183    ///
1184    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1185    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1186    /// }
1187    /// ```
1188    pub fn resubscribe(&self) -> Self {
1189        let shared = self.shared.clone();
1190        new_receiver(shared)
1191    }
1192    /// Receives the next value for this receiver.
1193    ///
1194    /// Each [`Receiver`] handle will receive a clone of all values sent
1195    /// **after** it has subscribed.
1196    ///
1197    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1198    /// dropped, indicating that no further values can be sent on the channel.
1199    ///
1200    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1201    /// sent values will overwrite old values. At this point, a call to [`recv`]
1202    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1203    /// internal cursor is updated to point to the oldest value still held by
1204    /// the channel. A subsequent call to [`recv`] will return this value
1205    /// **unless** it has been since overwritten.
1206    ///
1207    /// # Cancel safety
1208    ///
1209    /// This method is cancel safe. If `recv` is used as the event in a
1210    /// [`tokio::select!`](crate::select) statement and some other branch
1211    /// completes first, it is guaranteed that no messages were received on this
1212    /// channel.
1213    ///
1214    /// [`Receiver`]: crate::sync::broadcast::Receiver
1215    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1216    ///
1217    /// # Examples
1218    ///
1219    /// ```
1220    /// use tokio::sync::broadcast;
1221    ///
1222    /// #[tokio::main]
1223    /// async fn main() {
1224    ///     let (tx, mut rx1) = broadcast::channel(16);
1225    ///     let mut rx2 = tx.subscribe();
1226    ///
1227    ///     tokio::spawn(async move {
1228    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1229    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1230    ///     });
1231    ///
1232    ///     tokio::spawn(async move {
1233    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1234    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1235    ///     });
1236    ///
1237    ///     tx.send(10).unwrap();
1238    ///     tx.send(20).unwrap();
1239    /// }
1240    /// ```
1241    ///
1242    /// Handling lag
1243    ///
1244    /// ```
1245    /// use tokio::sync::broadcast;
1246    ///
1247    /// #[tokio::main]
1248    /// async fn main() {
1249    ///     let (tx, mut rx) = broadcast::channel(2);
1250    ///
1251    ///     tx.send(10).unwrap();
1252    ///     tx.send(20).unwrap();
1253    ///     tx.send(30).unwrap();
1254    ///
1255    ///     // The receiver lagged behind
1256    ///     assert!(rx.recv().await.is_err());
1257    ///
1258    ///     // At this point, we can abort or continue with lost messages
1259    ///
1260    ///     assert_eq!(20, rx.recv().await.unwrap());
1261    ///     assert_eq!(30, rx.recv().await.unwrap());
1262    /// }
1263    /// ```
1264    pub async fn recv(&mut self) -> Result<T, RecvError> {
1265        let fut = Recv::new(self);
1266        fut.await
1267    }
1268
1269    /// Attempts to return a pending value on this receiver without awaiting.
1270    ///
1271    /// This is useful for a flavor of "optimistic check" before deciding to
1272    /// await on a receiver.
1273    ///
1274    /// Compared with [`recv`], this function has three failure cases instead of two
1275    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1276    ///
1277    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1278    /// dropped, indicating that no further values can be sent on the channel.
1279    ///
1280    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1281    /// sent values will overwrite old values. At this point, a call to [`recv`]
1282    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1283    /// internal cursor is updated to point to the oldest value still held by
1284    /// the channel. A subsequent call to [`try_recv`] will return this value
1285    /// **unless** it has been since overwritten. If there are no values to
1286    /// receive, `Err(TryRecvError::Empty)` is returned.
1287    ///
1288    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1289    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1290    /// [`Receiver`]: crate::sync::broadcast::Receiver
1291    ///
1292    /// # Examples
1293    ///
1294    /// ```
1295    /// use tokio::sync::broadcast;
1296    ///
1297    /// #[tokio::main]
1298    /// async fn main() {
1299    ///     let (tx, mut rx) = broadcast::channel(16);
1300    ///
1301    ///     assert!(rx.try_recv().is_err());
1302    ///
1303    ///     tx.send(10).unwrap();
1304    ///
1305    ///     let value = rx.try_recv().unwrap();
1306    ///     assert_eq!(10, value);
1307    /// }
1308    /// ```
1309    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1310        let guard = self.recv_ref(None)?;
1311        guard.clone_value().ok_or(TryRecvError::Closed)
1312    }
1313
1314    /// Blocking receive to call outside of asynchronous contexts.
1315    ///
1316    /// # Panics
1317    ///
1318    /// This function panics if called within an asynchronous execution
1319    /// context.
1320    ///
1321    /// # Examples
1322    /// ```
1323    /// use std::thread;
1324    /// use tokio::sync::broadcast;
1325    ///
1326    /// #[tokio::main]
1327    /// async fn main() {
1328    ///     let (tx, mut rx) = broadcast::channel(16);
1329    ///
1330    ///     let sync_code = thread::spawn(move || {
1331    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1332    ///     });
1333    ///
1334    ///     let _ = tx.send(10);
1335    ///     sync_code.join().unwrap();
1336    /// }
1337    /// ```
1338    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1339        crate::future::block_on(self.recv())
1340    }
1341}
1342
1343impl<T> Drop for Receiver<T> {
1344    fn drop(&mut self) {
1345        let mut tail = self.shared.tail.lock();
1346
1347        tail.rx_cnt -= 1;
1348        let until = tail.pos;
1349
1350        drop(tail);
1351
1352        while self.next < until {
1353            match self.recv_ref(None) {
1354                Ok(_) => {}
1355                // The channel is closed
1356                Err(TryRecvError::Closed) => break,
1357                // Ignore lagging, we will catch up
1358                Err(TryRecvError::Lagged(..)) => {}
1359                // Can't be empty
1360                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1361            }
1362        }
1363    }
1364}
1365
1366impl<'a, T> Recv<'a, T> {
1367    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1368        Recv {
1369            receiver,
1370            waiter: UnsafeCell::new(Waiter {
1371                queued: AtomicBool::new(false),
1372                waker: None,
1373                pointers: linked_list::Pointers::new(),
1374                _p: PhantomPinned,
1375            }),
1376        }
1377    }
1378
1379    /// A custom `project` implementation is used in place of `pin-project-lite`
1380    /// as a custom drop implementation is needed.
1381    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1382        unsafe {
1383            // Safety: Receiver is Unpin
1384            is_unpin::<&mut Receiver<T>>();
1385
1386            let me = self.get_unchecked_mut();
1387            (me.receiver, &me.waiter)
1388        }
1389    }
1390}
1391
1392impl<'a, T> Future for Recv<'a, T>
1393where
1394    T: Clone,
1395{
1396    type Output = Result<T, RecvError>;
1397
1398    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1399        ready!(crate::trace::trace_leaf(cx));
1400
1401        let (receiver, waiter) = self.project();
1402
1403        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1404            Ok(value) => value,
1405            Err(TryRecvError::Empty) => return Poll::Pending,
1406            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1407            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1408        };
1409
1410        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1411    }
1412}
1413
1414impl<'a, T> Drop for Recv<'a, T> {
1415    fn drop(&mut self) {
1416        // Safety: `waiter.queued` is atomic.
1417        // Acquire ordering is required to synchronize with
1418        // `Shared::notify_rx` before we drop the object.
1419        let queued = self
1420            .waiter
1421            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1422
1423        // If the waiter is queued, we need to unlink it from the waiters list.
1424        // If not, no further synchronization is required, since the waiter
1425        // is not in the list and, as such, is not shared with any other threads.
1426        if queued {
1427            // Acquire the tail lock. This is required for safety before accessing
1428            // the waiter node.
1429            let mut tail = self.receiver.shared.tail.lock();
1430
1431            // Safety: tail lock is held.
1432            // `Relaxed` order suffices because we hold the tail lock.
1433            let queued = self
1434                .waiter
1435                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1436
1437            if queued {
1438                // Remove the node
1439                //
1440                // safety: tail lock is held and the wait node is verified to be in
1441                // the list.
1442                unsafe {
1443                    self.waiter.with_mut(|ptr| {
1444                        tail.waiters.remove((&mut *ptr).into());
1445                    });
1446                }
1447            }
1448        }
1449    }
1450}
1451
1452/// # Safety
1453///
1454/// `Waiter` is forced to be !Unpin.
1455unsafe impl linked_list::Link for Waiter {
1456    type Handle = NonNull<Waiter>;
1457    type Target = Waiter;
1458
1459    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1460        *handle
1461    }
1462
1463    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1464        ptr
1465    }
1466
1467    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1468        Waiter::addr_of_pointers(target)
1469    }
1470}
1471
1472impl<T> fmt::Debug for Sender<T> {
1473    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1474        write!(fmt, "broadcast::Sender")
1475    }
1476}
1477
1478impl<T> fmt::Debug for Receiver<T> {
1479    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1480        write!(fmt, "broadcast::Receiver")
1481    }
1482}
1483
1484impl<'a, T> RecvGuard<'a, T> {
1485    fn clone_value(&self) -> Option<T>
1486    where
1487        T: Clone,
1488    {
1489        self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1490    }
1491}
1492
1493impl<'a, T> Drop for RecvGuard<'a, T> {
1494    fn drop(&mut self) {
1495        // Decrement the remaining counter
1496        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497            // Safety: Last receiver, drop the value
1498            self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1499        }
1500    }
1501}
1502
/// Compile-time assertion that `T` implements `Unpin`; calling it does
/// nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1504
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// A sender built with `Sender::new` starts without receivers; the count
    /// follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// `channel` hands out one receiver up front, so the count starts at one.
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, initial) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _resubscribed = initial.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}