fuchsia_hyper_test_support/
lib.rs1#![deny(missing_docs)]
6
7use anyhow::Error;
19use chrono::Utc;
20use fuchsia_async::{self as fasync, Task};
21use futures::future::BoxFuture;
22use futures::prelude::*;
23use hyper::server::accept::from_stream;
24use hyper::server::Server;
25use hyper::service::{make_service_fn, service_fn};
26use hyper::{Body, Request, Response, StatusCode};
27#[cfg(not(target_os = "fuchsia"))]
28use netext::TokioAsyncReadExt;
29use std::convert::Infallible;
30use std::net::{Ipv6Addr, SocketAddr};
31use std::pin::Pin;
32use std::sync::Arc;
33
34pub mod handler;
36
37pub mod fault_injection;
39
40pub struct TestServer {
43 stop: futures::channel::oneshot::Sender<()>,
44 addr: SocketAddr,
45 use_https: bool,
46 task: Task<()>,
47}
48
49pub trait Handler: 'static + Send + Sync {
51 fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>>;
54}
55
56impl Handler for Arc<dyn Handler> {
57 fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>> {
58 (**self).handles(request)
59 }
60}
61
62impl TestServer {
63 fn scheme(&self) -> &'static str {
65 if self.use_https {
66 "https"
67 } else {
68 "http"
69 }
70 }
71
72 pub fn local_url(&self) -> String {
74 format!("{}://localhost:{}", self.scheme(), self.addr.port())
75 }
76
77 pub fn local_url_for_path(&self, path: &str) -> String {
80 let path = path.trim_start_matches('/');
81 format!("{}://localhost:{}/{}", self.scheme(), self.addr.port(), path)
82 }
83
84 pub fn stop(self) -> impl Future<Output = ()> {
86 self.stop.send(()).expect("remote end to still be open");
87 self.task
88 }
89
90 async fn handle_request(
93 handlers: Arc<Vec<Arc<dyn Handler>>>,
94 req: Request<Body>,
95 ) -> Response<Body> {
96 let response = handlers.iter().find_map(|h| h.handles(&req));
97
98 match response {
99 Some(response) => response.await,
100 None => Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap(),
101 }
102 }
103
104 pub fn builder() -> TestServerBuilder {
106 TestServerBuilder::new()
107 }
108}
109
110#[derive(Default)]
112pub struct TestServerBuilder {
113 handlers: Vec<Arc<dyn Handler>>,
114 https_certs: Option<(Vec<rustls::Certificate>, rustls::PrivateKey)>,
115}
116
117impl TestServerBuilder {
118 pub fn new() -> Self {
120 Self::default()
121 }
122
123 pub fn use_https(mut self, cert_chain: &[u8], private_key: &[u8]) -> Self {
125 let cert_chain = parse_cert_chain(cert_chain);
126 let private_key = parse_private_key(private_key);
127 self.https_certs = Some((cert_chain, private_key));
128 self
129 }
130
131 pub fn handler(mut self, handler: impl Handler + 'static) -> Self {
134 self.handlers.push(Arc::new(handler));
135 self
136 }
137
138 pub async fn start(self) -> TestServer {
140 let (mut listener, addr) = {
141 let addr = SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0);
142 let listener = bind_listener(&addr).await;
143 let local_addr = listener.local_addr().unwrap();
144 (listener, local_addr)
145 };
146
147 let (stop, rx_stop) = futures::channel::oneshot::channel();
148
149 let (tls_acceptor, use_https) = if let Some((cert_chain, private_key)) = self.https_certs {
150 let tls_config = rustls::ServerConfig::builder()
152 .with_safe_defaults()
153 .with_no_client_auth()
154 .with_single_cert(cert_chain, private_key)
155 .unwrap();
156 let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
157
158 (Some(tls_acceptor), true)
159 } else {
160 (None, false)
161 };
162
163 let task = fasync::Task::spawn(async move {
164 let listener = accept_stream(&mut listener);
165 #[cfg(target_os = "fuchsia")]
166 let listener = listener
167 .map_err(Error::from)
168 .map_ok(|conn| fuchsia_hyper::TcpStream { stream: conn });
169 #[cfg(not(target_os = "fuchsia"))]
170 let listener = listener.map_err(Error::from).map_ok(|conn| fuchsia_hyper::TcpStream {
171 stream: conn.into_multithreaded_futures_stream(),
172 });
173
174 let connections = if let Some(tls_acceptor) = tls_acceptor {
175 listener
177 .and_then(move |conn| {
178 tls_acceptor.accept(conn).map(|res| match res {
179 Ok(conn) => {
180 Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
181 }
182 Err(e) => Err(Error::from(e)),
183 })
184 })
185 .boxed() } else {
187 listener
188 .map_ok(|conn| Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
189 .boxed() };
191
192 let handlers = Arc::new(self.handlers);
194
195 let make_svc = make_service_fn(move |_socket| {
196 let handlers = Arc::clone(&handlers);
199
200 async move {
201 Ok::<_, Infallible>(service_fn(move |req| {
202 let method = req.method().to_owned();
205 let path = req.uri().path().to_owned();
206 TestServer::handle_request(Arc::clone(&handlers), req)
207 .inspect(move |x| {
208 println!(
209 "{} [test http] {} {} => {}",
210 Utc::now().format("%T.%6f"),
211 method,
212 path,
213 x.status()
214 )
215 })
216 .map(Ok::<_, Infallible>)
217 }))
218 }
219 });
220
221 Server::builder(from_stream(connections))
222 .executor(fuchsia_hyper::Executor)
223 .serve(make_svc)
224 .with_graceful_shutdown(
225 rx_stop.map(|res| res.unwrap_or_else(|futures::channel::oneshot::Canceled| ())),
226 )
227 .unwrap_or_else(|e| panic!("error serving repo over http: {}", e))
228 .await;
229 });
230
231 TestServer { stop, addr, use_https, task }
232 }
233}
234
235#[cfg(target_os = "fuchsia")]
236async fn bind_listener(addr: &SocketAddr) -> fuchsia_async::net::TcpListener {
237 fuchsia_async::net::TcpListener::bind(addr).unwrap()
238}
239
240#[cfg(not(target_os = "fuchsia"))]
241async fn bind_listener(&addr: &SocketAddr) -> tokio::net::TcpListener {
242 tokio::net::TcpListener::bind(addr).await.unwrap()
243}
244
245#[cfg(target_os = "fuchsia")]
246fn accept_stream<'a>(
247 listener: &'a mut fuchsia_async::net::TcpListener,
248) -> impl Stream<Item = std::io::Result<fuchsia_async::net::TcpStream>> + 'a {
249 use std::task::{Context, Poll};
250
251 #[pin_project::pin_project]
252 struct AcceptStream<'a> {
253 #[pin]
254 listener: &'a mut fuchsia_async::net::TcpListener,
255 }
256
257 impl<'a> Stream for AcceptStream<'a> {
258 type Item = std::io::Result<fuchsia_async::net::TcpStream>;
259
260 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261 let mut this = self.project();
262 match this.listener.async_accept(cx) {
263 Poll::Ready(Ok((conn, _addr))) => Poll::Ready(Some(Ok(conn))),
264 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
265 Poll::Pending => Poll::Pending,
266 }
267 }
268 }
269
270 AcceptStream { listener }
271}
272
273#[cfg(not(target_os = "fuchsia"))]
274fn accept_stream<'a>(
275 listener: &'a mut tokio::net::TcpListener,
276) -> impl Stream<Item = std::io::Result<tokio::net::TcpStream>> + 'a {
277 netext::TcpListenerRefStream(listener)
278}
279
280fn parse_cert_chain(mut bytes: &[u8]) -> Vec<rustls::Certificate> {
281 rustls_pemfile::certs(&mut bytes)
282 .expect("certs to parse")
283 .into_iter()
284 .map(|cert| rustls::Certificate(cert))
285 .collect()
286}
287
288fn parse_private_key(mut bytes: &[u8]) -> rustls::PrivateKey {
289 let keys = rustls_pemfile::read_all(&mut bytes).expect("private keys to parse");
290 assert_eq!(keys.len(), 1, "expecting a single private key");
291 match keys.into_iter().next().unwrap() {
292 rustls_pemfile::Item::RSAKey(key) => return rustls::PrivateKey(key),
293 _ => panic!("expected an RSA private key"),
294 }
295}
296
297trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
298impl<T> AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
299
300pub fn make_get(url: impl AsRef<str>) -> Result<Request<Body>, Error> {
304 Request::get(url.as_ref()).body(Body::empty()).map_err(Error::from)
305}
306
307pub async fn get(url: impl AsRef<str>) -> Result<Response<Body>, Error> {
309 let request = make_get(url)?;
310 let client = fuchsia_hyper::new_client();
311 let response = client.request(request).await?;
312 Ok(response)
313}
314
315pub async fn body_as_bytes(response: Response<Body>) -> Result<Vec<u8>, Error> {
317 let bytes = response
318 .into_body()
319 .try_fold(Vec::new(), |mut vec, b| async move {
320 vec.extend(b);
321 Ok(vec)
322 })
323 .await?;
324 Ok(bytes)
325}
326
327pub async fn body_as_string(response: Response<Body>) -> Result<String, Error> {
329 let bytes = body_as_bytes(response).await?;
330 let string = String::from_utf8(bytes)?;
331 Ok(string)
332}
333
334pub async fn get_body_as_string(url: impl AsRef<str>) -> Result<String, Error> {
336 let response = get(url).await?;
337 body_as_string(response).await
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343 use crate::fault_injection::*;
344 use crate::handler::*;
345 use anyhow::anyhow;
346 use fasync::TimeoutExt;
347
348 #[fuchsia_async::run_singlethreaded(test)]
349 async fn test_start_stop() {
350 let server = TestServer::builder().start().await;
351 server.stop().await;
352 }
353
354 #[fuchsia_async::run_singlethreaded(test)]
355 async fn test_empty_server_404s() {
356 let server = TestServer::builder().start().await;
357 let result = get(server.local_url()).await;
358 assert_eq!(result.unwrap().status(), StatusCode::NOT_FOUND);
359 }
360
361 #[fuchsia_async::run_singlethreaded(test)]
362 async fn test_shared_handler() {
363 let shared: Arc<dyn Handler> = Arc::new(StaticResponse::ok_body("shared"));
364
365 let server = TestServer::builder()
366 .handler(ForPath::new("/a", Arc::clone(&shared)))
367 .handler(shared)
368 .start()
369 .await;
370
371 assert_eq!(get_body_as_string(server.local_url_for_path("/a")).await.unwrap(), "shared");
372 assert_eq!(get_body_as_string(server.local_url_for_path("/foo")).await.unwrap(), "shared");
373 }
374
375 #[fuchsia_async::run_singlethreaded(test)]
376 async fn test_simple_responder() {
377 let server =
378 TestServer::builder().handler(StaticResponse::ok_body("some data")).start().await;
379 assert_eq!(
380 get_body_as_string(server.local_url_for_path("ignored")).await.unwrap(),
381 "some data"
382 );
383 }
384
385 #[fuchsia_async::run_singlethreaded(test)]
386 async fn test_simple_path() {
387 let server = TestServer::builder()
388 .handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
389 .start()
390 .await;
391 assert_eq!(
392 get_body_as_string(server.local_url_for_path("/some/path")).await.unwrap(),
393 "some data"
394 );
395 }
396
397 #[fuchsia_async::run_singlethreaded(test)]
398 async fn test_simple_path_doesnt_respond_to_wrong_path() {
399 let server = TestServer::builder()
400 .handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
401 .start()
402 .await;
403 let result = get(server.local_url_for_path("/other/path")).await;
405 assert_eq!(result.unwrap().status(), StatusCode::NOT_FOUND);
406 }
407
408 #[fuchsia_async::run_singlethreaded(test)]
409 async fn test_hang() {
410 let server = TestServer::builder().handler(Hang).start().await;
411 let result = get(server.local_url_for_path("ignored"))
412 .on_timeout(std::time::Duration::from_secs(1), || Err(anyhow!("timed out")))
413 .await;
414 assert_eq!(result.unwrap_err().to_string(), Error::msg("timed out").to_string());
415 }
416
417 #[fuchsia_async::run_singlethreaded(test)]
418 async fn test_hang_body() {
419 let server = TestServer::builder().handler(HangBody::content_length(500)).start().await;
420 let result = get_body_as_string(server.local_url_for_path("ignored"))
421 .on_timeout(std::time::Duration::from_secs(1), || Err(anyhow!("timed out")))
422 .await;
423 assert_eq!(result.unwrap_err().to_string(), Error::msg("timed out").to_string());
424 }
425}