1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Additional functionality for use with asynchronous channels (futures::channel::mpsc).

use {
    core::{
        pin::Pin,
        task::{Context, Poll},
    },
    futures::{channel::mpsc, ready, Future},
};

/// Extends the functionality of a channel to include `try_send_fut`.
pub trait TrySend<Item> {
    /// Returns a future that will complete successfully when the item has been buffered in the
    /// channel or unsuccessfully when the receiving end has been dropped.
    ///
    /// The item is returned to the sender if it could not be sent. This is distinct from the
    /// functionality found in the `futures` library which will consume the item regardless of
    /// whether the item was successfully sent.
    ///
    /// NOTE: even in the event of successful completion, there is no guarantee that the receiver
    /// has consumed the message. It is possible for the receiver to drop its end of the channel
    /// without consuming all queued items.
    fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>;
}

impl<Item> TrySend<Item> for mpsc::Sender<Item> {
    fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> {
        TrySendFut::new(item, self)
    }
}

/// A Future that represents an ongoing send over a channel.
/// It completes when the send is complete or the send failed due to channel closure.
#[must_use]
pub struct TrySendFut<'a, Item> {
    // item must always be constructed with Some value to prevent a panic.
    item: Option<Item>,
    channel: &'a mut mpsc::Sender<Item>,
}

/// Returns a future that will complete successfully when the PeerTask has received the
/// message or unsuccessfully when the PeerTask has dropped its receiver.
impl<'a, Item> TrySendFut<'a, Item> {
    /// Construct a new `TrySendFut`.
    fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self {
        Self { item: Some(item), channel }
    }
}

impl<'a, Item> Unpin for TrySendFut<'a, Item> {}

impl<'a, Item> Future for TrySendFut<'a, Item> {
    type Output = Result<(), Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let ready = ready!(self.channel.poll_ready(cx));
            // `self.item` cannot be `None` because it is initialized as `Some`,
            // and the value is reinserted before continuing below.
            let item = self.item.take().expect("Cannot poll without `Some` item");
            match ready {
                Err(e) => {
                    // `mpsc::Sender::poll_ready` only errors on disconnection.
                    assert!(e.is_disconnected(), "{}", e);
                    return Poll::Ready(Err(item));
                }
                Ok(()) => {}
            }
            match self.channel.try_send(item) {
                Ok(()) => return Poll::Ready(Ok(())),
                Err(e) => {
                    if e.is_disconnected() {
                        return Poll::Ready(Err(e.into_inner()));
                    } else {
                        // We raced with a competing sender; reset `self.item`
                        // and wait again.
                        assert!(e.is_full(), "{}", e);
                        self.item = Some(e.into_inner());
                        continue;
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        fuchsia_async as fasync,
        futures::{future::join, StreamExt},
    };

    #[fasync::run_until_stalled(test)]
    async fn item_future_completes_on_receive() {
        // Vec is chosen as the item type because it is !Copy and the primary use of this
        // method is for items that are not implicitly copiable.
        let (mut sender, mut receiver) = mpsc::channel(0);
        let (send_result, receive_result) =
            join(sender.try_send_fut(vec![1]), receiver.next()).await;
        assert_eq!(send_result, Ok(()));
        assert_eq!(receive_result, Some(vec![1]));
    }

    #[fasync::run_until_stalled(test)]
    async fn item_future_errors_on_receiver_closed() {
        let (mut sender, receiver) = mpsc::channel(0);
        // Drop receiving end to force an error.
        drop(receiver);
        let send_result = sender.try_send_fut(vec![1]).await;
        assert_eq!(send_result, Err(vec![1]));
    }

    #[test]
    fn item_future_pending_on_buffer_full() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = mpsc::channel(0);

        let send_result = exec.run_singlethreaded(sender.try_send_fut(vec![1]));
        assert_eq!(send_result, Ok(()));

        // Send a second item while the first is still in the channel.
        let mut send_fut = sender.try_send_fut(vec![2]);
        let send_poll_result = exec.run_until_stalled(&mut send_fut);
        assert_eq!(send_poll_result, Poll::Pending);

        // Consume the first item;
        let receive_poll_result = exec.run_until_stalled(&mut receiver.next());
        assert_eq!(receive_poll_result, Poll::Ready(Some(vec![1])));

        // Now there is room in the channel for the second item
        let send_poll_result = exec.run_until_stalled(&mut send_fut);
        assert_eq!(send_poll_result, Poll::Ready(Ok(())));

        // The second item is received
        let receive_poll_result = exec.run_until_stalled(&mut receiver.next());
        assert_eq!(receive_poll_result, Poll::Ready(Some(vec![2])));
    }
}