tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
123use crate::util::WakeList;
124
125use std::fmt;
126use std::future::Future;
127use std::marker::PhantomPinned;
128use std::pin::Pin;
129use std::ptr::NonNull;
130use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
131use std::task::{Context, Poll, Waker};
132
133/// Sending-half of the [`broadcast`] channel.
134///
135/// May be used from many threads. Messages can be sent with
136/// [`send`][Sender::send].
137///
138/// # Examples
139///
140/// ```
141/// use tokio::sync::broadcast;
142///
143/// #[tokio::main]
144/// async fn main() {
145/// let (tx, mut rx1) = broadcast::channel(16);
146/// let mut rx2 = tx.subscribe();
147///
148/// tokio::spawn(async move {
149/// assert_eq!(rx1.recv().await.unwrap(), 10);
150/// assert_eq!(rx1.recv().await.unwrap(), 20);
151/// });
152///
153/// tokio::spawn(async move {
154/// assert_eq!(rx2.recv().await.unwrap(), 10);
155/// assert_eq!(rx2.recv().await.unwrap(), 20);
156/// });
157///
158/// tx.send(10).unwrap();
159/// tx.send(20).unwrap();
160/// }
161/// ```
162///
163/// [`broadcast`]: crate::sync::broadcast
164pub struct Sender<T> {
165 shared: Arc<Shared<T>>,
166}
167
168/// Receiving-half of the [`broadcast`] channel.
169///
170/// Must not be used concurrently. Messages may be retrieved using
171/// [`recv`][Receiver::recv].
172///
173/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
174/// wrapper.
175///
176/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
177///
178/// # Examples
179///
180/// ```
181/// use tokio::sync::broadcast;
182///
183/// #[tokio::main]
184/// async fn main() {
185/// let (tx, mut rx1) = broadcast::channel(16);
186/// let mut rx2 = tx.subscribe();
187///
188/// tokio::spawn(async move {
189/// assert_eq!(rx1.recv().await.unwrap(), 10);
190/// assert_eq!(rx1.recv().await.unwrap(), 20);
191/// });
192///
193/// tokio::spawn(async move {
194/// assert_eq!(rx2.recv().await.unwrap(), 10);
195/// assert_eq!(rx2.recv().await.unwrap(), 20);
196/// });
197///
198/// tx.send(10).unwrap();
199/// tx.send(20).unwrap();
200/// }
201/// ```
202///
203/// [`broadcast`]: crate::sync::broadcast
204pub struct Receiver<T> {
205 /// State shared with all receivers and senders.
206 shared: Arc<Shared<T>>,
207
208 /// Next position to read from
209 next: u64,
210}
211
212pub mod error {
213 //! Broadcast error types
214
215 use std::fmt;
216
217 /// Error returned by the [`send`] function on a [`Sender`].
218 ///
219 /// A **send** operation can only fail if there are no active receivers,
220 /// implying that the message could never be received. The error contains the
221 /// message being sent as a payload so it can be recovered.
222 ///
223 /// [`send`]: crate::sync::broadcast::Sender::send
224 /// [`Sender`]: crate::sync::broadcast::Sender
225 #[derive(Debug)]
226 pub struct SendError<T>(pub T);
227
228 impl<T> fmt::Display for SendError<T> {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 write!(f, "channel closed")
231 }
232 }
233
234 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
235
236 /// An error returned from the [`recv`] function on a [`Receiver`].
237 ///
238 /// [`recv`]: crate::sync::broadcast::Receiver::recv
239 /// [`Receiver`]: crate::sync::broadcast::Receiver
240 #[derive(Debug, PartialEq, Eq, Clone)]
241 pub enum RecvError {
242 /// There are no more active senders implying no further messages will ever
243 /// be sent.
244 Closed,
245
246 /// The receiver lagged too far behind. Attempting to receive again will
247 /// return the oldest message still retained by the channel.
248 ///
249 /// Includes the number of skipped messages.
250 Lagged(u64),
251 }
252
253 impl fmt::Display for RecvError {
254 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
255 match self {
256 RecvError::Closed => write!(f, "channel closed"),
257 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
258 }
259 }
260 }
261
262 impl std::error::Error for RecvError {}
263
264 /// An error returned from the [`try_recv`] function on a [`Receiver`].
265 ///
266 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
267 /// [`Receiver`]: crate::sync::broadcast::Receiver
268 #[derive(Debug, PartialEq, Eq, Clone)]
269 pub enum TryRecvError {
270 /// The channel is currently empty. There are still active
271 /// [`Sender`] handles, so data may yet become available.
272 ///
273 /// [`Sender`]: crate::sync::broadcast::Sender
274 Empty,
275
276 /// There are no more active senders implying no further messages will ever
277 /// be sent.
278 Closed,
279
280 /// The receiver lagged too far behind and has been forcibly disconnected.
281 /// Attempting to receive again will return the oldest message still
282 /// retained by the channel.
283 ///
284 /// Includes the number of skipped messages.
285 Lagged(u64),
286 }
287
288 impl fmt::Display for TryRecvError {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 match self {
291 TryRecvError::Empty => write!(f, "channel empty"),
292 TryRecvError::Closed => write!(f, "channel closed"),
293 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
294 }
295 }
296 }
297
298 impl std::error::Error for TryRecvError {}
299}
300
301use self::error::{RecvError, SendError, TryRecvError};
302
303/// Data shared between senders and receivers.
304struct Shared<T> {
305 /// slots in the channel.
306 buffer: Box<[RwLock<Slot<T>>]>,
307
308 /// Mask a position -> index.
309 mask: usize,
310
311 /// Tail of the queue. Includes the rx wait list.
312 tail: Mutex<Tail>,
313
314 /// Number of outstanding Sender handles.
315 num_tx: AtomicUsize,
316}
317
318/// Next position to write a value.
319struct Tail {
320 /// Next position to write to.
321 pos: u64,
322
323 /// Number of active receivers.
324 rx_cnt: usize,
325
326 /// True if the channel is closed.
327 closed: bool,
328
329 /// Receivers waiting for a value.
330 waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
331}
332
333/// Slot in the buffer.
334struct Slot<T> {
335 /// Remaining number of receivers that are expected to see this value.
336 ///
337 /// When this goes to zero, the value is released.
338 ///
339 /// An atomic is used as it is mutated concurrently with the slot read lock
340 /// acquired.
341 rem: AtomicUsize,
342
343 /// Uniquely identifies the `send` stored in the slot.
344 pos: u64,
345
346 /// The value being broadcast.
347 ///
348 /// The value is set by `send` when the write lock is held. When a reader
349 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
350 val: UnsafeCell<Option<T>>,
351}
352
353/// An entry in the wait queue.
354struct Waiter {
355 /// True if queued.
356 queued: AtomicBool,
357
358 /// Task waiting on the broadcast channel.
359 waker: Option<Waker>,
360
361 /// Intrusive linked-list pointers.
362 pointers: linked_list::Pointers<Waiter>,
363
364 /// Should not be `Unpin`.
365 _p: PhantomPinned,
366}
367
368impl Waiter {
369 fn new() -> Self {
370 Self {
371 queued: AtomicBool::new(false),
372 waker: None,
373 pointers: linked_list::Pointers::new(),
374 _p: PhantomPinned,
375 }
376 }
377}
378
379generate_addr_of_methods! {
380 impl<> Waiter {
381 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
382 &self.pointers
383 }
384 }
385}
386
387struct RecvGuard<'a, T> {
388 slot: RwLockReadGuard<'a, Slot<T>>,
389}
390
391/// Receive a value future.
392struct Recv<'a, T> {
393 /// Receiver being waited on.
394 receiver: &'a mut Receiver<T>,
395
396 /// Entry in the waiter `LinkedList`.
397 waiter: UnsafeCell<Waiter>,
398}
399
400unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
401unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
402
403/// Max number of receivers. Reserve space to lock.
404const MAX_RECEIVERS: usize = usize::MAX >> 2;
405
406/// Create a bounded, multi-producer, multi-consumer channel where each sent
407/// value is broadcasted to all active receivers.
408///
409/// **Note:** The actual capacity may be greater than the provided `capacity`.
410///
411/// All data sent on [`Sender`] will become available on every active
412/// [`Receiver`] in the same order as it was sent.
413///
414/// The `Sender` can be cloned to `send` to the same channel from multiple
415/// points in the process or it can be used concurrently from an `Arc`. New
416/// `Receiver` handles are created by calling [`Sender::subscribe`].
417///
418/// If all [`Receiver`] handles are dropped, the `send` method will return a
419/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
420/// method will return a [`RecvError`].
421///
422/// [`Sender`]: crate::sync::broadcast::Sender
423/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
424/// [`Receiver`]: crate::sync::broadcast::Receiver
425/// [`recv`]: crate::sync::broadcast::Receiver::recv
426/// [`SendError`]: crate::sync::broadcast::error::SendError
427/// [`RecvError`]: crate::sync::broadcast::error::RecvError
428///
429/// # Examples
430///
431/// ```
432/// use tokio::sync::broadcast;
433///
434/// #[tokio::main]
435/// async fn main() {
436/// let (tx, mut rx1) = broadcast::channel(16);
437/// let mut rx2 = tx.subscribe();
438///
439/// tokio::spawn(async move {
440/// assert_eq!(rx1.recv().await.unwrap(), 10);
441/// assert_eq!(rx1.recv().await.unwrap(), 20);
442/// });
443///
444/// tokio::spawn(async move {
445/// assert_eq!(rx2.recv().await.unwrap(), 10);
446/// assert_eq!(rx2.recv().await.unwrap(), 20);
447/// });
448///
449/// tx.send(10).unwrap();
450/// tx.send(20).unwrap();
451/// }
452/// ```
453///
454/// # Panics
455///
456/// This will panic if `capacity` is equal to `0` or larger
457/// than `usize::MAX / 2`.
458#[track_caller]
459pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
460 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
461 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
462 let rx = Receiver {
463 shared: tx.shared.clone(),
464 next: 0,
465 };
466 (tx, rx)
467}
468
469unsafe impl<T: Send> Send for Sender<T> {}
470unsafe impl<T: Send> Sync for Sender<T> {}
471
472unsafe impl<T: Send> Send for Receiver<T> {}
473unsafe impl<T: Send> Sync for Receiver<T> {}
474
475impl<T> Sender<T> {
476 /// Creates the sending-half of the [`broadcast`] channel.
477 ///
478 /// See the documentation of [`broadcast::channel`] for more information on this method.
479 ///
480 /// [`broadcast`]: crate::sync::broadcast
481 /// [`broadcast::channel`]: crate::sync::broadcast::channel
482 #[track_caller]
483 pub fn new(capacity: usize) -> Self {
484 // SAFETY: We don't create extra receivers, so there are 0.
485 unsafe { Self::new_with_receiver_count(0, capacity) }
486 }
487
488 /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
489 /// count.
490 ///
491 /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
492 /// calling this function.
493 ///
494 /// # Safety:
495 ///
496 /// The caller must ensure that the amount of receivers for this Sender is correct before
497 /// the channel functionalities are used, the count is zero by default, as this function
498 /// does not create any receivers by itself.
499 #[track_caller]
500 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
501 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
502 assert!(
503 capacity <= usize::MAX >> 1,
504 "broadcast channel capacity exceeded `usize::MAX / 2`"
505 );
506
507 // Round to a power of two
508 capacity = capacity.next_power_of_two();
509
510 let mut buffer = Vec::with_capacity(capacity);
511
512 for i in 0..capacity {
513 buffer.push(RwLock::new(Slot {
514 rem: AtomicUsize::new(0),
515 pos: (i as u64).wrapping_sub(capacity as u64),
516 val: UnsafeCell::new(None),
517 }));
518 }
519
520 let shared = Arc::new(Shared {
521 buffer: buffer.into_boxed_slice(),
522 mask: capacity - 1,
523 tail: Mutex::new(Tail {
524 pos: 0,
525 rx_cnt: receiver_count,
526 closed: false,
527 waiters: LinkedList::new(),
528 }),
529 num_tx: AtomicUsize::new(1),
530 });
531
532 Sender { shared }
533 }
534
535 /// Attempts to send a value to all active [`Receiver`] handles, returning
536 /// it back if it could not be sent.
537 ///
538 /// A successful send occurs when there is at least one active [`Receiver`]
539 /// handle. An unsuccessful send would be one where all associated
540 /// [`Receiver`] handles have already been dropped.
541 ///
542 /// # Return
543 ///
544 /// On success, the number of subscribed [`Receiver`] handles is returned.
545 /// This does not mean that this number of receivers will see the message as
546 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
547 /// the message.
548 ///
549 /// # Note
550 ///
551 /// A return value of `Ok` **does not** mean that the sent value will be
552 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
553 /// handles may be dropped before receiving the sent message.
554 ///
555 /// A return value of `Err` **does not** mean that future calls to `send`
556 /// will fail. New [`Receiver`] handles may be created by calling
557 /// [`subscribe`].
558 ///
559 /// [`Receiver`]: crate::sync::broadcast::Receiver
560 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
561 ///
562 /// # Examples
563 ///
564 /// ```
565 /// use tokio::sync::broadcast;
566 ///
567 /// #[tokio::main]
568 /// async fn main() {
569 /// let (tx, mut rx1) = broadcast::channel(16);
570 /// let mut rx2 = tx.subscribe();
571 ///
572 /// tokio::spawn(async move {
573 /// assert_eq!(rx1.recv().await.unwrap(), 10);
574 /// assert_eq!(rx1.recv().await.unwrap(), 20);
575 /// });
576 ///
577 /// tokio::spawn(async move {
578 /// assert_eq!(rx2.recv().await.unwrap(), 10);
579 /// assert_eq!(rx2.recv().await.unwrap(), 20);
580 /// });
581 ///
582 /// tx.send(10).unwrap();
583 /// tx.send(20).unwrap();
584 /// }
585 /// ```
586 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
587 let mut tail = self.shared.tail.lock();
588
589 if tail.rx_cnt == 0 {
590 return Err(SendError(value));
591 }
592
593 // Position to write into
594 let pos = tail.pos;
595 let rem = tail.rx_cnt;
596 let idx = (pos & self.shared.mask as u64) as usize;
597
598 // Update the tail position
599 tail.pos = tail.pos.wrapping_add(1);
600
601 // Get the slot
602 let mut slot = self.shared.buffer[idx].write().unwrap();
603
604 // Track the position
605 slot.pos = pos;
606
607 // Set remaining receivers
608 slot.rem.with_mut(|v| *v = rem);
609
610 // Write the value
611 slot.val = UnsafeCell::new(Some(value));
612
613 // Release the slot lock before notifying the receivers.
614 drop(slot);
615
616 // Notify and release the mutex. This must happen after the slot lock is
617 // released, otherwise the writer lock bit could be cleared while another
618 // thread is in the critical section.
619 self.shared.notify_rx(tail);
620
621 Ok(rem)
622 }
623
624 /// Creates a new [`Receiver`] handle that will receive values sent **after**
625 /// this call to `subscribe`.
626 ///
627 /// # Examples
628 ///
629 /// ```
630 /// use tokio::sync::broadcast;
631 ///
632 /// #[tokio::main]
633 /// async fn main() {
634 /// let (tx, _rx) = broadcast::channel(16);
635 ///
636 /// // Will not be seen
637 /// tx.send(10).unwrap();
638 ///
639 /// let mut rx = tx.subscribe();
640 ///
641 /// tx.send(20).unwrap();
642 ///
643 /// let value = rx.recv().await.unwrap();
644 /// assert_eq!(20, value);
645 /// }
646 /// ```
647 pub fn subscribe(&self) -> Receiver<T> {
648 let shared = self.shared.clone();
649 new_receiver(shared)
650 }
651
652 /// Returns the number of queued values.
653 ///
654 /// A value is queued until it has either been seen by all receivers that were alive at the time
655 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
656 /// queue's capacity.
657 ///
658 /// # Note
659 ///
660 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
661 /// have been evicted from the queue before being seen by all receivers.
662 ///
663 /// # Examples
664 ///
665 /// ```
666 /// use tokio::sync::broadcast;
667 ///
668 /// #[tokio::main]
669 /// async fn main() {
670 /// let (tx, mut rx1) = broadcast::channel(16);
671 /// let mut rx2 = tx.subscribe();
672 ///
673 /// tx.send(10).unwrap();
674 /// tx.send(20).unwrap();
675 /// tx.send(30).unwrap();
676 ///
677 /// assert_eq!(tx.len(), 3);
678 ///
679 /// rx1.recv().await.unwrap();
680 ///
681 /// // The len is still 3 since rx2 hasn't seen the first value yet.
682 /// assert_eq!(tx.len(), 3);
683 ///
684 /// rx2.recv().await.unwrap();
685 ///
686 /// assert_eq!(tx.len(), 2);
687 /// }
688 /// ```
689 pub fn len(&self) -> usize {
690 let tail = self.shared.tail.lock();
691
692 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
693 let mut low = 0;
694 let mut high = self.shared.buffer.len();
695 while low < high {
696 let mid = low + (high - low) / 2;
697 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
698 if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
699 low = mid + 1;
700 } else {
701 high = mid;
702 }
703 }
704
705 self.shared.buffer.len() - low
706 }
707
708 /// Returns true if there are no queued values.
709 ///
710 /// # Examples
711 ///
712 /// ```
713 /// use tokio::sync::broadcast;
714 ///
715 /// #[tokio::main]
716 /// async fn main() {
717 /// let (tx, mut rx1) = broadcast::channel(16);
718 /// let mut rx2 = tx.subscribe();
719 ///
720 /// assert!(tx.is_empty());
721 ///
722 /// tx.send(10).unwrap();
723 ///
724 /// assert!(!tx.is_empty());
725 ///
726 /// rx1.recv().await.unwrap();
727 ///
728 /// // The queue is still not empty since rx2 hasn't seen the value.
729 /// assert!(!tx.is_empty());
730 ///
731 /// rx2.recv().await.unwrap();
732 ///
733 /// assert!(tx.is_empty());
734 /// }
735 /// ```
736 pub fn is_empty(&self) -> bool {
737 let tail = self.shared.tail.lock();
738
739 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
740 self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
741 }
742
743 /// Returns the number of active receivers.
744 ///
745 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
746 /// [`subscribe`]. These are the handles that will receive values sent on
747 /// this [`Sender`].
748 ///
749 /// # Note
750 ///
751 /// It is not guaranteed that a sent message will reach this number of
752 /// receivers. Active receivers may never call [`recv`] again before
753 /// dropping.
754 ///
755 /// [`recv`]: crate::sync::broadcast::Receiver::recv
756 /// [`Receiver`]: crate::sync::broadcast::Receiver
757 /// [`Sender`]: crate::sync::broadcast::Sender
758 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
759 /// [`channel`]: crate::sync::broadcast::channel
760 ///
761 /// # Examples
762 ///
763 /// ```
764 /// use tokio::sync::broadcast;
765 ///
766 /// #[tokio::main]
767 /// async fn main() {
768 /// let (tx, _rx1) = broadcast::channel(16);
769 ///
770 /// assert_eq!(1, tx.receiver_count());
771 ///
772 /// let mut _rx2 = tx.subscribe();
773 ///
774 /// assert_eq!(2, tx.receiver_count());
775 ///
776 /// tx.send(10).unwrap();
777 /// }
778 /// ```
779 pub fn receiver_count(&self) -> usize {
780 let tail = self.shared.tail.lock();
781 tail.rx_cnt
782 }
783
784 /// Returns `true` if senders belong to the same channel.
785 ///
786 /// # Examples
787 ///
788 /// ```
789 /// use tokio::sync::broadcast;
790 ///
791 /// #[tokio::main]
792 /// async fn main() {
793 /// let (tx, _rx) = broadcast::channel::<()>(16);
794 /// let tx2 = tx.clone();
795 ///
796 /// assert!(tx.same_channel(&tx2));
797 ///
798 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
799 ///
800 /// assert!(!tx3.same_channel(&tx2));
801 /// }
802 /// ```
803 pub fn same_channel(&self, other: &Self) -> bool {
804 Arc::ptr_eq(&self.shared, &other.shared)
805 }
806
807 fn close_channel(&self) {
808 let mut tail = self.shared.tail.lock();
809 tail.closed = true;
810
811 self.shared.notify_rx(tail);
812 }
813}
814
815/// Create a new `Receiver` which reads starting from the tail.
816fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
817 let mut tail = shared.tail.lock();
818
819 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
820
821 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
822
823 let next = tail.pos;
824
825 drop(tail);
826
827 Receiver { shared, next }
828}
829
830/// List used in `Shared::notify_rx`. It wraps a guarded linked list
831/// and gates the access to it on the `Shared.tail` mutex. It also empties
832/// the list on drop.
833struct WaitersList<'a, T> {
834 list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
835 is_empty: bool,
836 shared: &'a Shared<T>,
837}
838
839impl<'a, T> Drop for WaitersList<'a, T> {
840 fn drop(&mut self) {
841 // If the list is not empty, we unlink all waiters from it.
842 // We do not wake the waiters to avoid double panics.
843 if !self.is_empty {
844 let _lock_guard = self.shared.tail.lock();
845 while self.list.pop_back().is_some() {}
846 }
847 }
848}
849
850impl<'a, T> WaitersList<'a, T> {
851 fn new(
852 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
853 guard: Pin<&'a Waiter>,
854 shared: &'a Shared<T>,
855 ) -> Self {
856 let guard_ptr = NonNull::from(guard.get_ref());
857 let list = unguarded_list.into_guarded(guard_ptr);
858 WaitersList {
859 list,
860 is_empty: false,
861 shared,
862 }
863 }
864
865 /// Removes the last element from the guarded list. Modifying this list
866 /// requires an exclusive access to the main list in `Notify`.
867 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
868 let result = self.list.pop_back();
869 if result.is_none() {
870 // Save information about emptiness to avoid waiting for lock
871 // in the destructor.
872 self.is_empty = true;
873 }
874 result
875 }
876}
877
878impl<T> Shared<T> {
879 fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
880 // It is critical for `GuardedLinkedList` safety that the guard node is
881 // pinned in memory and is not dropped until the guarded list is dropped.
882 let guard = Waiter::new();
883 pin!(guard);
884
885 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
886 // underneath to allow every waiter to safely remove itself from it.
887 //
888 // * This list will be still guarded by the `waiters` lock.
889 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
890 // * This wrapper will empty the list on drop. It is critical for safety
891 // that we will not leave any list entry with a pointer to the local
892 // guard node after this function returns / panics.
893 let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
894
895 let mut wakers = WakeList::new();
896 'outer: loop {
897 while wakers.can_push() {
898 match list.pop_back_locked(&mut tail) {
899 Some(waiter) => {
900 unsafe {
901 // Safety: accessing `waker` is safe because
902 // the tail lock is held.
903 if let Some(waker) = (*waiter.as_ptr()).waker.take() {
904 wakers.push(waker);
905 }
906
907 // Safety: `queued` is atomic.
908 let queued = &(*waiter.as_ptr()).queued;
909 // `Relaxed` suffices because the tail lock is held.
910 assert!(queued.load(Relaxed));
911 // `Release` is needed to synchronize with `Recv::drop`.
912 // It is critical to set this variable **after** waker
913 // is extracted, otherwise we may data race with `Recv::drop`.
914 queued.store(false, Release);
915 }
916 }
917 None => {
918 break 'outer;
919 }
920 }
921 }
922
923 // Release the lock before waking.
924 drop(tail);
925
926 // Before we acquire the lock again all sorts of things can happen:
927 // some waiters may remove themselves from the list and new waiters
928 // may be added. This is fine since at worst we will unnecessarily
929 // wake up waiters which will then queue themselves again.
930
931 wakers.wake_all();
932
933 // Acquire the lock again.
934 tail = self.tail.lock();
935 }
936
937 // Release the lock before waking.
938 drop(tail);
939
940 wakers.wake_all();
941 }
942}
943
944impl<T> Clone for Sender<T> {
945 fn clone(&self) -> Sender<T> {
946 let shared = self.shared.clone();
947 shared.num_tx.fetch_add(1, SeqCst);
948
949 Sender { shared }
950 }
951}
952
953impl<T> Drop for Sender<T> {
954 fn drop(&mut self) {
955 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
956 self.close_channel();
957 }
958 }
959}
960
961impl<T> Receiver<T> {
962 /// Returns the number of messages that were sent into the channel and that
963 /// this [`Receiver`] has yet to receive.
964 ///
965 /// If the returned value from `len` is larger than the next largest power of 2
966 /// of the capacity of the channel any call to [`recv`] will return an
967 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
968 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
969 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
970 /// values larger than 16.
971 ///
972 /// [`Receiver`]: crate::sync::broadcast::Receiver
973 /// [`recv`]: crate::sync::broadcast::Receiver::recv
974 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
975 ///
976 /// # Examples
977 ///
978 /// ```
979 /// use tokio::sync::broadcast;
980 ///
981 /// #[tokio::main]
982 /// async fn main() {
983 /// let (tx, mut rx1) = broadcast::channel(16);
984 ///
985 /// tx.send(10).unwrap();
986 /// tx.send(20).unwrap();
987 ///
988 /// assert_eq!(rx1.len(), 2);
989 /// assert_eq!(rx1.recv().await.unwrap(), 10);
990 /// assert_eq!(rx1.len(), 1);
991 /// assert_eq!(rx1.recv().await.unwrap(), 20);
992 /// assert_eq!(rx1.len(), 0);
993 /// }
994 /// ```
995 pub fn len(&self) -> usize {
996 let next_send_pos = self.shared.tail.lock().pos;
997 (next_send_pos - self.next) as usize
998 }
999
1000 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1001 /// has yet to receive.
1002 ///
1003 /// [`Receiver]: create::sync::broadcast::Receiver
1004 ///
1005 /// # Examples
1006 ///
1007 /// ```
1008 /// use tokio::sync::broadcast;
1009 ///
1010 /// #[tokio::main]
1011 /// async fn main() {
1012 /// let (tx, mut rx1) = broadcast::channel(16);
1013 ///
1014 /// assert!(rx1.is_empty());
1015 ///
1016 /// tx.send(10).unwrap();
1017 /// tx.send(20).unwrap();
1018 ///
1019 /// assert!(!rx1.is_empty());
1020 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1021 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1022 /// assert!(rx1.is_empty());
1023 /// }
1024 /// ```
1025 pub fn is_empty(&self) -> bool {
1026 self.len() == 0
1027 }
1028
1029 /// Returns `true` if receivers belong to the same channel.
1030 ///
1031 /// # Examples
1032 ///
1033 /// ```
1034 /// use tokio::sync::broadcast;
1035 ///
1036 /// #[tokio::main]
1037 /// async fn main() {
1038 /// let (tx, rx) = broadcast::channel::<()>(16);
1039 /// let rx2 = tx.subscribe();
1040 ///
1041 /// assert!(rx.same_channel(&rx2));
1042 ///
1043 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1044 ///
1045 /// assert!(!rx3.same_channel(&rx2));
1046 /// }
1047 /// ```
1048 pub fn same_channel(&self, other: &Self) -> bool {
1049 Arc::ptr_eq(&self.shared, &other.shared)
1050 }
1051
1052 /// Locks the next value if there is one.
1053 fn recv_ref(
1054 &mut self,
1055 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1056 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1057 let idx = (self.next & self.shared.mask as u64) as usize;
1058
1059 // The slot holding the next value to read
1060 let mut slot = self.shared.buffer[idx].read().unwrap();
1061
1062 if slot.pos != self.next {
1063 // Release the `slot` lock before attempting to acquire the `tail`
1064 // lock. This is required because `send2` acquires the tail lock
1065 // first followed by the slot lock. Acquiring the locks in reverse
1066 // order here would result in a potential deadlock: `recv_ref`
1067 // acquires the `slot` lock and attempts to acquire the `tail` lock
1068 // while `send2` acquired the `tail` lock and attempts to acquire
1069 // the slot lock.
1070 drop(slot);
1071
1072 let mut old_waker = None;
1073
1074 let mut tail = self.shared.tail.lock();
1075
1076 // Acquire slot lock again
1077 slot = self.shared.buffer[idx].read().unwrap();
1078
1079 // Make sure the position did not change. This could happen in the
1080 // unlikely event that the buffer is wrapped between dropping the
1081 // read lock and acquiring the tail lock.
1082 if slot.pos != self.next {
1083 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1084
1085 if next_pos == self.next {
1086 // At this point the channel is empty for *this* receiver. If
1087 // it's been closed, then that's what we return, otherwise we
1088 // set a waker and return empty.
1089 if tail.closed {
1090 return Err(TryRecvError::Closed);
1091 }
1092
1093 // Store the waker
1094 if let Some((waiter, waker)) = waiter {
1095 // Safety: called while locked.
1096 unsafe {
1097 // Only queue if not already queued
1098 waiter.with_mut(|ptr| {
1099 // If there is no waker **or** if the currently
1100 // stored waker references a **different** task,
1101 // track the tasks' waker to be notified on
1102 // receipt of a new value.
1103 match (*ptr).waker {
1104 Some(ref w) if w.will_wake(waker) => {}
1105 _ => {
1106 old_waker = std::mem::replace(
1107 &mut (*ptr).waker,
1108 Some(waker.clone()),
1109 );
1110 }
1111 }
1112
1113 // If the waiter is not already queued, enqueue it.
1114 // `Relaxed` order suffices: we have synchronized with
1115 // all writers through the tail lock that we hold.
1116 if !(*ptr).queued.load(Relaxed) {
1117 // `Relaxed` order suffices: all the readers will
1118 // synchronize with this write through the tail lock.
1119 (*ptr).queued.store(true, Relaxed);
1120 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1121 }
1122 });
1123 }
1124 }
1125
1126 // Drop the old waker after releasing the locks.
1127 drop(slot);
1128 drop(tail);
1129 drop(old_waker);
1130
1131 return Err(TryRecvError::Empty);
1132 }
1133
1134 // At this point, the receiver has lagged behind the sender by
1135 // more than the channel capacity. The receiver will attempt to
1136 // catch up by skipping dropped messages and setting the
1137 // internal cursor to the **oldest** message stored by the
1138 // channel.
1139 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1140
1141 let missed = next.wrapping_sub(self.next);
1142
1143 drop(tail);
1144
1145 // The receiver is slow but no values have been missed
1146 if missed == 0 {
1147 self.next = self.next.wrapping_add(1);
1148
1149 return Ok(RecvGuard { slot });
1150 }
1151
1152 self.next = next;
1153
1154 return Err(TryRecvError::Lagged(missed));
1155 }
1156 }
1157
1158 self.next = self.next.wrapping_add(1);
1159
1160 Ok(RecvGuard { slot })
1161 }
1162}
1163
1164impl<T: Clone> Receiver<T> {
1165 /// Re-subscribes to the channel starting from the current tail element.
1166 ///
1167 /// This [`Receiver`] handle will receive a clone of all values sent
1168 /// **after** it has resubscribed. This will not include elements that are
1169 /// in the queue of the current receiver. Consider the following example.
1170 ///
1171 /// # Examples
1172 ///
1173 /// ```
1174 /// use tokio::sync::broadcast;
1175 ///
1176 /// #[tokio::main]
1177 /// async fn main() {
1178 /// let (tx, mut rx) = broadcast::channel(2);
1179 ///
1180 /// tx.send(1).unwrap();
1181 /// let mut rx2 = rx.resubscribe();
1182 /// tx.send(2).unwrap();
1183 ///
1184 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1185 /// assert_eq!(rx.recv().await.unwrap(), 1);
1186 /// }
1187 /// ```
1188 pub fn resubscribe(&self) -> Self {
1189 let shared = self.shared.clone();
1190 new_receiver(shared)
1191 }
1192 /// Receives the next value for this receiver.
1193 ///
1194 /// Each [`Receiver`] handle will receive a clone of all values sent
1195 /// **after** it has subscribed.
1196 ///
1197 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1198 /// dropped, indicating that no further values can be sent on the channel.
1199 ///
1200 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1201 /// sent values will overwrite old values. At this point, a call to [`recv`]
1202 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1203 /// internal cursor is updated to point to the oldest value still held by
1204 /// the channel. A subsequent call to [`recv`] will return this value
1205 /// **unless** it has been since overwritten.
1206 ///
1207 /// # Cancel safety
1208 ///
1209 /// This method is cancel safe. If `recv` is used as the event in a
1210 /// [`tokio::select!`](crate::select) statement and some other branch
1211 /// completes first, it is guaranteed that no messages were received on this
1212 /// channel.
1213 ///
1214 /// [`Receiver`]: crate::sync::broadcast::Receiver
1215 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1216 ///
1217 /// # Examples
1218 ///
1219 /// ```
1220 /// use tokio::sync::broadcast;
1221 ///
1222 /// #[tokio::main]
1223 /// async fn main() {
1224 /// let (tx, mut rx1) = broadcast::channel(16);
1225 /// let mut rx2 = tx.subscribe();
1226 ///
1227 /// tokio::spawn(async move {
1228 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1229 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1230 /// });
1231 ///
1232 /// tokio::spawn(async move {
1233 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1234 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1235 /// });
1236 ///
1237 /// tx.send(10).unwrap();
1238 /// tx.send(20).unwrap();
1239 /// }
1240 /// ```
1241 ///
1242 /// Handling lag
1243 ///
1244 /// ```
1245 /// use tokio::sync::broadcast;
1246 ///
1247 /// #[tokio::main]
1248 /// async fn main() {
1249 /// let (tx, mut rx) = broadcast::channel(2);
1250 ///
1251 /// tx.send(10).unwrap();
1252 /// tx.send(20).unwrap();
1253 /// tx.send(30).unwrap();
1254 ///
1255 /// // The receiver lagged behind
1256 /// assert!(rx.recv().await.is_err());
1257 ///
1258 /// // At this point, we can abort or continue with lost messages
1259 ///
1260 /// assert_eq!(20, rx.recv().await.unwrap());
1261 /// assert_eq!(30, rx.recv().await.unwrap());
1262 /// }
1263 /// ```
1264 pub async fn recv(&mut self) -> Result<T, RecvError> {
1265 let fut = Recv::new(self);
1266 fut.await
1267 }
1268
1269 /// Attempts to return a pending value on this receiver without awaiting.
1270 ///
1271 /// This is useful for a flavor of "optimistic check" before deciding to
1272 /// await on a receiver.
1273 ///
1274 /// Compared with [`recv`], this function has three failure cases instead of two
1275 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1276 ///
1277 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1278 /// dropped, indicating that no further values can be sent on the channel.
1279 ///
1280 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1281 /// sent values will overwrite old values. At this point, a call to [`recv`]
1282 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1283 /// internal cursor is updated to point to the oldest value still held by
1284 /// the channel. A subsequent call to [`try_recv`] will return this value
1285 /// **unless** it has been since overwritten. If there are no values to
1286 /// receive, `Err(TryRecvError::Empty)` is returned.
1287 ///
1288 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1289 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1290 /// [`Receiver`]: crate::sync::broadcast::Receiver
1291 ///
1292 /// # Examples
1293 ///
1294 /// ```
1295 /// use tokio::sync::broadcast;
1296 ///
1297 /// #[tokio::main]
1298 /// async fn main() {
1299 /// let (tx, mut rx) = broadcast::channel(16);
1300 ///
1301 /// assert!(rx.try_recv().is_err());
1302 ///
1303 /// tx.send(10).unwrap();
1304 ///
1305 /// let value = rx.try_recv().unwrap();
1306 /// assert_eq!(10, value);
1307 /// }
1308 /// ```
1309 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1310 let guard = self.recv_ref(None)?;
1311 guard.clone_value().ok_or(TryRecvError::Closed)
1312 }
1313
1314 /// Blocking receive to call outside of asynchronous contexts.
1315 ///
1316 /// # Panics
1317 ///
1318 /// This function panics if called within an asynchronous execution
1319 /// context.
1320 ///
1321 /// # Examples
1322 /// ```
1323 /// use std::thread;
1324 /// use tokio::sync::broadcast;
1325 ///
1326 /// #[tokio::main]
1327 /// async fn main() {
1328 /// let (tx, mut rx) = broadcast::channel(16);
1329 ///
1330 /// let sync_code = thread::spawn(move || {
1331 /// assert_eq!(rx.blocking_recv(), Ok(10));
1332 /// });
1333 ///
1334 /// let _ = tx.send(10);
1335 /// sync_code.join().unwrap();
1336 /// }
1337 /// ```
1338 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1339 crate::future::block_on(self.recv())
1340 }
1341}
1342
1343impl<T> Drop for Receiver<T> {
1344 fn drop(&mut self) {
1345 let mut tail = self.shared.tail.lock();
1346
1347 tail.rx_cnt -= 1;
1348 let until = tail.pos;
1349
1350 drop(tail);
1351
1352 while self.next < until {
1353 match self.recv_ref(None) {
1354 Ok(_) => {}
1355 // The channel is closed
1356 Err(TryRecvError::Closed) => break,
1357 // Ignore lagging, we will catch up
1358 Err(TryRecvError::Lagged(..)) => {}
1359 // Can't be empty
1360 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1361 }
1362 }
1363 }
1364}
1365
1366impl<'a, T> Recv<'a, T> {
1367 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1368 Recv {
1369 receiver,
1370 waiter: UnsafeCell::new(Waiter {
1371 queued: AtomicBool::new(false),
1372 waker: None,
1373 pointers: linked_list::Pointers::new(),
1374 _p: PhantomPinned,
1375 }),
1376 }
1377 }
1378
1379 /// A custom `project` implementation is used in place of `pin-project-lite`
1380 /// as a custom drop implementation is needed.
1381 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1382 unsafe {
1383 // Safety: Receiver is Unpin
1384 is_unpin::<&mut Receiver<T>>();
1385
1386 let me = self.get_unchecked_mut();
1387 (me.receiver, &me.waiter)
1388 }
1389 }
1390}
1391
1392impl<'a, T> Future for Recv<'a, T>
1393where
1394 T: Clone,
1395{
1396 type Output = Result<T, RecvError>;
1397
1398 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1399 ready!(crate::trace::trace_leaf(cx));
1400
1401 let (receiver, waiter) = self.project();
1402
1403 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1404 Ok(value) => value,
1405 Err(TryRecvError::Empty) => return Poll::Pending,
1406 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1407 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1408 };
1409
1410 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1411 }
1412}
1413
1414impl<'a, T> Drop for Recv<'a, T> {
1415 fn drop(&mut self) {
1416 // Safety: `waiter.queued` is atomic.
1417 // Acquire ordering is required to synchronize with
1418 // `Shared::notify_rx` before we drop the object.
1419 let queued = self
1420 .waiter
1421 .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1422
1423 // If the waiter is queued, we need to unlink it from the waiters list.
1424 // If not, no further synchronization is required, since the waiter
1425 // is not in the list and, as such, is not shared with any other threads.
1426 if queued {
1427 // Acquire the tail lock. This is required for safety before accessing
1428 // the waiter node.
1429 let mut tail = self.receiver.shared.tail.lock();
1430
1431 // Safety: tail lock is held.
1432 // `Relaxed` order suffices because we hold the tail lock.
1433 let queued = self
1434 .waiter
1435 .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1436
1437 if queued {
1438 // Remove the node
1439 //
1440 // safety: tail lock is held and the wait node is verified to be in
1441 // the list.
1442 unsafe {
1443 self.waiter.with_mut(|ptr| {
1444 tail.waiters.remove((&mut *ptr).into());
1445 });
1446 }
1447 }
1448 }
1449 }
1450}
1451
1452/// # Safety
1453///
1454/// `Waiter` is forced to be !Unpin.
1455unsafe impl linked_list::Link for Waiter {
1456 type Handle = NonNull<Waiter>;
1457 type Target = Waiter;
1458
1459 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1460 *handle
1461 }
1462
1463 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1464 ptr
1465 }
1466
1467 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1468 Waiter::addr_of_pointers(target)
1469 }
1470}
1471
1472impl<T> fmt::Debug for Sender<T> {
1473 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1474 write!(fmt, "broadcast::Sender")
1475 }
1476}
1477
1478impl<T> fmt::Debug for Receiver<T> {
1479 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1480 write!(fmt, "broadcast::Receiver")
1481 }
1482}
1483
1484impl<'a, T> RecvGuard<'a, T> {
1485 fn clone_value(&self) -> Option<T>
1486 where
1487 T: Clone,
1488 {
1489 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1490 }
1491}
1492
1493impl<'a, T> Drop for RecvGuard<'a, T> {
1494 fn drop(&mut self) {
1495 // Decrement the remaining counter
1496 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497 // Safety: Last receiver, drop the value
1498 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1499 }
1500 }
1501}
1502
1503fn is_unpin<T: Unpin>() {}
1504
1505#[cfg(not(loom))]
1506#[cfg(test)]
1507mod tests {
1508 use super::*;
1509
1510 #[test]
1511 fn receiver_count_on_sender_constructor() {
1512 let sender = Sender::<i32>::new(16);
1513 assert_eq!(sender.receiver_count(), 0);
1514
1515 let rx_1 = sender.subscribe();
1516 assert_eq!(sender.receiver_count(), 1);
1517
1518 let rx_2 = rx_1.resubscribe();
1519 assert_eq!(sender.receiver_count(), 2);
1520
1521 let rx_3 = sender.subscribe();
1522 assert_eq!(sender.receiver_count(), 3);
1523
1524 drop(rx_3);
1525 drop(rx_1);
1526 assert_eq!(sender.receiver_count(), 1);
1527
1528 drop(rx_2);
1529 assert_eq!(sender.receiver_count(), 0);
1530 }
1531
1532 #[cfg(not(loom))]
1533 #[test]
1534 fn receiver_count_on_channel_constructor() {
1535 let (sender, rx) = channel::<i32>(16);
1536 assert_eq!(sender.receiver_count(), 1);
1537
1538 let _rx_2 = rx.resubscribe();
1539 assert_eq!(sender.receiver_count(), 2);
1540 }
1541}