1#![recursion_limit = "512"]
6
7mod addr;
8mod port;
9mod service;
10
11pub use self::service::Vsock;
12
13#[cfg(test)]
14mod tests {
15 use super::*;
16 use fidl::endpoints::{self, Proxy};
17 use fidl_fuchsia_hardware_vsock::{
18 CallbacksProxy, DeviceMarker, DeviceRequest, DeviceRequestStream, VMADDR_CID_HOST,
19 VMADDR_CID_LOCAL,
20 };
21 use fidl_fuchsia_vsock::{
22 AcceptorMarker, AcceptorRequest, ConnectionMarker, ConnectionProxy, ConnectionTransport,
23 ConnectorMarker, ConnectorProxy, ListenerMarker,
24 };
25 use fuchsia_async as fasync;
26 use futures::{channel, future, FutureExt, StreamExt, TryFutureExt};
27 struct MockDriver {
28 client: DeviceRequestStream,
29 callbacks: CallbacksProxy,
30 }
31
32 impl MockDriver {
33 fn new(client: DeviceRequestStream, callbacks: CallbacksProxy) -> Self {
34 MockDriver { client, callbacks }
35 }
36 }
37
38 macro_rules! unwrap_msg {
39 ($msg:path{$($bindings:tt)*} from $stream:expr) => {
40 if let Some(Ok($msg{$($bindings)*})) = $stream.next().await {
41 ($($bindings)*)
42 } else {
43 panic!("Expected msg {}", stringify!($msg));
44 }
45 }
46 }
47
48 async fn common_setup() -> Result<(MockDriver, Vsock), anyhow::Error> {
49 let (driver_client, driver_server) = endpoints::create_endpoints::<DeviceMarker>();
50 let mut driver_server = driver_server.into_stream();
51
52 let (tx, rx) = channel::oneshot::channel();
58 fasync::Task::spawn(async move {
59 let (cb, responder) =
60 unwrap_msg!(DeviceRequest::Start{cb, responder} from driver_server);
61 let driver_callbacks = cb.into_proxy();
62 responder.send(Ok(())).unwrap();
63 let _ = tx.send((driver_server, driver_callbacks));
64 })
65 .detach();
66
67 let (service, event_loop) = Vsock::new(None, Some(driver_client.into_proxy())).await?;
68 fasync::Task::local(event_loop.map_err(|x| panic!("Event loop stopped {}", x)).map(|_| ()))
69 .detach();
70 let (driver_server, driver_callbacks) = rx.await?;
71 let driver = MockDriver::new(driver_server, driver_callbacks);
72 Ok((driver, service))
73 }
74
75 fn make_con() -> Result<(zx::Socket, ConnectionProxy, ConnectionTransport), anyhow::Error> {
76 let (client_socket, server_socket) = zx::Socket::create_stream();
77 let (client_end, server_end) = endpoints::create_endpoints::<ConnectionMarker>();
78 let client_end = client_end.into_proxy();
79 let con = ConnectionTransport { data: server_socket, con: server_end };
80 Ok((client_socket, client_end, con))
81 }
82
83 fn make_client(service: &Vsock) -> Result<ConnectorProxy, anyhow::Error> {
84 let (app_client, app_remote) = endpoints::create_endpoints::<ConnectorMarker>();
85 let app_client = app_client.into_proxy();
86 fasync::Task::local(
88 Vsock::run_client_connection(service.clone(), app_remote.into_stream())
89 .then(|_| future::ready(())),
90 )
91 .detach();
92 Ok(app_client)
93 }
94
95 #[fasync::run_until_stalled(test)]
96 async fn basic_listen() -> Result<(), anyhow::Error> {
97 let (mut driver, service) = common_setup().await?;
98
99 let app_client = make_client(&service)?;
100
101 {
103 let (acceptor_remote, _acceptor_client) =
104 endpoints::create_endpoints::<AcceptorMarker>();
105 assert_eq!(
106 app_client.listen(49152, acceptor_remote).await?,
107 Err(zx::sys::ZX_ERR_UNAVAILABLE)
108 );
109 }
110
111 let (acceptor_remote, acceptor_client) = endpoints::create_endpoints::<AcceptorMarker>();
113 assert_eq!(app_client.listen(8000, acceptor_remote).await?, Ok(()));
114 let mut acceptor_client = acceptor_client.into_stream();
115
116 {
118 let (acceptor_remote, _acceptor_client) =
119 endpoints::create_endpoints::<AcceptorMarker>();
120 assert_eq!(
121 app_client.listen(8000, acceptor_remote).await?,
122 Err(zx::sys::ZX_ERR_ALREADY_BOUND)
123 );
124 }
125
126 driver.callbacks.request(&*addr::Vsock::new(8000, 80, VMADDR_CID_HOST))?;
128 let (_data_socket, _client_end, con) = make_con()?;
129
130 let (_, responder) =
131 unwrap_msg!(AcceptorRequest::Accept{addr, responder} from acceptor_client);
132 responder.send(Some(con))?;
133
134 let (_, _server_data_socket, responder) =
136 unwrap_msg!(DeviceRequest::SendResponse{addr, data, responder} from driver.client);
137 responder.send(Ok(()))?;
138
139 Ok(())
140 }
141
142 #[fasync::run_until_stalled(test)]
143 async fn basic_bind_and_listen() -> Result<(), anyhow::Error> {
144 let (mut driver, service) = common_setup().await?;
145 let app_client = make_client(&service)?;
146 let (_, listener_remote) = endpoints::create_endpoints::<ListenerMarker>();
147
148 assert_eq!(
150 app_client.bind(VMADDR_CID_LOCAL, 49152, listener_remote).await?,
151 Err(zx::sys::ZX_ERR_UNAVAILABLE)
152 );
153
154 let (listener_client, listener_remote) = endpoints::create_endpoints::<ListenerMarker>();
155 assert_eq!(app_client.bind(VMADDR_CID_LOCAL, 8000, listener_remote).await?, Ok(()));
157
158 let (_, listener_remote2) = endpoints::create_endpoints::<ListenerMarker>();
160 assert_eq!(
161 app_client.bind(VMADDR_CID_LOCAL, 8000, listener_remote2).await?,
162 Err(zx::sys::ZX_ERR_ALREADY_BOUND)
163 );
164
165 let listener_client = listener_client.into_proxy();
166 assert_eq!(listener_client.listen(1).await?, Ok(()));
167
168 driver.callbacks.request(&*addr::Vsock::new(8000, 80, VMADDR_CID_LOCAL))?;
170 let (_data_socket, _client_end, con) = make_con()?;
171
172 let accept_fut = listener_client.accept(con);
173
174 let (_, _server_data_socket, responder) =
176 unwrap_msg!(DeviceRequest::SendResponse{addr, data, responder} from driver.client);
177 responder.send(Ok(()))?;
178
179 assert_eq!(accept_fut.await?, Ok(*addr::Vsock::new(8000, 80, VMADDR_CID_LOCAL)));
180
181 Ok(())
182 }
183
184 #[fasync::run_until_stalled(test)]
185 async fn reject_connection() -> Result<(), anyhow::Error> {
186 let (mut driver, service) = common_setup().await?;
187
188 let app_client = make_client(&service)?;
189
190 let (_data_socket, _client_end, con) = make_con()?;
192 let request = app_client.connect(VMADDR_CID_HOST, 8000, con);
193
194 {
196 let (addr, _server_data_socket, responder) =
197 unwrap_msg!(DeviceRequest::SendRequest{addr, data, responder} from driver.client);
198 responder.send(Ok(()))?;
199 driver.callbacks.rst(&addr)?;
201 }
203 assert_eq!(request.await?, Err(zx::sys::ZX_ERR_UNAVAILABLE));
205 Ok(())
206 }
207
208 #[fasync::run_until_stalled(test)]
209 async fn transport_reset() -> Result<(), anyhow::Error> {
210 let (mut driver, service) = common_setup().await?;
211
212 let app_client = make_client(&service)?;
213
214 let (_data_socket_request, client_end_request, con) = make_con()?;
216 let request = app_client.connect(VMADDR_CID_HOST, 8000, con);
217 let (addr, server_data_socket_request, responder) =
218 unwrap_msg!(DeviceRequest::SendRequest{addr, data, responder} from driver.client);
219 responder.send(Ok(()))?;
220 driver.callbacks.response(&addr)?;
221 let _ = request.await?.map_err(zx::Status::from_raw)?;
222
223 let (acceptor_remote, acceptor_client) = endpoints::create_endpoints::<AcceptorMarker>();
225 assert_eq!(app_client.listen(9000, acceptor_remote).await?, Ok(()));
226 let mut acceptor_client = acceptor_client.into_stream();
227
228 drop(server_data_socket_request);
230 driver.callbacks.transport_reset(7).await?;
231
232 client_end_request.on_closed().await?;
234
235 driver.callbacks.request(&*addr::Vsock::new(9000, 80, VMADDR_CID_HOST))?;
237 let (_data_socket, _client_end, _con) = make_con()?;
238
239 let (_addr, responder) =
240 unwrap_msg!(AcceptorRequest::Accept{addr, responder} from acceptor_client);
241 drop(responder); Ok(())
244 }
245}