sandbox/
receiver.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Message;
6use derivative::Derivative;
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_io as fio;
9use futures::channel::mpsc::{self, UnboundedReceiver};
10use futures::lock::Mutex;
11use futures::StreamExt;
12use std::sync::Arc;
13
14/// Type that represents the receiving end of a [Connector]. Every [Connector] is coupled to
15/// some [Receiver] to which connection requests to that [Connector] (or any of its clones) are
16/// delivered.
17#[derive(Derivative)]
18#[derivative(Debug)]
19pub struct Receiver {
20    /// `inner` uses an async mutex because it will be locked across an await point
21    /// when asynchronously waiting for the next message.
22    inner: Arc<Mutex<UnboundedReceiver<Message>>>,
23}
24
25impl Clone for Receiver {
26    fn clone(&self) -> Self {
27        Self { inner: self.inner.clone() }
28    }
29}
30
31impl Receiver {
32    pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
33        Self { inner: Arc::new(Mutex::new(receiver)) }
34    }
35
36    /// Waits to receive a message, or return `None` if there are no more messages and all
37    /// senders are dropped.
38    pub async fn receive(&self) -> Option<Message> {
39        let mut receiver_guard = self.inner.lock().await;
40        receiver_guard.next().await
41    }
42}
43
44/// Type that represents the receiving end of a [DirConnector]. The [DirConnector] counterpart of
45/// [Receiver]. Every [DirConnector] is coupled to some [Receiver] to which connection requests to
46/// that [DirConnector] (or any of its clones) are delivered.
47#[derive(Derivative)]
48#[derivative(Debug)]
49pub struct DirReceiver {
50    /// `inner` uses an async mutex because it will be locked across an await point
51    /// when asynchronously waiting for the next message.
52    inner: Arc<Mutex<UnboundedReceiver<ServerEnd<fio::DirectoryMarker>>>>,
53}
54
55impl Clone for DirReceiver {
56    fn clone(&self) -> Self {
57        Self { inner: self.inner.clone() }
58    }
59}
60
61impl DirReceiver {
62    pub fn new(receiver: mpsc::UnboundedReceiver<ServerEnd<fio::DirectoryMarker>>) -> Self {
63        Self { inner: Arc::new(Mutex::new(receiver)) }
64    }
65
66    /// Waits to receive a message, or return `None` if there are no more messages and all
67    /// senders are dropped.
68    pub async fn receive(&self) -> Option<ServerEnd<fio::DirectoryMarker>> {
69        let mut receiver_guard = self.inner.lock().await;
70        receiver_guard.next().await
71    }
72}
73
// These tests are Fuchsia-only: the `wait_handle` function used below is not supported in the
// host handle emulation layer.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod tests {
    use super::*;

    use crate::Connector;
    use assert_matches::assert_matches;
    use futures::future::{self, Either};
    use std::pin::pin;
    use std::task::Poll;
    use zx::{self as zx, AsHandleRef, Peered};
    use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};

    /// A channel sent through the connector comes out of the receiver still connected to
    /// its peer.
    #[fuchsia::test]
    async fn send_and_receive() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let received = receiver.receive().await.unwrap();

        // Check connectivity: a signal raised from the received end shows up on the kept end.
        received.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }

    /// Sending reports an error once the receiver is gone.
    #[fuchsia::test]
    async fn send_fail_when_receiver_dropped() {
        let (receiver, sender) = Connector::new();
        drop(receiver);

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap_err();
    }

    /// `receive` stays pending while a sender exists and resolves to `None` after it is dropped.
    #[test]
    fn receive_blocks_while_connector_alive() {
        let mut executor = fasync::TestExecutor::new();
        let (receiver, sender) = Connector::new();

        // Scope the first future so it is dropped (releasing the receiver's lock) before
        // `receive` is called again.
        {
            let mut pending = pin!(receiver.receive());
            assert!(executor.run_until_stalled(&mut pending).is_pending());
        }

        drop(sender);

        let mut finished = pin!(receiver.receive());
        let outcome = executor.run_until_stalled(&mut finished);
        assert_matches!(outcome, Poll::Ready(None));
    }

    /// It should be possible to conclusively ensure that no more messages will arrive.
    #[fuchsia::test]
    async fn drain_receiver() {
        let (receiver, sender) = Connector::new();

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        // Dropping every sender after a send must not lose the message that is already queued.
        drop(sender);

        // The queued message is delivered first...
        let first = receiver.receive().await;
        assert!(first.is_some());

        // ...and after that the receiver reports that nothing more will arrive.
        let second = receiver.receive().await;
        assert!(second.is_none());
    }

    /// A queued channel is forwarded over the FIDL `Receiver` protocol by `handle_receiver`.
    #[fuchsia::test]
    async fn receiver_fidl() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let (receiver_proxy, mut receiver_stream) =
            fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();

        // Race the handler against the request stream; the stream is expected to yield first.
        let handler_fut = receiver.handle_receiver(receiver_proxy);
        let receive_fut = receiver_stream.next();
        let Either::Right((request, _)) =
            future::select(pin!(handler_fut), pin!(receive_fut)).await
        else {
            panic!("Handler should not finish");
        };
        let request = request.unwrap().unwrap();
        let fsandbox::ReceiverRequest::Receive { channel, .. } = request else {
            panic!("Unexpected message");
        };

        // Check connectivity: a signal raised from the forwarded end shows up on the kept end.
        channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }
}
173        }
174    }
175}