// async_helpers/responding_channel.rs
use anyhow::Error;
use futures::channel::{mpsc, oneshot};
use futures::stream::{FusedStream, Stream};
use futures::SinkExt;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Requesting half of a request/response channel.
///
/// Every request handed to [`Sender::request`] is forwarded over an `mpsc`
/// channel together with a [`Responder`] that the receiving side uses to send
/// the reply back.
pub struct Sender<Req, Resp> {
// Underlying mpsc sender; each message pairs a request with the handle used to answer it.
inner: mpsc::Sender<(Req, Responder<Resp>)>,
}
impl<Req, Resp> Clone for Sender<Req, Resp> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<Req, Resp> Sender<Req, Resp> {
    /// Sends `value` to the receiving side and waits for its response.
    ///
    /// A fresh oneshot channel is created for each call; its sending half is
    /// wrapped in a [`Responder`] and travels alongside the request.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent on the inner mpsc
    /// channel, or if awaiting the oneshot response fails (for example because
    /// the [`Responder`] was dropped without responding).
    pub async fn request(&mut self, value: Req) -> Result<Resp, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let message = (value, Responder { inner: reply_tx });
        self.inner.send(message).await?;
        let reply = reply_rx.await?;
        Ok(reply)
    }
}
/// Single-use handle for answering one request.
///
/// It wraps the sending half of a oneshot channel; [`Responder::respond`]
/// consumes the handle when the reply is sent.
pub struct Responder<Resp> {
// Oneshot sender on which the reply value is sent.
inner: oneshot::Sender<Resp>,
}
impl<Resp> Responder<Resp> {
pub fn respond(self, value: Resp) -> Result<(), Resp> {
self.inner.send(value)
}
}
/// Receiving half of a request/response channel.
///
/// Each item it yields is a request paired with the [`Responder`] used to
/// answer it.
pub struct Receiver<Req, Resp> {
// Underlying mpsc receiver of (request, responder) pairs.
inner: mpsc::Receiver<(Req, Responder<Resp>)>,
}
impl<Req, Resp> Receiver<Req, Resp> {
pub fn close(&mut self) {
self.inner.close();
}
pub fn try_receive(&mut self) -> Result<Option<(Req, Responder<Resp>)>, Error> {
Ok(self.inner.try_next()?)
}
}
impl<Req, Resp> Stream for Receiver<Req, Resp> {
type Item = (Req, Responder<Resp>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<Req, Resp> FusedStream for Receiver<Req, Resp> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
/// Creates a connected [`Sender`]/[`Receiver`] pair for request/response
/// messaging.
///
/// `buffer` is passed unchanged to `futures::channel::mpsc::channel`; see that
/// function's documentation for how it determines the channel's capacity.
pub fn channel<Req, Resp>(buffer: usize) -> (Sender<Req, Resp>, Receiver<Req, Resp>) {
    let (request_tx, request_rx) = mpsc::channel(buffer);
    let sender = Sender { inner: request_tx };
    let receiver = Receiver { inner: request_rx };
    (sender, receiver)
}
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::StreamExt;
    use std::pin::pin;

    // Unwraps the output of a `Poll`, panicking if the future has not completed.
    macro_rules! unwrap_ready {
        ($poll:expr) => {
            match $poll {
                Poll::Pending => panic!("not ready"),
                Poll::Ready(output) => output,
            }
        };
    }

    #[test]
    fn sender_receives_response() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        // Nothing has been sent yet, so the receiver has nothing to yield.
        let mut incoming = pin!(receiver.next());
        assert!(executor.run_until_stalled(&mut incoming).is_pending());

        // The request stays pending until a response has been sent.
        let mut reply = pin!(sender.request(()));
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        let ((), responder) = unwrap_ready!(executor.run_until_stalled(&mut incoming)).unwrap();
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        responder.respond(()).unwrap();
        unwrap_ready!(executor.run_until_stalled(&mut reply)).unwrap();
    }

    #[test]
    fn cloned_senders_go_to_same_receiver() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);
        let mut cloned_sender = sender.clone();

        // First round trip: the original sender.
        let mut first_incoming = pin!(receiver.next());
        assert!(executor.run_until_stalled(&mut first_incoming).is_pending());

        let mut first_reply = pin!(sender.request(()));
        assert!(executor.run_until_stalled(&mut first_reply).is_pending());

        let ((), responder) =
            unwrap_ready!(executor.run_until_stalled(&mut first_incoming)).unwrap();
        assert!(executor.run_until_stalled(&mut first_reply).is_pending());

        responder.respond(()).unwrap();
        unwrap_ready!(executor.run_until_stalled(&mut first_reply)).unwrap();

        // Second round trip: the cloned sender reaches the same receiver.
        let mut second_incoming = pin!(receiver.next());
        assert!(executor.run_until_stalled(&mut second_incoming).is_pending());

        let mut second_reply = pin!(cloned_sender.request(()));
        assert!(executor.run_until_stalled(&mut second_reply).is_pending());

        let ((), responder) =
            unwrap_ready!(executor.run_until_stalled(&mut second_incoming)).unwrap();
        assert!(executor.run_until_stalled(&mut second_reply).is_pending());

        responder.respond(()).unwrap();
        unwrap_ready!(executor.run_until_stalled(&mut second_reply)).unwrap();
    }

    #[test]
    fn sender_receives_error_on_dropped_receiver() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, receiver) = channel::<(), ()>(0);

        let mut reply = pin!(sender.request(()));
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        // Dropping the receiver also drops the queued responder.
        drop(receiver);
        let outcome = unwrap_ready!(executor.run_until_stalled(&mut reply));
        assert!(outcome.is_err());
    }

    #[test]
    fn sender_receives_error_on_dropped_responder() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        let mut reply = pin!(sender.request(()));
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        let mut incoming = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(executor.run_until_stalled(&mut incoming)).unwrap();
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        // Dropping the responder without answering fails the request.
        drop(responder);
        let outcome = unwrap_ready!(executor.run_until_stalled(&mut reply));
        assert!(outcome.is_err());
    }

    #[test]
    fn receiver_receives_error_on_dropped_sender() {
        let mut executor = fasync::TestExecutor::new();
        let (sender, mut receiver) = channel::<(), ()>(0);

        let mut incoming = pin!(receiver.next());
        assert!(executor.run_until_stalled(&mut incoming).is_pending());

        // With the only sender gone, the stream ends.
        drop(sender);
        let item = unwrap_ready!(executor.run_until_stalled(&mut incoming));
        assert!(item.is_none());
    }

    #[test]
    fn responder_returns_error_on_dropped_sender() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        // Start a request, then drop its future at the end of this scope so
        // that nobody is waiting for the response any more.
        {
            let mut reply = pin!(sender.request(()));
            assert!(executor.run_until_stalled(&mut reply).is_pending());
        }

        let mut incoming = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(executor.run_until_stalled(&mut incoming)).unwrap();

        drop(sender);
        let outcome = responder.respond(());
        assert!(outcome.is_err());
    }

    #[fasync::run_until_stalled(test)]
    async fn cannot_request_after_receiver_closed() {
        let (mut sender, mut receiver) = channel::<(), ()>(0);
        receiver.close();

        let outcome = sender.request(()).await;
        assert!(outcome.is_err());
    }

    #[test]
    fn try_receive_returns_none_when_channel_is_empty() {
        // NOTE: the `_` pattern does not bind the sender, so it is dropped
        // right away and the channel is closed as well as empty.
        let (_, mut receiver) = channel::<(), ()>(0);

        let next = receiver.try_receive().unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn try_receive_returns_none_after_none_result() {
        // NOTE: as above, `_` drops the sender immediately.
        let (_, mut receiver) = channel::<(), ()>(0);

        let first = receiver.try_receive().unwrap();
        assert!(first.is_none());

        let second = receiver.try_receive().unwrap();
        assert!(second.is_none());
    }

    #[test]
    fn try_receive_returns_value_when_channel_has_value() {
        let mut executor = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        // Polling the request once queues it on the channel.
        let mut reply = pin!(sender.request(()));
        assert!(executor.run_until_stalled(&mut reply).is_pending());

        let next = receiver.try_receive().unwrap();
        assert!(next.is_some());
    }
}