sandbox/
receiver.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{DirConnectorMessage, Message};
6use derivative::Derivative;
7use futures::StreamExt;
8use futures::channel::mpsc::{self, UnboundedReceiver};
9use futures::lock::Mutex;
10use std::sync::Arc;
11
/// Type that represents the receiving end of a [Connector]. Every [Connector] is coupled to
/// some [Receiver] to which connection requests to that [Connector] (or any of its clones) are
/// delivered.
///
/// The message queue is held behind an `Arc`, so every clone of a [Receiver] reads from the
/// same queue rather than from a copy of it.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Receiver {
    /// Queue of incoming messages, shared by all clones of this [Receiver].
    ///
    /// `inner` uses an async mutex because it will be locked across an await point
    /// when asynchronously waiting for the next message.
    inner: Arc<Mutex<UnboundedReceiver<Message>>>,
}
22
23impl Clone for Receiver {
24    fn clone(&self) -> Self {
25        Self { inner: self.inner.clone() }
26    }
27}
28
29impl Receiver {
30    pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
31        Self { inner: Arc::new(Mutex::new(receiver)) }
32    }
33
34    /// Waits to receive a message, or return `None` if there are no more messages and all
35    /// senders are dropped.
36    pub async fn receive(&self) -> Option<Message> {
37        let mut receiver_guard = self.inner.lock().await;
38        receiver_guard.next().await
39    }
40}
41
/// Type that represents the receiving end of a [DirConnector]. The [DirConnector] counterpart of
/// [Receiver]. Every [DirConnector] is coupled to some [DirReceiver] to which connection requests
/// to that [DirConnector] (or any of its clones) are delivered.
///
/// The message queue is held behind an `Arc`, so every clone of a [DirReceiver] reads from the
/// same queue rather than from a copy of it.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DirReceiver {
    /// Queue of incoming messages, shared by all clones of this [DirReceiver].
    ///
    /// `inner` uses an async mutex because it will be locked across an await point
    /// when asynchronously waiting for the next message.
    inner: Arc<Mutex<UnboundedReceiver<DirConnectorMessage>>>,
}
52
53impl Clone for DirReceiver {
54    fn clone(&self) -> Self {
55        Self { inner: self.inner.clone() }
56    }
57}
58
59impl DirReceiver {
60    pub fn new(receiver: mpsc::UnboundedReceiver<DirConnectorMessage>) -> Self {
61        Self { inner: Arc::new(Mutex::new(receiver)) }
62    }
63
64    /// Waits to receive a message, or return `None` if there are no more messages and all
65    /// senders are dropped.
66    pub async fn receive(&self) -> Option<DirConnectorMessage> {
67        let mut receiver_guard = self.inner.lock().await;
68        receiver_guard.next().await
69    }
70}
71
// Host builds are excluded: the handle emulation layer used there does not support the
// `wait_handle` call that these tests rely on.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod tests {
    use crate::Connector;
    use assert_matches::assert_matches;
    use futures::future::{self, Either};
    use std::pin::pin;
    use zx::{self as zx, AsHandleRef, Peered};
    use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};

    use super::*;

    /// A channel pushed through the connector comes out of the receiver still connected to
    /// its peer.
    #[fuchsia::test]
    async fn send_and_receive() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let received = receiver.receive().await.unwrap();

        // Check connectivity: a signal raised through the received channel must be observable
        // on the end we kept.
        received.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }

    /// Sending must report an error once the receiver has been dropped.
    #[fuchsia::test]
    async fn send_fail_when_receiver_dropped() {
        let (receiver, sender) = Connector::new();

        drop(receiver);

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap_err();
    }

    /// `receive` stays pending while a sender exists and resolves to `None` once it is gone.
    #[test]
    fn receive_blocks_while_connector_alive() {
        let mut ex = fasync::TestExecutor::new();
        let (receiver, sender) = Connector::new();

        {
            // Scoped so that this pending future is dropped before the sender goes away.
            let mut pending_receive = pin!(receiver.receive());
            assert!(ex.run_until_stalled(&mut pending_receive).is_pending());
        }

        drop(sender);

        let mut final_receive = pin!(receiver.receive());
        let poll_result = ex.run_until_stalled(&mut final_receive);
        assert_matches!(poll_result, std::task::Poll::Ready(None));
    }

    /// It should be possible to conclusively ensure that no more messages will arrive.
    #[fuchsia::test]
    async fn drain_receiver() {
        let (receiver, sender) = Connector::new();

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        // Dropping every sender after a message was sent must not lose that message.
        drop(sender);

        // The queued message is still delivered.
        assert_matches!(receiver.receive().await, Some(_));

        // After that the receiver reports that nothing more will arrive.
        assert_matches!(receiver.receive().await, None);
    }

    /// A queued channel is forwarded over the FIDL `Receiver` protocol with its connection to
    /// the peer intact.
    #[fuchsia::test]
    async fn receiver_fidl() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let (receiver_proxy, mut receiver_stream) =
            fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();

        let handler_fut = receiver.handle_receiver(receiver_proxy);
        let receive_fut = receiver_stream.next();
        // Race the handler against the request stream; the stream must yield first.
        let next_request = match future::select(pin!(handler_fut), pin!(receive_fut)).await {
            Either::Right((next_request, _)) => next_request,
            Either::Left(_) => panic!("Handler should not finish"),
        };
        let request = next_request.unwrap().unwrap();
        let fsandbox::ReceiverRequest::Receive { channel, .. } = request else {
            panic!("Unexpected message");
        };

        // Check connectivity.
        channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }
}