async_utils/
channel.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Additional functionality for use with asynchronous channels (futures::channel::mpsc).
6
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures::channel::mpsc;
10use futures::{ready, Future};
11
12/// Extends the functionality of a channel to include `try_send_fut`.
13pub trait TrySend<Item> {
14    /// Returns a future that will complete successfully when the item has been buffered in the
15    /// channel or unsuccessfully when the receiving end has been dropped.
16    ///
17    /// The item is returned to the sender if it could not be sent. This is distinct from the
18    /// functionality found in the `futures` library which will consume the item regardless of
19    /// whether the item was successfully sent.
20    ///
21    /// NOTE: even in the event of successful completion, there is no guarantee that the receiver
22    /// has consumed the message. It is possible for the receiver to drop its end of the channel
23    /// without consuming all queued items.
24    fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>;
25}
26
27impl<Item> TrySend<Item> for mpsc::Sender<Item> {
28    fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> {
29        TrySendFut::new(item, self)
30    }
31}
32
33/// A Future that represents an ongoing send over a channel.
34/// It completes when the send is complete or the send failed due to channel closure.
35#[must_use]
36pub struct TrySendFut<'a, Item> {
37    // item must always be constructed with Some value to prevent a panic.
38    item: Option<Item>,
39    channel: &'a mut mpsc::Sender<Item>,
40}
41
42/// Returns a future that will complete successfully when the PeerTask has received the
43/// message or unsuccessfully when the PeerTask has dropped its receiver.
44impl<'a, Item> TrySendFut<'a, Item> {
45    /// Construct a new `TrySendFut`.
46    fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self {
47        Self { item: Some(item), channel }
48    }
49}
50
51impl<'a, Item> Unpin for TrySendFut<'a, Item> {}
52
53impl<'a, Item> Future for TrySendFut<'a, Item> {
54    type Output = Result<(), Item>;
55    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56        loop {
57            let ready = ready!(self.channel.poll_ready(cx));
58            // `self.item` cannot be `None` because it is initialized as `Some`,
59            // and the value is reinserted before continuing below.
60            let item = self.item.take().expect("Cannot poll without `Some` item");
61            match ready {
62                Err(e) => {
63                    // `mpsc::Sender::poll_ready` only errors on disconnection.
64                    assert!(e.is_disconnected(), "{}", e);
65                    return Poll::Ready(Err(item));
66                }
67                Ok(()) => {}
68            }
69            match self.channel.try_send(item) {
70                Ok(()) => return Poll::Ready(Ok(())),
71                Err(e) => {
72                    if e.is_disconnected() {
73                        return Poll::Ready(Err(e.into_inner()));
74                    } else {
75                        // We raced with a competing sender; reset `self.item`
76                        // and wait again.
77                        assert!(e.is_full(), "{}", e);
78                        self.item = Some(e.into_inner());
79                        continue;
80                    }
81                }
82            }
83        }
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::future::join;
    use futures::StreamExt;

    // Running the send and the receive together: the send resolves to `Ok(())` and the
    // receiver yields the item.
    #[fasync::run_until_stalled(test)]
    async fn item_future_completes_on_receive() {
        // `Vec` is used as the item type since it is not `Copy`; items that cannot be
        // implicitly copied are the primary use case for this method.
        let (mut tx, mut rx) = mpsc::channel(0);
        let (sent, received) = join(tx.try_send_fut(vec![1]), rx.next()).await;
        assert_eq!(sent, Ok(()));
        assert_eq!(received, Some(vec![1]));
    }

    // With the receiver gone, the send fails and hands the item back.
    #[fasync::run_until_stalled(test)]
    async fn item_future_errors_on_receiver_closed() {
        let (mut tx, rx) = mpsc::channel(0);
        // Dropping the receiving end forces the error.
        drop(rx);
        let sent = tx.try_send_fut(vec![1]).await;
        assert_eq!(sent, Err(vec![1]));
    }

    // A send issued while the channel is full stays pending until the receiver makes room.
    #[test]
    fn item_future_pending_on_buffer_full() {
        let mut executor = fasync::TestExecutor::new();
        let (mut tx, mut rx) = mpsc::channel(0);

        // The first item is accepted without anything being received.
        let first_send = executor.run_singlethreaded(tx.try_send_fut(vec![1]));
        assert_eq!(first_send, Ok(()));

        // The first item is still in the channel, so the second send stalls.
        let mut second_send = tx.try_send_fut(vec![2]);
        let stalled = executor.run_until_stalled(&mut second_send);
        assert_eq!(stalled, Poll::Pending);

        // Take the first item out of the channel.
        let first_received = executor.run_until_stalled(&mut rx.next());
        assert_eq!(first_received, Poll::Ready(Some(vec![1])));

        // The channel has room again, so the second send completes.
        let resumed = executor.run_until_stalled(&mut second_send);
        assert_eq!(resumed, Poll::Ready(Ok(())));

        // And the second item arrives at the receiver.
        let second_received = executor.run_until_stalled(&mut rx.next());
        assert_eq!(second_received, Poll::Ready(Some(vec![2])));
    }
}