Skip to main content

runtime_capabilities/
receiver.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{DirConnectorMessage, Message};
6use derivative::Derivative;
7use futures::StreamExt;
8use futures::channel::mpsc::{self, UnboundedReceiver};
9use futures::lock::Mutex;
10use std::sync::Arc;
11
/// Type that represents the receiving end of a [Connector]. Every [Connector] is coupled to
/// some [Receiver] to which connection requests to that [Connector] (or any of its clones) are
/// delivered.
///
/// Clones of a [Receiver] share the same underlying channel (they hold the same `inner`), so a
/// given message is returned to only one caller of `receive`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Receiver {
    /// The receiving half of the message channel, shared between all clones via the `Arc`.
    ///
    /// `inner` uses an async mutex because it will be locked across an await point
    /// when asynchronously waiting for the next message.
    inner: Arc<Mutex<UnboundedReceiver<Message>>>,
}
22
23impl Clone for Receiver {
24    fn clone(&self) -> Self {
25        Self { inner: self.inner.clone() }
26    }
27}
28
29impl Receiver {
30    pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
31        Self { inner: Arc::new(Mutex::new(receiver)) }
32    }
33
34    /// Waits to receive a message, or return `None` if there are no more messages and all
35    /// senders are dropped.
36    pub async fn receive(&self) -> Option<Message> {
37        let mut receiver_guard = self.inner.lock().await;
38        receiver_guard.next().await
39    }
40}
41
/// Type that represents the receiving end of a [DirConnector]. The [DirConnector] counterpart of
/// [Receiver]. Every [DirConnector] is coupled to some [DirReceiver] to which connection requests
/// to that [DirConnector] (or any of its clones) are delivered.
///
/// Clones of a [DirReceiver] share the same underlying channel (they hold the same `inner`), so
/// a given message is returned to only one caller of `receive`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DirReceiver {
    /// The receiving half of the message channel, shared between all clones via the `Arc`.
    ///
    /// `inner` uses an async mutex because it will be locked across an await point
    /// when asynchronously waiting for the next message.
    inner: Arc<Mutex<UnboundedReceiver<DirConnectorMessage>>>,
}
52
53impl Clone for DirReceiver {
54    fn clone(&self) -> Self {
55        Self { inner: self.inner.clone() }
56    }
57}
58
59impl DirReceiver {
60    pub fn new(receiver: mpsc::UnboundedReceiver<DirConnectorMessage>) -> Self {
61        Self { inner: Arc::new(Mutex::new(receiver)) }
62    }
63
64    /// Waits to receive a message, or return `None` if there are no more messages and all
65    /// senders are dropped.
66    pub async fn receive(&self) -> Option<DirConnectorMessage> {
67        let mut receiver_guard = self.inner.lock().await;
68        receiver_guard.next().await
69    }
70}
71
// These tests do not run on host because waiting on handle signals (the `wait_one` calls below)
// is not supported in the handle emulation layer.
// NOTE(review): this comment used to name a `wait_handle` function, which does not appear in
// this module; `wait_one` is presumably the call it referred to -- confirm.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod tests {
    use crate::Connector;
    use assert_matches::assert_matches;
    use fidl_fuchsia_component_sandbox as fsandbox;
    use fuchsia_async as fasync;
    use futures::future::{self, Either};
    use std::pin::pin;
    use std::task::Poll;
    use zx::{self as zx, Peered};

    use super::*;

    /// A channel sent through the connector is delivered to the receiver and is still connected
    /// to its peer.
    #[fuchsia::test]
    async fn send_and_receive() {
        let (receiver, sender) = Connector::new();

        let (ch1, ch2) = zx::Channel::create();
        sender.send_channel(ch1).unwrap();

        let message = receiver.receive().await.unwrap();

        // Check connectivity: a signal raised on the received end must be observable on `ch2`.
        message.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        ch2.wait_one(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }

    /// Sending must fail once the receiver has been dropped.
    #[fuchsia::test]
    async fn send_fail_when_receiver_dropped() {
        let (receiver, sender) = Connector::new();

        drop(receiver);

        let (ch1, _ch2) = zx::Channel::create();
        sender.send_channel(ch1).unwrap_err();
    }

    /// `receive` stays pending while the connector is alive and no message has been sent, and
    /// resolves to `None` once the connector is dropped.
    #[test]
    fn receive_blocks_while_connector_alive() {
        let mut ex = fasync::TestExecutor::new();
        let (receiver, sender) = Connector::new();

        // Scope the first future so that it (and anything it holds on to) is dropped before the
        // receiver is polled again below.
        {
            let mut fut = pin!(receiver.receive());
            assert!(ex.run_until_stalled(&mut fut).is_pending());
        }

        drop(sender);

        let mut fut = pin!(receiver.receive());
        let output = ex.run_until_stalled(&mut fut);
        assert_matches!(output, Poll::Ready(None));
    }

    /// It should be possible to conclusively ensure that no more messages will arrive.
    #[fuchsia::test]
    async fn drain_receiver() {
        let (receiver, sender) = Connector::new();

        let (ch1, _ch2) = zx::Channel::create();
        sender.send_channel(ch1).unwrap();

        // Even if all the senders are closed after sending a message, it should still be
        // possible to receive that message.
        drop(sender);

        // Receive the message.
        assert!(receiver.receive().await.is_some());

        // Receiving again will fail.
        assert!(receiver.receive().await.is_none());
    }

    /// A message sent through the connector is forwarded as a `Receive` request over the
    /// `fuchsia.component.sandbox/Receiver` protocol, with the channel still connected.
    #[fuchsia::test]
    async fn receiver_fidl() {
        let (receiver, sender) = Connector::new();

        let (ch1, ch2) = zx::Channel::create();
        sender.send_channel(ch1).unwrap();

        let (receiver_proxy, mut receiver_stream) =
            fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();

        // Run the forwarding handler alongside the request stream; the request must arrive
        // before the handler completes.
        let handler_fut = receiver.handle_receiver(receiver_proxy);
        let receive_fut = receiver_stream.next();
        let Either::Right((message, _)) =
            future::select(pin!(handler_fut), pin!(receive_fut)).await
        else {
            panic!("Handler should not finish");
        };
        let message = message.unwrap().unwrap();
        match message {
            fsandbox::ReceiverRequest::Receive { channel, .. } => {
                // Check connectivity.
                channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
                ch2.wait_one(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
            }
            _ => panic!("Unexpected message"),
        }
    }
}