vsock_service_lib/
lib.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![recursion_limit = "512"]
6
7mod addr;
8mod port;
9mod service;
10
11pub use self::service::Vsock;
12
13#[cfg(test)]
14mod tests {
15    use super::*;
16    use fidl::endpoints::{self, Proxy};
17    use fidl_fuchsia_hardware_vsock::{
18        CallbacksProxy, DeviceMarker, DeviceRequest, DeviceRequestStream, VMADDR_CID_HOST,
19        VMADDR_CID_LOCAL,
20    };
21    use fidl_fuchsia_vsock::{
22        AcceptorMarker, AcceptorRequest, ConnectionMarker, ConnectionProxy, ConnectionTransport,
23        ConnectorMarker, ConnectorProxy, ListenerMarker,
24    };
25    use fuchsia_async as fasync;
26    use futures::{channel, future, FutureExt, StreamExt, TryFutureExt};
27    struct MockDriver {
28        client: DeviceRequestStream,
29        callbacks: CallbacksProxy,
30    }
31
32    impl MockDriver {
33        fn new(client: DeviceRequestStream, callbacks: CallbacksProxy) -> Self {
34            MockDriver { client, callbacks }
35        }
36    }
37
38    macro_rules! unwrap_msg {
39        ($msg:path{$($bindings:tt)*} from $stream:expr) => {
40            if let Some(Ok($msg{$($bindings)*})) = $stream.next().await {
41                ($($bindings)*)
42            } else {
43                panic!("Expected msg {}", stringify!($msg));
44            }
45        }
46    }
47
48    async fn common_setup() -> Result<(MockDriver, Vsock), anyhow::Error> {
49        let (driver_client, driver_server) = endpoints::create_endpoints::<DeviceMarker>();
50        let mut driver_server = driver_server.into_stream();
51
52        // Vsock::new expects to be able to communication with a running driver instance.
53        // As we don't have a driver instance we spin up an asynchronous thread that will
54        // momentarily pretend to be the driver to receive the callbacks, and then send
55        // those callbacks over the below oneshot channel that we can then receive after
56        // Vsock::new completes.
57        let (tx, rx) = channel::oneshot::channel();
58        fasync::Task::spawn(async move {
59            let (cb, responder) =
60                unwrap_msg!(DeviceRequest::Start{cb, responder} from driver_server);
61            let driver_callbacks = cb.into_proxy();
62            responder.send(Ok(())).unwrap();
63            let _ = tx.send((driver_server, driver_callbacks));
64        })
65        .detach();
66
67        let (service, event_loop) = Vsock::new(None, Some(driver_client.into_proxy())).await?;
68        fasync::Task::local(event_loop.map_err(|x| panic!("Event loop stopped {}", x)).map(|_| ()))
69            .detach();
70        let (driver_server, driver_callbacks) = rx.await?;
71        let driver = MockDriver::new(driver_server, driver_callbacks);
72        Ok((driver, service))
73    }
74
75    fn make_con() -> Result<(zx::Socket, ConnectionProxy, ConnectionTransport), anyhow::Error> {
76        let (client_socket, server_socket) = zx::Socket::create_stream();
77        let (client_end, server_end) = endpoints::create_endpoints::<ConnectionMarker>();
78        let client_end = client_end.into_proxy();
79        let con = ConnectionTransport { data: server_socket, con: server_end };
80        Ok((client_socket, client_end, con))
81    }
82
83    fn make_client(service: &Vsock) -> Result<ConnectorProxy, anyhow::Error> {
84        let (app_client, app_remote) = endpoints::create_endpoints::<ConnectorMarker>();
85        let app_client = app_client.into_proxy();
86        // Run the client
87        fasync::Task::local(
88            Vsock::run_client_connection(service.clone(), app_remote.into_stream())
89                .then(|_| future::ready(())),
90        )
91        .detach();
92        Ok(app_client)
93    }
94
95    #[fasync::run_until_stalled(test)]
96    async fn basic_listen() -> Result<(), anyhow::Error> {
97        let (mut driver, service) = common_setup().await?;
98
99        let app_client = make_client(&service)?;
100
101        // Should reject listening at the ephemeral port ranges.
102        {
103            let (acceptor_remote, _acceptor_client) =
104                endpoints::create_endpoints::<AcceptorMarker>();
105            assert_eq!(
106                app_client.listen(49152, acceptor_remote).await?,
107                Err(zx::sys::ZX_ERR_UNAVAILABLE)
108            );
109        }
110
111        // Listen on a reasonable value.
112        let (acceptor_remote, acceptor_client) = endpoints::create_endpoints::<AcceptorMarker>();
113        assert_eq!(app_client.listen(8000, acceptor_remote).await?, Ok(()));
114        let mut acceptor_client = acceptor_client.into_stream();
115
116        // Validate that we cannot listen twice
117        {
118            let (acceptor_remote, _acceptor_client) =
119                endpoints::create_endpoints::<AcceptorMarker>();
120            assert_eq!(
121                app_client.listen(8000, acceptor_remote).await?,
122                Err(zx::sys::ZX_ERR_ALREADY_BOUND)
123            );
124        }
125
126        // Create a connection from the driver
127        driver.callbacks.request(&*addr::Vsock::new(8000, 80, VMADDR_CID_HOST))?;
128        let (_data_socket, _client_end, con) = make_con()?;
129
130        let (_, responder) =
131            unwrap_msg!(AcceptorRequest::Accept{addr, responder} from acceptor_client);
132        responder.send(Some(con))?;
133
134        // expect a response
135        let (_, _server_data_socket, responder) =
136            unwrap_msg!(DeviceRequest::SendResponse{addr, data, responder} from driver.client);
137        responder.send(Ok(()))?;
138
139        Ok(())
140    }
141
142    #[fasync::run_until_stalled(test)]
143    async fn basic_bind_and_listen() -> Result<(), anyhow::Error> {
144        let (mut driver, service) = common_setup().await?;
145        let app_client = make_client(&service)?;
146        let (_, listener_remote) = endpoints::create_endpoints::<ListenerMarker>();
147
148        // Should reject listening at the ephemeral port ranges.
149        assert_eq!(
150            app_client.bind(VMADDR_CID_LOCAL, 49152, listener_remote).await?,
151            Err(zx::sys::ZX_ERR_UNAVAILABLE)
152        );
153
154        let (listener_client, listener_remote) = endpoints::create_endpoints::<ListenerMarker>();
155        // Listen on a reasonable value.
156        assert_eq!(app_client.bind(VMADDR_CID_LOCAL, 8000, listener_remote).await?, Ok(()));
157
158        // Validate that we cannot listen twice
159        let (_, listener_remote2) = endpoints::create_endpoints::<ListenerMarker>();
160        assert_eq!(
161            app_client.bind(VMADDR_CID_LOCAL, 8000, listener_remote2).await?,
162            Err(zx::sys::ZX_ERR_ALREADY_BOUND)
163        );
164
165        let listener_client = listener_client.into_proxy();
166        assert_eq!(listener_client.listen(1).await?, Ok(()));
167
168        // Create a connection from the driver
169        driver.callbacks.request(&*addr::Vsock::new(8000, 80, VMADDR_CID_LOCAL))?;
170        let (_data_socket, _client_end, con) = make_con()?;
171
172        let accept_fut = listener_client.accept(con);
173
174        // expect a response
175        let (_, _server_data_socket, responder) =
176            unwrap_msg!(DeviceRequest::SendResponse{addr, data, responder} from driver.client);
177        responder.send(Ok(()))?;
178
179        assert_eq!(accept_fut.await?, Ok(*addr::Vsock::new(8000, 80, VMADDR_CID_LOCAL)));
180
181        Ok(())
182    }
183
184    #[fasync::run_until_stalled(test)]
185    async fn reject_connection() -> Result<(), anyhow::Error> {
186        let (mut driver, service) = common_setup().await?;
187
188        let app_client = make_client(&service)?;
189
190        // Send a connection request
191        let (_data_socket, _client_end, con) = make_con()?;
192        let request = app_client.connect(VMADDR_CID_HOST, 8000, con);
193
194        // Expect a driver message
195        {
196            let (addr, _server_data_socket, responder) =
197                unwrap_msg!(DeviceRequest::SendRequest{addr, data, responder} from driver.client);
198            responder.send(Ok(()))?;
199            // Now simulate an incoming RST for a rejected connection
200            driver.callbacks.rst(&addr)?;
201            // Leave this scope to drop the server_data_socket
202        }
203        // Request should resolve to an error
204        assert_eq!(request.await?, Err(zx::sys::ZX_ERR_UNAVAILABLE));
205        Ok(())
206    }
207
208    #[fasync::run_until_stalled(test)]
209    async fn transport_reset() -> Result<(), anyhow::Error> {
210        let (mut driver, service) = common_setup().await?;
211
212        let app_client = make_client(&service)?;
213
214        // Create a connection.
215        let (_data_socket_request, client_end_request, con) = make_con()?;
216        let request = app_client.connect(VMADDR_CID_HOST, 8000, con);
217        let (addr, server_data_socket_request, responder) =
218            unwrap_msg!(DeviceRequest::SendRequest{addr, data, responder} from driver.client);
219        responder.send(Ok(()))?;
220        driver.callbacks.response(&addr)?;
221        let _ = request.await?.map_err(zx::Status::from_raw)?;
222
223        // Start a listener
224        let (acceptor_remote, acceptor_client) = endpoints::create_endpoints::<AcceptorMarker>();
225        assert_eq!(app_client.listen(9000, acceptor_remote).await?, Ok(()));
226        let mut acceptor_client = acceptor_client.into_stream();
227
228        // Perform a transport reset
229        drop(server_data_socket_request);
230        driver.callbacks.transport_reset(7).await?;
231
232        // Connection should be closed
233        client_end_request.on_closed().await?;
234
235        // Listener should still be active and receive a connection
236        driver.callbacks.request(&*addr::Vsock::new(9000, 80, VMADDR_CID_HOST))?;
237        let (_data_socket, _client_end, _con) = make_con()?;
238
239        let (_addr, responder) =
240            unwrap_msg!(AcceptorRequest::Accept{addr, responder} from acceptor_client);
241        drop(responder); // don't bother responding, we're done
242
243        Ok(())
244    }
245}