runtime_capabilities/
receiver.rs1use crate::{DirConnectorMessage, Message};
6use derivative::Derivative;
7use futures::StreamExt;
8use futures::channel::mpsc::{self, UnboundedReceiver};
9use futures::lock::Mutex;
10use std::sync::Arc;
11
12#[derive(Derivative)]
16#[derivative(Debug)]
17pub struct Receiver {
18 inner: Arc<Mutex<UnboundedReceiver<Message>>>,
21}
22
23impl Clone for Receiver {
24 fn clone(&self) -> Self {
25 Self { inner: self.inner.clone() }
26 }
27}
28
29impl Receiver {
30 pub fn new(receiver: mpsc::UnboundedReceiver<crate::Message>) -> Self {
31 Self { inner: Arc::new(Mutex::new(receiver)) }
32 }
33
34 pub async fn receive(&self) -> Option<Message> {
37 let mut receiver_guard = self.inner.lock().await;
38 receiver_guard.next().await
39 }
40}
41
42#[derive(Derivative)]
46#[derivative(Debug)]
47pub struct DirReceiver {
48 inner: Arc<Mutex<UnboundedReceiver<DirConnectorMessage>>>,
51}
52
53impl Clone for DirReceiver {
54 fn clone(&self) -> Self {
55 Self { inner: self.inner.clone() }
56 }
57}
58
59impl DirReceiver {
60 pub fn new(receiver: mpsc::UnboundedReceiver<DirConnectorMessage>) -> Self {
61 Self { inner: Arc::new(Mutex::new(receiver)) }
62 }
63
64 pub async fn receive(&self) -> Option<DirConnectorMessage> {
67 let mut receiver_guard = self.inner.lock().await;
68 receiver_guard.next().await
69 }
70}
71
#[cfg(all(test, target_os = "fuchsia"))]
mod tests {
    use crate::Connector;
    use assert_matches::assert_matches;
    use fidl_fuchsia_component_sandbox as fsandbox;
    use fuchsia_async as fasync;
    use futures::future::{self, Either};
    use std::pin::pin;
    use zx::{self as zx, Peered};

    use super::*;

    /// A channel handed to the sender comes out of `receive` still connected to its peer.
    #[fuchsia::test]
    async fn send_and_receive() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let delivered = receiver.receive().await.unwrap();

        // Raise a signal through the delivered channel and observe it on the end we kept;
        // this only succeeds if the two are still peers.
        delivered.channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_one(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }

    /// Sending reports an error once the receiver has been dropped.
    #[fuchsia::test]
    async fn send_fail_when_receiver_dropped() {
        let (receiver, sender) = Connector::new();

        drop(receiver);

        // `_kept_end` is bound (not `_`) so the peer stays open for the duration of the send.
        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap_err();
    }

    /// `receive` stays pending while the sender exists and yields `None` after it is dropped.
    #[test]
    fn receive_blocks_while_connector_alive() {
        let mut executor = fasync::TestExecutor::new();
        let (receiver, sender) = Connector::new();

        {
            // Scoped so the stalled future is dropped before the second `receive` call below.
            let mut stalled = pin!(receiver.receive());
            assert!(executor.run_until_stalled(&mut stalled).is_pending());
        }

        drop(sender);

        let mut finished = pin!(receiver.receive());
        let poll_result = executor.run_until_stalled(&mut finished);
        assert_matches!(poll_result, std::task::Poll::Ready(None));
    }

    /// A message queued before the sender is dropped is still delivered; only then does the
    /// receiver report the end of the stream.
    #[fuchsia::test]
    async fn drain_receiver() {
        let (receiver, sender) = Connector::new();

        let (sent_end, _kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        drop(sender);

        // The queued message comes out first...
        assert!(receiver.receive().await.is_some());

        // ...and after that the stream is finished.
        assert!(receiver.receive().await.is_none());
    }

    /// `handle_receiver` forwards a queued channel to the FIDL `Receiver` protocol as a
    /// `Receive` request.
    #[fuchsia::test]
    async fn receiver_fidl() {
        let (receiver, sender) = Connector::new();

        let (sent_end, kept_end) = zx::Channel::create();
        sender.send_channel(sent_end).unwrap();

        let (receiver_proxy, mut receiver_stream) =
            fidl::endpoints::create_proxy_and_stream::<fsandbox::ReceiverMarker>();

        // Race the forwarding loop against the server-side request stream; the request must
        // arrive while the handler is still running.
        let forwarding = receiver.handle_receiver(receiver_proxy);
        let next_request = receiver_stream.next();
        let request = match future::select(pin!(forwarding), pin!(next_request)).await {
            Either::Right((request, _)) => request,
            Either::Left(_) => panic!("Handler should not finish"),
        };

        let fsandbox::ReceiverRequest::Receive { channel, .. } = request.unwrap().unwrap() else {
            panic!("Unexpected message");
        };

        // Same peer check as `send_and_receive`, this time on the channel carried by the request.
        channel.signal_peer(zx::Signals::empty(), zx::Signals::USER_1).unwrap();
        kept_end.wait_one(zx::Signals::USER_1, zx::MonotonicInstant::INFINITE).unwrap();
    }
}