1use crate::Message;
6use derivative::Derivative;
7use fidl::endpoints::ServerEnd;
8use fidl_fuchsia_io as fio;
9use futures::channel::mpsc::{self, UnboundedReceiver};
10use futures::lock::Mutex;
11use futures::StreamExt;
12use std::sync::Arc;
13
14#[derive(Derivative)]
18#[derivative(Debug)]
19pub struct Receiver {
20 inner: Arc<Mutex<UnboundedReceiver<Message>>>,
23}
24
25impl Clone for Receiver {
26 fn clone(&self) -> Self {
27 Self { inner: self.inner.clone() }
28 }
29}
30
31impl Receiver {
32 pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
33 Self { inner: Arc::new(Mutex::new(receiver)) }
34 }
35
36 pub async fn receive(&self) -> Option<Message> {
39 let mut receiver_guard = self.inner.lock().await;
40 receiver_guard.next().await
41 }
42}
43
44#[derive(Derivative)]
48#[derivative(Debug)]
49pub struct DirReceiver {
50 inner: Arc<Mutex<UnboundedReceiver<ServerEnd<fio::DirectoryMarker>>>>,
53}
54
55impl Clone for DirReceiver {
56 fn clone(&self) -> Self {
57 Self { inner: self.inner.clone() }
58 }
59}
60
61impl DirReceiver {
62 pub fn new(receiver: mpsc::UnboundedReceiver<ServerEnd<fio::DirectoryMarker>>) -> Self {
63 Self { inner: Arc::new(Mutex::new(receiver)) }
64 }
65
66 pub async fn receive(&self) -> Option<ServerEnd<fio::DirectoryMarker>> {
69 let mut receiver_guard = self.inner.lock().await;
70 receiver_guard.next().await
71 }
72}
73
74#[cfg(target_os = "fuchsia")]
77#[cfg(test)]
78mod tests {
79 use crate::Connector;
80 use assert_matches::assert_matches;
81 use futures::future::{self, Either};
82 use std::pin::pin;
83 use zx::{self as zx, AsHandleRef, Peered};
84 use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
85
86 use super::*;
87
88 #[fuchsia::test]
89 async fn send_and_receive() {
90 let (receiver, sender) = Connector::new();
91
92 let (ch1, ch2) = zx::Channel::create();
93 sender.send_channel(ch1).unwrap();
94
95 let message = receiver.receive().await.unwrap();
96
97 message.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
99 ch2.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
100 }
101
102 #[fuchsia::test]
103 async fn send_fail_when_receiver_dropped() {
104 let (receiver, sender) = Connector::new();
105
106 drop(receiver);
107
108 let (ch1, _ch2) = zx::Channel::create();
109 sender.send_channel(ch1).unwrap_err();
110 }
111
112 #[test]
113 fn receive_blocks_while_connector_alive() {
114 let mut ex = fasync::TestExecutor::new();
115 let (receiver, sender) = Connector::new();
116
117 {
118 let mut fut = std::pin::pin!(receiver.receive());
119 assert!(ex.run_until_stalled(&mut fut).is_pending());
120 }
121
122 drop(sender);
123
124 let mut fut = std::pin::pin!(receiver.receive());
125 let output = ex.run_until_stalled(&mut fut);
126 assert_matches!(output, std::task::Poll::Ready(None));
127 }
128
129 #[fuchsia::test]
131 async fn drain_receiver() {
132 let (receiver, sender) = Connector::new();
133
134 let (ch1, _ch2) = zx::Channel::create();
135 sender.send_channel(ch1).unwrap();
136
137 drop(sender);
140
141 assert!(receiver.receive().await.is_some());
143
144 assert!(receiver.receive().await.is_none());
146 }
147
148 #[fuchsia::test]
149 async fn receiver_fidl() {
150 let (receiver, sender) = Connector::new();
151
152 let (ch1, ch2) = zx::Channel::create();
153 sender.send_channel(ch1).unwrap();
154
155 let (receiver_proxy, mut receiver_stream) =
156 fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();
157
158 let handler_fut = receiver.handle_receiver(receiver_proxy);
159 let receive_fut = receiver_stream.next();
160 let Either::Right((message, _)) =
161 future::select(pin!(handler_fut), pin!(receive_fut)).await
162 else {
163 panic!("Handler should not finish");
164 };
165 let message = message.unwrap().unwrap();
166 match message {
167 fsandbox::ReceiverRequest::Receive { channel, .. } => {
168 channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
170 ch2.wait_handle(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
171 }
172 _ => panic!("Unexpected message"),
173 }
174 }
175}