1use crate::{DirConnectorMessage, Message};
6use derivative::Derivative;
7use futures::StreamExt;
8use futures::channel::mpsc::{self, UnboundedReceiver};
9use futures::lock::Mutex;
10use std::sync::Arc;
11
12#[derive(Derivative)]
16#[derivative(Debug)]
17pub struct Receiver {
18 inner: Arc<Mutex<UnboundedReceiver<Message>>>,
21}
22
23impl Clone for Receiver {
24 fn clone(&self) -> Self {
25 Self { inner: self.inner.clone() }
26 }
27}
28
29impl Receiver {
30 pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
31 Self { inner: Arc::new(Mutex::new(receiver)) }
32 }
33
34 pub async fn receive(&self) -> Option<Message> {
37 let mut receiver_guard = self.inner.lock().await;
38 receiver_guard.next().await
39 }
40}
41
42#[derive(Derivative)]
46#[derivative(Debug)]
47pub struct DirReceiver {
48 inner: Arc<Mutex<UnboundedReceiver<DirConnectorMessage>>>,
51}
52
53impl Clone for DirReceiver {
54 fn clone(&self) -> Self {
55 Self { inner: self.inner.clone() }
56 }
57}
58
59impl DirReceiver {
60 pub fn new(receiver: mpsc::UnboundedReceiver<DirConnectorMessage>) -> Self {
61 Self { inner: Arc::new(Mutex::new(receiver)) }
62 }
63
64 pub async fn receive(&self) -> Option<DirConnectorMessage> {
67 let mut receiver_guard = self.inner.lock().await;
68 receiver_guard.next().await
69 }
70}
71
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod tests {
    use crate::Connector;
    use assert_matches::assert_matches;
    use futures::future::{self, Either};
    use std::pin::pin;
    use zx::{self as zx, AsHandleRef, Peered};
    use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};

    use super::*;

    /// A channel sent through the connector arrives at the receiver still connected to its peer.
    #[fuchsia::test]
    async fn send_and_receive() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let delivered = receiver.receive().await.unwrap();

        // Raise a user signal via the delivered handle and observe it on the end we kept.
        delivered.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }

    /// Sending reports an error once the receiver no longer exists.
    #[fuchsia::test]
    async fn send_fail_when_receiver_dropped() {
        let (receiver, sender) = Connector::new();

        drop(receiver);

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap_err();
    }

    /// `receive` stays pending while a sender exists and yields `None` after it is dropped.
    #[test]
    fn receive_blocks_while_connector_alive() {
        let mut executor = fasync::TestExecutor::new();
        let (receiver, sender) = Connector::new();

        // Scoped so the stalled future (and the lock it holds) is dropped before the next call.
        {
            let mut stalled = pin!(receiver.receive());
            assert!(executor.run_until_stalled(&mut stalled).is_pending());
        }

        drop(sender);

        let mut finished = pin!(receiver.receive());
        assert_matches!(executor.run_until_stalled(&mut finished), std::task::Poll::Ready(None));
    }

    /// A message queued before the sender went away is still delivered, then the stream ends.
    #[fuchsia::test]
    async fn drain_receiver() {
        let (receiver, sender) = Connector::new();

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        drop(sender);

        // The queued message comes out first...
        assert!(receiver.receive().await.is_some());

        // ...and only afterwards does the receiver report the end of the stream.
        assert!(receiver.receive().await.is_none());
    }

    /// Messages are forwarded over the FIDL `Receiver` protocol with the channel intact.
    #[fuchsia::test]
    async fn receiver_fidl() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let (receiver_proxy, mut receiver_stream) =
            fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();

        // Race the forwarding task against the server-side stream; the stream must win.
        let forward_task = receiver.handle_receiver(receiver_proxy);
        let next_request = receiver_stream.next();
        let request = match future::select(pin!(forward_task), pin!(next_request)).await {
            Either::Right((request, _)) => request,
            Either::Left(_) => panic!("Handler should not finish"),
        };

        let request = request.unwrap().unwrap();
        let fsandbox::ReceiverRequest::Receive { channel, .. } = request else {
            panic!("Unexpected message");
        };

        // Raise a user signal via the forwarded handle and observe it on the end we kept.
        channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }
}