mpmc/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! A library with futures-aware mpmc channels.
8
9use crossbeam::queue::SegQueue;
10use futures::channel::mpsc;
11use futures::lock::Mutex;
12use futures::sink::SinkExt;
13use futures::stream::FusedStream;
14use futures::Stream;
15use std::pin::Pin;
16use std::sync::{Arc, Weak};
17use std::task::{Context, Poll};
18
/// Per-receiver buffer capacity, in messages, used when a [`Sender`] is built
/// through `Default` rather than with an explicit buffer size.
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;
21
22/// An async sender end of an mpmc channel. Messages sent on this are received by
23/// _all_ receivers connected to it (they are duplicated).
24#[derive(Clone)]
25pub struct Sender<T> {
26    inner: Arc<Mutex<Vec<mpsc::Sender<T>>>>,
27    enqueued_senders: Arc<SegQueue<mpsc::Sender<T>>>,
28    buffer_size: usize,
29}
30
31impl<T> Default for Sender<T> {
32    fn default() -> Self {
33        Sender {
34            inner: Arc::default(),
35            enqueued_senders: Arc::default(),
36            buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
37        }
38    }
39}
40
41impl<T: Clone> Sender<T> {
42    /// Construct a sender whose receivers will buffer the given number of messages.
43    pub fn with_buffer_size(buffer_size: usize) -> Self {
44        Self { buffer_size, ..Default::default() }
45    }
46
47    /// Sends `payload` to all receivers that exist at the time of send.
48    ///
49    /// Sending is never an error, even if there are no receivers.
50    pub async fn send(&self, payload: T) {
51        let mut inner = self.inner.lock().await;
52        while let Some(new_sender) = self.enqueued_senders.pop() {
53            inner.push(new_sender);
54        }
55
56        let mut living_senders = vec![];
57        for mut sender in inner.drain(0..) {
58            let should_live = match sender.try_send(payload.clone()).err() {
59                None => true,
60                Some(send_error) if send_error.is_disconnected() => false,
61                Some(e) => {
62                    // The receiver is full. Apply backpressure.
63                    let payload = e.into_inner();
64                    sender.send(payload).await.is_ok()
65                }
66            };
67
68            if should_live {
69                living_senders.push(sender);
70            }
71        }
72        inner.append(&mut living_senders);
73    }
74
75    /// Creates a new receiver who will receive a copy of all messages sent after its creation.
76    pub fn new_receiver(&self) -> Receiver<T> {
77        let (sender, receiver) = mpsc::channel(self.buffer_size);
78        self.enqueued_senders.push(sender);
79        Receiver {
80            sources: Arc::downgrade(&self.enqueued_senders),
81            inner: receiver,
82            buffer_size: self.buffer_size,
83        }
84    }
85}
86
87/// An async receiver end of an mpmc channel. All receivers connected to the same
88/// sender receive the same duplicated message sequence.
89///
90/// The message sequence is duplicated starting from the beginning of the
91/// instance's lifetime; messages sent before the receiver is added to the
92/// channel are not duplicated.
93pub struct Receiver<T> {
94    sources: Weak<SegQueue<mpsc::Sender<T>>>,
95    inner: mpsc::Receiver<T>,
96    buffer_size: usize,
97}
98
99impl<T: Clone> Clone for Receiver<T> {
100    fn clone(&self) -> Self {
101        if let Some(sender_set) = self.sources.upgrade() {
102            let (sender, receiver) = mpsc::channel(self.buffer_size);
103            let sources = sender_set;
104            sources.push(sender);
105            Self {
106                sources: Arc::downgrade(&sources),
107                inner: receiver,
108                buffer_size: self.buffer_size,
109            }
110        } else {
111            // The senders have all been dropped; clone to a dummy channel that just yields `None`
112            // to be consistent.
113            let (_, receiver) = mpsc::channel(1);
114            Self { sources: Weak::new(), inner: receiver, buffer_size: 1 }
115        }
116    }
117}
118
119impl<T: Clone> Stream for Receiver<T> {
120    type Item = T;
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        Stream::poll_next(Pin::new(&mut self.inner), cx)
123    }
124}
125
126impl<T: Clone> FusedStream for Receiver<T> {
127    fn is_terminated(&self) -> bool {
128        self.inner.is_terminated()
129    }
130}