1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
//! Additional functionality for use with asynchronous channels (`futures::channel::mpsc`).
67use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures::channel::mpsc;
10use futures::{ready, Future};
1112/// Extends the functionality of a channel to include `try_send_fut`.
13pub trait TrySend<Item> {
14/// Returns a future that will complete successfully when the item has been buffered in the
15 /// channel or unsuccessfully when the receiving end has been dropped.
16 ///
17 /// The item is returned to the sender if it could not be sent. This is distinct from the
18 /// functionality found in the `futures` library which will consume the item regardless of
19 /// whether the item was successfully sent.
20 ///
21 /// NOTE: even in the event of successful completion, there is no guarantee that the receiver
22 /// has consumed the message. It is possible for the receiver to drop its end of the channel
23 /// without consuming all queued items.
24fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>;
25}
2627impl<Item> TrySend<Item> for mpsc::Sender<Item> {
28fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> {
29 TrySendFut::new(item, self)
30 }
31}
3233/// A Future that represents an ongoing send over a channel.
34/// It completes when the send is complete or the send failed due to channel closure.
35#[must_use]
36pub struct TrySendFut<'a, Item> {
37// item must always be constructed with Some value to prevent a panic.
38item: Option<Item>,
39 channel: &'a mut mpsc::Sender<Item>,
40}
4142/// Returns a future that will complete successfully when the PeerTask has received the
43/// message or unsuccessfully when the PeerTask has dropped its receiver.
44impl<'a, Item> TrySendFut<'a, Item> {
45/// Construct a new `TrySendFut`.
46fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self {
47Self { item: Some(item), channel }
48 }
49}
5051impl<'a, Item> Unpin for TrySendFut<'a, Item> {}
5253impl<'a, Item> Future for TrySendFut<'a, Item> {
54type Output = Result<(), Item>;
55fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56loop {
57let ready = ready!(self.channel.poll_ready(cx));
58// `self.item` cannot be `None` because it is initialized as `Some`,
59 // and the value is reinserted before continuing below.
60let item = self.item.take().expect("Cannot poll without `Some` item");
61match ready {
62Err(e) => {
63// `mpsc::Sender::poll_ready` only errors on disconnection.
64assert!(e.is_disconnected(), "{}", e);
65return Poll::Ready(Err(item));
66 }
67Ok(()) => {}
68 }
69match self.channel.try_send(item) {
70Ok(()) => return Poll::Ready(Ok(())),
71Err(e) => {
72if e.is_disconnected() {
73return Poll::Ready(Err(e.into_inner()));
74 } else {
75// We raced with a competing sender; reset `self.item`
76 // and wait again.
77assert!(e.is_full(), "{}", e);
78self.item = Some(e.into_inner());
79continue;
80 }
81 }
82 }
83 }
84 }
85}
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::future::join;
    use futures::StreamExt;

    /// Sending while the receiver is listening succeeds and delivers the item.
    #[fasync::run_until_stalled(test)]
    async fn item_future_completes_on_receive() {
        // `Vec` is the item type because it is `!Copy`, and the primary use of this method is
        // for items that are not implicitly copiable.
        let (mut tx, mut rx) = mpsc::channel(0);
        let send = tx.try_send_fut(vec![1]);
        let (sent, received) = join(send, rx.next()).await;
        assert_eq!(sent, Ok(()));
        assert_eq!(received, Some(vec![1]));
    }

    /// Sending after the receiver is gone fails and hands the item back.
    #[fasync::run_until_stalled(test)]
    async fn item_future_errors_on_receiver_closed() {
        let (mut tx, rx) = mpsc::channel(0);
        // Drop the receiving end to force an error.
        drop(rx);
        let outcome = tx.try_send_fut(vec![1]).await;
        assert_eq!(outcome, Err(vec![1]));
    }

    /// A send into a full channel stays pending until the receiver makes room.
    #[test]
    fn item_future_pending_on_buffer_full() {
        let mut exec = fasync::TestExecutor::new();
        let (mut tx, mut rx) = mpsc::channel(0);

        let first_send = exec.run_singlethreaded(tx.try_send_fut(vec![1]));
        assert_eq!(first_send, Ok(()));

        // Send a second item while the first is still in the channel.
        let mut second_send = tx.try_send_fut(vec![2]);
        let stalled = exec.run_until_stalled(&mut second_send);
        assert_eq!(stalled, Poll::Pending);

        // Consume the first item.
        let first_received = exec.run_until_stalled(&mut rx.next());
        assert_eq!(first_received, Poll::Ready(Some(vec![1])));

        // Now there is room in the channel for the second item.
        let resumed = exec.run_until_stalled(&mut second_send);
        assert_eq!(resumed, Poll::Ready(Ok(())));

        // The second item is received.
        let second_received = exec.run_until_stalled(&mut rx.next());
        assert_eq!(second_received, Poll::Ready(Some(vec![2])));
    }
}