#![deny(missing_docs)]
use crossbeam::queue::SegQueue;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::FusedStream;
use futures::Stream;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;
#[derive(Clone)]
pub struct Sender<T> {
inner: Arc<Mutex<Vec<mpsc::Sender<T>>>>,
enqueued_senders: Arc<SegQueue<mpsc::Sender<T>>>,
buffer_size: usize,
}
impl<T> Default for Sender<T> {
fn default() -> Self {
Sender {
inner: Arc::default(),
enqueued_senders: Arc::default(),
buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
}
}
}
impl<T: Clone> Sender<T> {
pub fn with_buffer_size(buffer_size: usize) -> Self {
Self { buffer_size, ..Default::default() }
}
pub async fn send(&self, payload: T) {
let mut inner = self.inner.lock().await;
while let Some(new_sender) = self.enqueued_senders.pop() {
inner.push(new_sender);
}
let mut living_senders = vec![];
for mut sender in inner.drain(0..) {
let should_live = match sender.try_send(payload.clone()).err() {
None => true,
Some(send_error) if send_error.is_disconnected() => false,
Some(e) => {
let payload = e.into_inner();
sender.send(payload).await.is_ok()
}
};
if should_live {
living_senders.push(sender);
}
}
inner.append(&mut living_senders);
}
pub fn new_receiver(&self) -> Receiver<T> {
let (sender, receiver) = mpsc::channel(self.buffer_size);
self.enqueued_senders.push(sender);
Receiver {
sources: Arc::downgrade(&self.enqueued_senders),
inner: receiver,
buffer_size: self.buffer_size,
}
}
}
pub struct Receiver<T> {
sources: Weak<SegQueue<mpsc::Sender<T>>>,
inner: mpsc::Receiver<T>,
buffer_size: usize,
}
impl<T: Clone> Clone for Receiver<T> {
fn clone(&self) -> Self {
if let Some(sender_set) = self.sources.upgrade() {
let (sender, receiver) = mpsc::channel(self.buffer_size);
let sources = sender_set;
sources.push(sender);
Self {
sources: Arc::downgrade(&sources),
inner: receiver,
buffer_size: self.buffer_size,
}
} else {
let (_, receiver) = mpsc::channel(1);
Self { sources: Weak::new(), inner: receiver, buffer_size: 1 }
}
}
}
impl<T: Clone> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Stream::poll_next(Pin::new(&mut self.inner), cx)
}
}
impl<T: Clone> FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}