1#![deny(missing_docs)]
6
7use crossbeam::queue::SegQueue;
10use futures::channel::mpsc;
11use futures::lock::Mutex;
12use futures::sink::SinkExt;
13use futures::stream::FusedStream;
14use futures::Stream;
15use std::pin::Pin;
16use std::sync::{Arc, Weak};
17use std::task::{Context, Poll};
18
19pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;
21
22#[derive(Clone)]
25pub struct Sender<T> {
26 inner: Arc<Mutex<Vec<mpsc::Sender<T>>>>,
27 enqueued_senders: Arc<SegQueue<mpsc::Sender<T>>>,
28 buffer_size: usize,
29}
30
31impl<T> Default for Sender<T> {
32 fn default() -> Self {
33 Sender {
34 inner: Arc::default(),
35 enqueued_senders: Arc::default(),
36 buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
37 }
38 }
39}
40
41impl<T: Clone> Sender<T> {
42 pub fn with_buffer_size(buffer_size: usize) -> Self {
44 Self { buffer_size, ..Default::default() }
45 }
46
47 pub async fn send(&self, payload: T) {
51 let mut inner = self.inner.lock().await;
52 while let Some(new_sender) = self.enqueued_senders.pop() {
53 inner.push(new_sender);
54 }
55
56 let mut living_senders = vec![];
57 for mut sender in inner.drain(0..) {
58 let should_live = match sender.try_send(payload.clone()).err() {
59 None => true,
60 Some(send_error) if send_error.is_disconnected() => false,
61 Some(e) => {
62 let payload = e.into_inner();
64 sender.send(payload).await.is_ok()
65 }
66 };
67
68 if should_live {
69 living_senders.push(sender);
70 }
71 }
72 inner.append(&mut living_senders);
73 }
74
75 pub fn new_receiver(&self) -> Receiver<T> {
77 let (sender, receiver) = mpsc::channel(self.buffer_size);
78 self.enqueued_senders.push(sender);
79 Receiver {
80 sources: Arc::downgrade(&self.enqueued_senders),
81 inner: receiver,
82 buffer_size: self.buffer_size,
83 }
84 }
85}
86
87pub struct Receiver<T> {
94 sources: Weak<SegQueue<mpsc::Sender<T>>>,
95 inner: mpsc::Receiver<T>,
96 buffer_size: usize,
97}
98
99impl<T: Clone> Clone for Receiver<T> {
100 fn clone(&self) -> Self {
101 if let Some(sender_set) = self.sources.upgrade() {
102 let (sender, receiver) = mpsc::channel(self.buffer_size);
103 let sources = sender_set;
104 sources.push(sender);
105 Self {
106 sources: Arc::downgrade(&sources),
107 inner: receiver,
108 buffer_size: self.buffer_size,
109 }
110 } else {
111 let (_, receiver) = mpsc::channel(1);
114 Self { sources: Weak::new(), inner: receiver, buffer_size: 1 }
115 }
116 }
117}
118
119impl<T: Clone> Stream for Receiver<T> {
120 type Item = T;
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122 Stream::poll_next(Pin::new(&mut self.inner), cx)
123 }
124}
125
126impl<T: Clone> FusedStream for Receiver<T> {
127 fn is_terminated(&self) -> bool {
128 self.inner.is_terminated()
129 }
130}