async_helpers/
responding_channel.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A multi-producer, single-consumer queue for sending requests across asynchronous tasks.
6//!
7//! Channel creation provides `Receiver` and `Sender` handles. `Sender` can make requests that
8//! await a response from the `Receiver`. Every message sent across the channel is packaged with
9//! a `Responder` that is used to respond to that request. A `Sender` will wait until a response is
10//! received before `Sender::request` completes.
11//!
//! ### Disconnection
//! When all `Sender` handles have been dropped, it is no longer possible to send requests into the
//! channel. As such, the `Receiver` stream will end by yielding `None`, and
//! `Receiver::try_receive` will return `Ok(None)`.
15//!
16//! ### Clean Shutdown
17//! If a `Receiver` is dropped, it is possible for there to be messages in the channel that will
18//! never be processed. If a clean shutdown is desired, a receiver can first call `Receiver::close`
19//! to prevent further messages from being sent into the channel. Then, the receiver can handle all
20//! messages in the channel and be dropped.
21
22use anyhow::Error;
23use futures::channel::{mpsc, oneshot};
24use futures::stream::{FusedStream, Stream};
25use futures::SinkExt;
26use std::pin::Pin;
27use std::task::{Context, Poll};
28
/// The requesting end of a channel.
///
/// Each item pushed through this handle is a request paired with the `Responder` that is used
/// to answer it.
pub struct Sender<Req, Resp> {
    /// Underlying bounded queue carrying `(request, responder)` pairs.
    inner: mpsc::Sender<(Req, Responder<Resp>)>,
}
33
34impl<Req, Resp> Clone for Sender<Req, Resp> {
35    fn clone(&self) -> Self {
36        Self { inner: self.inner.clone() }
37    }
38}
39
40impl<Req, Resp> Sender<Req, Resp> {
41    /// Send a request on the channel and wait for a response from the responding end of the
42    /// channel.
43    /// An error is returned if the `Receiver` has been dropped or the `Receiver` drops the
44    /// `Responder` for this request.
45    pub async fn request(&mut self, value: Req) -> Result<Resp, Error> {
46        let (responder, response) = oneshot::channel();
47        self.inner.send((value, Responder { inner: responder })).await?;
48        Ok(response.await?)
49    }
50}
51
/// Responds to a single request with a value.
///
/// Responding consumes the `Responder`, so each request can be answered at most once.
pub struct Responder<Resp> {
    /// One-shot sending half on which the response value is delivered.
    inner: oneshot::Sender<Resp>,
}
56
57impl<Resp> Responder<Resp> {
58    /// Send a response value. If the `Sender` is no longer waiting on a response because the
59    /// request future has been dropped, this method will return the original response `value` as
60    /// an `Err`.
61    pub fn respond(self, value: Resp) -> Result<(), Resp> {
62        self.inner.send(value)
63    }
64}
65
/// The responding end of a channel.
///
/// Yields each incoming request together with the `Responder` used to answer it.
// TODO(https://fxbug.dev/42162679): Consider replacing this with this alias:
//
//   pub type Receiver<Req, Resp> = mpsc::Receiver<(Req, Responder<Resp>)>;
pub struct Receiver<Req, Resp> {
    /// Underlying bounded queue from which `(request, responder)` pairs are read.
    inner: mpsc::Receiver<(Req, Responder<Resp>)>,
}
73
74impl<Req, Resp> Receiver<Req, Resp> {
75    /// Close the responding end of the channel.
76    ///
77    /// This prevents further messages from being sent on the channel while still enabling the
78    /// receiver to drain messages that are buffered.
79    pub fn close(&mut self) {
80        self.inner.close();
81    }
82
83    /// Try to receive the next message without notifying a context if empty.
84    ///
85    /// This function will panic if called after `try_next` has returned `None` or `receive` has
86    /// returned an `Err`.
87    pub fn try_receive(&mut self) -> Result<Option<(Req, Responder<Resp>)>, Error> {
88        Ok(self.inner.try_next()?)
89    }
90}
91
92impl<Req, Resp> Stream for Receiver<Req, Resp> {
93    type Item = (Req, Responder<Resp>);
94
95    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96        Pin::new(&mut self.inner).poll_next(cx)
97    }
98}
99
100impl<Req, Resp> FusedStream for Receiver<Req, Resp> {
101    fn is_terminated(&self) -> bool {
102        self.inner.is_terminated()
103    }
104}
105
106/// Create a new asynchronous channel with a bounded capacity, returning the sender/receiver
107/// halves.
108///
109/// This channel follows the semantics of a futures::mpsc::channel when at capacity.
110pub fn channel<Req, Resp>(buffer: usize) -> (Sender<Req, Resp>, Receiver<Req, Resp>) {
111    let (inner_sender, inner_receiver) = mpsc::channel(buffer);
112    (Sender { inner: inner_sender }, Receiver { inner: inner_receiver })
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::StreamExt;
    use std::pin::pin;

    /// Extracts the value from a `Poll::Ready`, panicking if the poll is still pending.
    macro_rules! unwrap_ready {
        ($poll:expr) => {
            match $poll {
                Poll::Ready(value) => value,
                Poll::Pending => panic!("not ready"),
            }
        };
    }

    /// A request stays pending until the receiver takes it and responds.
    #[test]
    fn sender_receives_response() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        let received = receiver.next();
        let mut received = pin!(received);
        assert!(ex.run_until_stalled(&mut received).is_pending());

        let request = sender.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        let ((), responder) = unwrap_ready!(ex.run_until_stalled(&mut received)).unwrap();

        assert!(ex.run_until_stalled(&mut request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(ex.run_until_stalled(&mut request)).unwrap();
    }

    /// Requests made through a cloned sender arrive at the same receiver.
    #[test]
    fn cloned_senders_go_to_same_receiver() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);
        let mut sender2 = sender.clone();

        let received = receiver.next();
        let mut received = pin!(received);
        assert!(ex.run_until_stalled(&mut received).is_pending());

        let request = sender.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        let ((), responder) = unwrap_ready!(ex.run_until_stalled(&mut received)).unwrap();

        assert!(ex.run_until_stalled(&mut request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(ex.run_until_stalled(&mut request)).unwrap();

        let received = receiver.next();
        let mut received = pin!(received);
        assert!(ex.run_until_stalled(&mut received).is_pending());

        let request = sender2.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        let ((), responder) = unwrap_ready!(ex.run_until_stalled(&mut received)).unwrap();

        assert!(ex.run_until_stalled(&mut request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(ex.run_until_stalled(&mut request)).unwrap();
    }

    /// Dropping the receiver while a request is outstanding fails that request.
    #[test]
    fn sender_receives_error_on_dropped_receiver() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, receiver) = channel::<(), ()>(0);

        let request = sender.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        drop(receiver);

        assert!(unwrap_ready!(ex.run_until_stalled(&mut request)).is_err());
    }

    /// Dropping the responder without responding fails the matching request.
    #[test]
    fn sender_receives_error_on_dropped_responder() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        let request = sender.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        let received = receiver.next();
        let mut received = pin!(received);
        let ((), responder) = unwrap_ready!(ex.run_until_stalled(&mut received)).unwrap();

        assert!(ex.run_until_stalled(&mut request).is_pending());
        drop(responder);

        assert!(unwrap_ready!(ex.run_until_stalled(&mut request)).is_err());
    }

    /// Once the only sender is dropped, the receiver stream ends with `None`.
    #[test]
    fn receiver_receives_error_on_dropped_sender() {
        let mut ex = fasync::TestExecutor::new();
        let (sender, mut receiver) = channel::<(), ()>(0);

        let received = receiver.next();
        let mut received = pin!(received);
        assert!(ex.run_until_stalled(&mut received).is_pending());

        drop(sender);

        assert!(unwrap_ready!(ex.run_until_stalled(&mut received)).is_none());
    }

    /// Responding after the request future has been dropped hands the value back as an error.
    #[test]
    fn responder_returns_error_on_dropped_sender() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        {
            let request = sender.request(());
            let mut request = pin!(request);
            assert!(ex.run_until_stalled(&mut request).is_pending());
        } // request is dropped at the end of the block

        let received = receiver.next();
        let mut received = pin!(received);
        let ((), responder) = unwrap_ready!(ex.run_until_stalled(&mut received)).unwrap();

        drop(sender);

        assert!(responder.respond(()).is_err());
    }

    /// Requests are rejected once the receiver has closed the channel.
    #[fasync::run_until_stalled(test)]
    async fn cannot_request_after_receiver_closed() {
        let (mut sender, mut receiver) = channel::<(), ()>(0);
        receiver.close();
        assert!(sender.request(()).await.is_err());
    }

    /// With every sender gone and nothing buffered, `try_receive` reports `Ok(None)`.
    #[test]
    fn try_receive_returns_none_when_channel_is_empty() {
        let (sender, mut receiver) = channel::<(), ()>(0);
        // The channel must be closed for `Ok(None)`; an empty but open channel is an error.
        drop(sender);
        assert!(receiver.try_receive().unwrap().is_none());
    }

    /// `try_receive` can be called again after it has already returned `Ok(None)`.
    #[test]
    fn try_receive_returns_none_after_none_result() {
        let (sender, mut receiver) = channel::<(), ()>(0);
        drop(sender);
        assert!(receiver.try_receive().unwrap().is_none());
        assert!(receiver.try_receive().unwrap().is_none());
    }

    /// While a sender is still alive, an empty channel makes `try_receive` return an error.
    #[test]
    fn try_receive_returns_error_when_channel_is_empty_and_open() {
        let (_sender, mut receiver) = channel::<(), ()>(0);
        assert!(receiver.try_receive().is_err());
    }

    /// A request that has been sent but not yet received is visible to `try_receive`.
    #[test]
    fn try_receive_returns_value_when_channel_has_value() {
        let mut ex = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        let request = sender.request(());
        let mut request = pin!(request);
        assert!(ex.run_until_stalled(&mut request).is_pending());

        assert!(receiver.try_receive().unwrap().is_some());
    }
}