1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45//! A multi-producer, single-consumer queue for sending requests across asynchronous tasks.
6//!
7//! Channel creation provides `Receiver` and `Sender` handles. `Sender` can make requests that
8//! await a response from the `Receiver`. Every message sent across the channel is packaged with
9//! a `Responder` that is used to respond to that request. A `Sender` will wait until a response is
10//! received before `Sender::request` completes.
11//!
12//! ### Disconnection
//! When all `Sender` handles have been dropped, it is no longer possible to send requests into the
//! channel. As such, the `Receiver` stream will terminate, yielding `None`.
15//!
16//! ### Clean Shutdown
17//! If a `Receiver` is dropped, it is possible for there to be messages in the channel that will
18//! never be processed. If a clean shutdown is desired, a receiver can first call `Receiver::close`
19//! to prevent further messages from being sent into the channel. Then, the receiver can handle all
20//! messages in the channel and be dropped.
2122use anyhow::Error;
23use futures::channel::{mpsc, oneshot};
24use futures::stream::{FusedStream, Stream};
25use futures::SinkExt;
26use std::pin::Pin;
27use std::task::{Context, Poll};
2829/// The requesting end of a channel.
30pub struct Sender<Req, Resp> {
31 inner: mpsc::Sender<(Req, Responder<Resp>)>,
32}
3334impl<Req, Resp> Clone for Sender<Req, Resp> {
35fn clone(&self) -> Self {
36Self { inner: self.inner.clone() }
37 }
38}
3940impl<Req, Resp> Sender<Req, Resp> {
41/// Send a request on the channel and wait for a response from the responding end of the
42 /// channel.
43 /// An error is returned if the `Receiver` has been dropped or the `Receiver` drops the
44 /// `Responder` for this request.
45pub async fn request(&mut self, value: Req) -> Result<Resp, Error> {
46let (responder, response) = oneshot::channel();
47self.inner.send((value, Responder { inner: responder })).await?;
48Ok(response.await?)
49 }
50}
5152/// Responds to a single request with a value.
53pub struct Responder<Resp> {
54 inner: oneshot::Sender<Resp>,
55}
5657impl<Resp> Responder<Resp> {
58/// Send a response value. If the `Sender` is no longer waiting on a response because the
59 /// request future has been dropped, this method will return the original response `value` as
60 /// an `Err`.
61pub fn respond(self, value: Resp) -> Result<(), Resp> {
62self.inner.send(value)
63 }
64}
6566/// The responding end of a channel.
67// TODO(https://fxbug.dev/42162679): Consider replacing this with this alias:
68//
69// pub type Receiver<Req, Resp> = mpsc::Receiver<(Req, Responder<Resp>)>;
70pub struct Receiver<Req, Resp> {
71 inner: mpsc::Receiver<(Req, Responder<Resp>)>,
72}
7374impl<Req, Resp> Receiver<Req, Resp> {
75/// Close the responding end of the channel.
76 ///
77 /// This prevents further messages from being sent on the channel while still enabling the
78 /// receiver to drain messages that are buffered.
79pub fn close(&mut self) {
80self.inner.close();
81 }
8283/// Try to receive the next message without notifying a context if empty.
84 ///
85 /// This function will panic if called after `try_next` has returned `None` or `receive` has
86 /// returned an `Err`.
87pub fn try_receive(&mut self) -> Result<Option<(Req, Responder<Resp>)>, Error> {
88Ok(self.inner.try_next()?)
89 }
90}
9192impl<Req, Resp> Stream for Receiver<Req, Resp> {
93type Item = (Req, Responder<Resp>);
9495fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96 Pin::new(&mut self.inner).poll_next(cx)
97 }
98}
99100impl<Req, Resp> FusedStream for Receiver<Req, Resp> {
101fn is_terminated(&self) -> bool {
102self.inner.is_terminated()
103 }
104}
105106/// Create a new asynchronous channel with a bounded capacity, returning the sender/receiver
107/// halves.
108///
109/// This channel follows the semantics of a futures::mpsc::channel when at capacity.
110pub fn channel<Req, Resp>(buffer: usize) -> (Sender<Req, Resp>, Receiver<Req, Resp>) {
111let (inner_sender, inner_receiver) = mpsc::channel(buffer);
112 (Sender { inner: inner_sender }, Receiver { inner: inner_receiver })
113}
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::StreamExt;
    use std::pin::pin;

    // Extracts the value out of a `Poll::Ready`, panicking if the poll is still pending.
    macro_rules! unwrap_ready {
        ($poll:expr) => {
            if let Poll::Ready(value) = $poll {
                value
            } else {
                panic!("not ready")
            }
        };
    }

    #[test]
    fn sender_receives_response() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        // Nothing has been sent yet, so the receiving side has nothing to yield.
        let mut recv_fut = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        // The request cannot complete before a response has been sent.
        let mut req_fut = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut recv_fut)).unwrap();

        // Receiving the request alone does not resolve it.
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut req_fut)).unwrap();
    }

    #[test]
    fn cloned_senders_go_to_same_receiver() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);
        let mut sender2 = sender.clone();

        // First round trip: through the original sender.
        let mut recv_fut = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        let mut req_fut = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut recv_fut)).unwrap();

        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut req_fut)).unwrap();

        // Second round trip: through the clone, arriving at the same receiver.
        let mut recv_fut = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        let mut req_fut = pin!(sender2.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut recv_fut)).unwrap();

        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut req_fut)).unwrap();
    }

    #[test]
    fn sender_receives_error_on_dropped_receiver() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, receiver) = channel::<(), ()>(0);

        let mut req_fut = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        drop(receiver);

        let outcome = unwrap_ready!(exec.run_until_stalled(&mut req_fut));
        assert!(outcome.is_err());
    }

    #[test]
    fn sender_receives_error_on_dropped_responder() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        let mut req_fut = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        let mut recv_fut = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut recv_fut)).unwrap();

        assert!(exec.run_until_stalled(&mut req_fut).is_pending());
        // Discarding the responder without answering must fail the waiting request.
        drop(responder);

        let outcome = unwrap_ready!(exec.run_until_stalled(&mut req_fut));
        assert!(outcome.is_err());
    }

    #[test]
    fn receiver_receives_error_on_dropped_sender() {
        let mut exec = fasync::TestExecutor::new();
        let (sender, mut receiver) = channel::<(), ()>(0);

        let mut recv_fut = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        drop(sender);

        // With the only sender gone, the stream finishes by yielding `None`.
        let next = unwrap_ready!(exec.run_until_stalled(&mut recv_fut));
        assert!(next.is_none());
    }

    #[test]
    fn responder_returns_error_on_dropped_sender() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        {
            let mut req_fut = pin!(sender.request(()));
            assert!(exec.run_until_stalled(&mut req_fut).is_pending());
        } // the request future is dropped when this block ends

        let mut recv_fut = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut recv_fut)).unwrap();

        drop(sender);

        let outcome = responder.respond(());
        assert!(outcome.is_err());
    }

    #[fasync::run_until_stalled(test)]
    async fn cannot_request_after_receiver_closed() {
        let (mut sender, mut receiver) = channel::<(), ()>(0);
        receiver.close();
        let outcome = sender.request(()).await;
        assert!(outcome.is_err());
    }

    #[test]
    fn try_receive_returns_none_when_channel_is_empty() {
        let (sender, mut receiver) = channel::<(), ()>(0);
        // The sender half is released before the receiver is queried, so the channel has no
        // messages and no remaining senders at this point.
        drop(sender);
        let next = receiver.try_receive().unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn try_receive_returns_none_after_none_result() {
        let (sender, mut receiver) = channel::<(), ()>(0);
        // As above, the sender half is released before the receiver is queried.
        drop(sender);
        let first = receiver.try_receive().unwrap();
        assert!(first.is_none());
        // Asking again after a `None` result must keep returning `None`.
        let second = receiver.try_receive().unwrap();
        assert!(second.is_none());
    }

    #[test]
    fn try_receive_returns_value_when_channel_has_value() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        // Drive the request far enough for its message to be placed in the channel.
        let mut req_fut = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut req_fut).is_pending());

        let next = receiver.try_receive().unwrap();
        assert!(next.is_some());
    }
}