fuchsia_hyper_test_support/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
//! This module provides a test HTTP(S) server that a unit test can instantiate with minimal
//! setup.  It is meant for tests in which a component connects to an HTTP(S) server whose
//! response(s) need to vary during the operation of the test.
10//!
11//! It handles the TCP setup, letting the user specify `Handler` implementations which return the
12//! responses from the server.  `Handler` implementations are meant to be composable to provide
13//! for fault injection and varying behavior in tests.
14
15// This is gratuitously borrowed from src/sys/pkg/lib/fuchsia-pkg-testing/src/serve.rs, and then
16// made generic across all requests by removing the repo-serving aspects of it.
17
18use anyhow::Error;
19use chrono::Utc;
20use fuchsia_async::{self as fasync, Task};
21use futures::future::BoxFuture;
22use futures::prelude::*;
23use hyper::server::accept::from_stream;
24use hyper::server::Server;
25use hyper::service::{make_service_fn, service_fn};
26use hyper::{Body, Request, Response, StatusCode};
27#[cfg(not(target_os = "fuchsia"))]
28use netext::TokioAsyncReadExt;
29use std::convert::Infallible;
30use std::net::{Ipv6Addr, SocketAddr};
31use std::pin::Pin;
32use std::sync::Arc;
33
34// Some provided Handler implementations.
35pub mod handler;
36
37// Some provided Handler implementations for injecting faults into the server's behavior.
38pub mod fault_injection;
39
/// A "test" HTTP(S) server which is composed of `Handler` implementations, and holding the
/// connection state.
///
/// Instances are created with [`TestServer::builder`] and shut down with [`TestServer::stop`].
pub struct TestServer {
    /// Sending on this channel asks the serving task to shut down gracefully.
    stop: futures::channel::oneshot::Sender<()>,
    /// Local address the listener is bound to; its port is used when building URLs.
    addr: SocketAddr,
    /// True when TLS certificates were supplied, which selects the "https" URL scheme.
    use_https: bool,
    /// The spawned task that accepts connections and serves requests.
    task: Task<()>,
}
48
/// Base trait that all Handlers implement.
///
/// The server offers each incoming request to its handlers in the order they were added and
/// uses the response of the first handler that returns `Some`.
pub trait Handler: 'static + Send + Sync {
    /// A Handler impl signals that it wishes to handle a request by returning a response for it,
    /// otherwise it returns None.
    ///
    /// The returned future may borrow from the handler itself and resolves to the response that
    /// is sent back to the client.
    fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>>;
}
55
56impl Handler for Arc<dyn Handler> {
57    fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>> {
58        (**self).handles(request)
59    }
60}
61
62impl TestServer {
63    /// return the scheme of the TestServer
64    fn scheme(&self) -> &'static str {
65        if self.use_https {
66            "https"
67        } else {
68            "http"
69        }
70    }
71
72    /// Returns the URL that can be used to connect to this repository from this device.
73    pub fn local_url(&self) -> String {
74        format!("{}://localhost:{}", self.scheme(), self.addr.port())
75    }
76
77    /// Returns the URL for the given path that can be used to connect to this repository from this
78    /// device.
79    pub fn local_url_for_path(&self, path: &str) -> String {
80        let path = path.trim_start_matches('/');
81        format!("{}://localhost:{}/{}", self.scheme(), self.addr.port(), path)
82    }
83
84    /// Gracefully signal the server to stop and returns a future that resolves when it terminates.
85    pub fn stop(self) -> impl Future<Output = ()> {
86        self.stop.send(()).expect("remote end to still be open");
87        self.task
88    }
89
90    /// Internal helper which iterates over all Handlers until it finds one that will respond to the
91    /// request.  It then returns that response.  If not response is found, it returns 404 NOT_FOUND.
92    async fn handle_request(
93        handlers: Arc<Vec<Arc<dyn Handler>>>,
94        req: Request<Body>,
95    ) -> Response<Body> {
96        let response = handlers.iter().find_map(|h| h.handles(&req));
97
98        match response {
99            Some(response) => response.await,
100            None => Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap(),
101        }
102    }
103
104    /// Create a Builder
105    pub fn builder() -> TestServerBuilder {
106        TestServerBuilder::new()
107    }
108}
109
/// A builder to construct a `TestServer`.
///
/// The default builder has no handlers and serves plain HTTP.
#[derive(Default)]
pub struct TestServerBuilder {
    /// Handlers in the order they were added; this is the order in which they see requests.
    handlers: Vec<Arc<dyn Handler>>,
    /// Certificate chain and private key to serve with; `None` means plain HTTP.
    https_certs: Option<(Vec<rustls::Certificate>, rustls::PrivateKey)>,
}
116
117impl TestServerBuilder {
118    /// Create a new TestServerBuilder
119    pub fn new() -> Self {
120        Self::default()
121    }
122
123    /// Serve over TLS, using a server certificate rooted the provided certs
124    pub fn use_https(mut self, cert_chain: &[u8], private_key: &[u8]) -> Self {
125        let cert_chain = parse_cert_chain(cert_chain);
126        let private_key = parse_private_key(private_key);
127        self.https_certs = Some((cert_chain, private_key));
128        self
129    }
130
131    /// Add a Handler which implements the server's behavior.  These are given the ability to
132    /// handle a request in the order in which they are added to the `TestServerBuilder`.
133    pub fn handler(mut self, handler: impl Handler + 'static) -> Self {
134        self.handlers.push(Arc::new(handler));
135        self
136    }
137
138    /// Spawn the server on the current executor, returning a handle to manage the server.
139    pub async fn start(self) -> TestServer {
140        let (mut listener, addr) = {
141            let addr = SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0);
142            let listener = bind_listener(&addr).await;
143            let local_addr = listener.local_addr().unwrap();
144            (listener, local_addr)
145        };
146
147        let (stop, rx_stop) = futures::channel::oneshot::channel();
148
149        let (tls_acceptor, use_https) = if let Some((cert_chain, private_key)) = self.https_certs {
150            // build a server configuration using a test CA and cert chain
151            let tls_config = rustls::ServerConfig::builder()
152                .with_safe_defaults()
153                .with_no_client_auth()
154                .with_single_cert(cert_chain, private_key)
155                .unwrap();
156            let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
157
158            (Some(tls_acceptor), true)
159        } else {
160            (None, false)
161        };
162
163        let task = fasync::Task::spawn(async move {
164            let listener = accept_stream(&mut listener);
165            #[cfg(target_os = "fuchsia")]
166            let listener = listener
167                .map_err(Error::from)
168                .map_ok(|conn| fuchsia_hyper::TcpStream { stream: conn });
169            #[cfg(not(target_os = "fuchsia"))]
170            let listener = listener.map_err(Error::from).map_ok(|conn| fuchsia_hyper::TcpStream {
171                stream: conn.into_multithreaded_futures_stream(),
172            });
173
174            let connections = if let Some(tls_acceptor) = tls_acceptor {
175                // wrap incoming tcp streams
176                listener
177                    .and_then(move |conn| {
178                        tls_acceptor.accept(conn).map(|res| match res {
179                            Ok(conn) => {
180                                Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
181                            }
182                            Err(e) => Err(Error::from(e)),
183                        })
184                    })
185                    .boxed() // connections
186            } else {
187                listener
188                    .map_ok(|conn| Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
189                    .boxed() // connections
190            };
191
192            // This is the root Arc<Vec<Arc<dyn Handler>>>.
193            let handlers = Arc::new(self.handlers);
194
195            let make_svc = make_service_fn(move |_socket| {
196                // Each connection to the server receives a separate service_fn instance, and so
197                // needs it's own copy of the handlers, this is a factory of sorts.
198                let handlers = Arc::clone(&handlers);
199
200                async move {
201                    Ok::<_, Infallible>(service_fn(move |req| {
202                        // Each request made by a connection is serviced by the service_fn created from
203                        // this scope, which is why there is another cloning of the Arc of Handlers.
204                        let method = req.method().to_owned();
205                        let path = req.uri().path().to_owned();
206                        TestServer::handle_request(Arc::clone(&handlers), req)
207                            .inspect(move |x| {
208                                println!(
209                                    "{} [test http] {} {} => {}",
210                                    Utc::now().format("%T.%6f"),
211                                    method,
212                                    path,
213                                    x.status()
214                                )
215                            })
216                            .map(Ok::<_, Infallible>)
217                    }))
218                }
219            });
220
221            Server::builder(from_stream(connections))
222                .executor(fuchsia_hyper::Executor)
223                .serve(make_svc)
224                .with_graceful_shutdown(
225                    rx_stop.map(|res| res.unwrap_or_else(|futures::channel::oneshot::Canceled| ())),
226                )
227                .unwrap_or_else(|e| panic!("error serving repo over http: {}", e))
228                .await;
229        });
230
231        TestServer { stop, addr, use_https, task }
232    }
233}
234
/// Binds a Fuchsia TCP listener to `addr`, panicking if the bind fails.
// NOTE(review): nothing is awaited here; presumably `async` only so that both target variants
// are called the same way. Confirm before changing.
#[cfg(target_os = "fuchsia")]
async fn bind_listener(addr: &SocketAddr) -> fuchsia_async::net::TcpListener {
    use fuchsia_async::net::TcpListener;

    TcpListener::bind(addr).unwrap()
}
239
240#[cfg(not(target_os = "fuchsia"))]
241async fn bind_listener(&addr: &SocketAddr) -> tokio::net::TcpListener {
242    tokio::net::TcpListener::bind(addr).await.unwrap()
243}
244
/// Adapts a borrowed Fuchsia `TcpListener` into a `Stream` of accepted connections.
///
/// Each item is the result of one accept; the peer address is discarded.  The stream never
/// yields `None`.
#[cfg(target_os = "fuchsia")]
fn accept_stream<'a>(
    listener: &'a mut fuchsia_async::net::TcpListener,
) -> impl Stream<Item = std::io::Result<fuchsia_async::net::TcpStream>> + 'a {
    use fuchsia_async::net::{TcpListener, TcpStream};
    use std::task::{Context, Poll};

    /// Stream wrapper around the borrowed listener.
    #[pin_project::pin_project]
    struct AcceptStream<'a> {
        #[pin]
        listener: &'a mut TcpListener,
    }

    impl<'a> Stream for AcceptStream<'a> {
        type Item = std::io::Result<TcpStream>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let mut projected = self.project();
            // A ready accept becomes `Some(result)` with the peer address dropped; a pending
            // accept stays pending.
            projected
                .listener
                .async_accept(cx)
                .map(|accepted| Some(accepted.map(|(conn, _peer)| conn)))
        }
    }

    AcceptStream { listener }
}
272
273#[cfg(not(target_os = "fuchsia"))]
274fn accept_stream<'a>(
275    listener: &'a mut tokio::net::TcpListener,
276) -> impl Stream<Item = std::io::Result<tokio::net::TcpStream>> + 'a {
277    netext::TcpListenerRefStream(listener)
278}
279
280fn parse_cert_chain(mut bytes: &[u8]) -> Vec<rustls::Certificate> {
281    rustls_pemfile::certs(&mut bytes)
282        .expect("certs to parse")
283        .into_iter()
284        .map(|cert| rustls::Certificate(cert))
285        .collect()
286}
287
288fn parse_private_key(mut bytes: &[u8]) -> rustls::PrivateKey {
289    let keys = rustls_pemfile::read_all(&mut bytes).expect("private keys to parse");
290    assert_eq!(keys.len(), 1, "expecting a single private key");
291    match keys.into_iter().next().unwrap() {
292        rustls_pemfile::Item::RSAKey(key) => return rustls::PrivateKey(key),
293        _ => panic!("expected an RSA private key"),
294    }
295}
296
297trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
298impl<T> AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
299
300// These are a set of useful functions when writing tests.
301
302/// Create a GET request for a given url, which can be used with any hyper client.
303pub fn make_get(url: impl AsRef<str>) -> Result<Request<Body>, Error> {
304    Request::get(url.as_ref()).body(Body::empty()).map_err(Error::from)
305}
306
307/// Perform an HTTP GET for the given url, returning the result.
308pub async fn get(url: impl AsRef<str>) -> Result<Response<Body>, Error> {
309    let request = make_get(url)?;
310    let client = fuchsia_hyper::new_client();
311    let response = client.request(request).await?;
312    Ok(response)
313}
314
315/// Collect a Response into a single Vec of bytes.
316pub async fn body_as_bytes(response: Response<Body>) -> Result<Vec<u8>, Error> {
317    let bytes = response
318        .into_body()
319        .try_fold(Vec::new(), |mut vec, b| async move {
320            vec.extend(b);
321            Ok(vec)
322        })
323        .await?;
324    Ok(bytes)
325}
326
327/// Collect a Response's Body and convert the body to a tring.
328pub async fn body_as_string(response: Response<Body>) -> Result<String, Error> {
329    let bytes = body_as_bytes(response).await?;
330    let string = String::from_utf8(bytes)?;
331    Ok(string)
332}
333
334/// Get a url and return the body of the response as a string.
335pub async fn get_body_as_string(url: impl AsRef<str>) -> Result<String, Error> {
336    let response = get(url).await?;
337    body_as_string(response).await
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fault_injection::*;
    use crate::handler::*;
    use anyhow::anyhow;
    use fasync::TimeoutExt;

    /// How long the hang tests wait before giving up on the server.
    const HANG_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);

    /// A server with no handlers can be started and then stopped.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_start_stop() {
        let server = TestServer::builder().start().await;
        server.stop().await;
    }

    /// A server with no handlers answers every request with 404.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_empty_server_404s() {
        let server = TestServer::builder().start().await;
        let response = get(server.local_url()).await.unwrap();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    /// One `Arc<dyn Handler>` can be registered twice, both wrapped and directly.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_shared_handler() {
        let shared: Arc<dyn Handler> = Arc::new(StaticResponse::ok_body("shared"));

        let server = TestServer::builder()
            .handler(ForPath::new("/a", Arc::clone(&shared)))
            .handler(shared)
            .start()
            .await;

        let body_for_a = get_body_as_string(server.local_url_for_path("/a")).await.unwrap();
        assert_eq!(body_for_a, "shared");

        let body_for_foo = get_body_as_string(server.local_url_for_path("/foo")).await.unwrap();
        assert_eq!(body_for_foo, "shared");
    }

    /// A static responder answers regardless of the requested path.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_simple_responder() {
        let server =
            TestServer::builder().handler(StaticResponse::ok_body("some data")).start().await;
        let body = get_body_as_string(server.local_url_for_path("ignored")).await.unwrap();
        assert_eq!(body, "some data");
    }

    /// A path-restricted handler answers requests for its path.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_simple_path() {
        let server = TestServer::builder()
            .handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
            .start()
            .await;
        let body = get_body_as_string(server.local_url_for_path("/some/path")).await.unwrap();
        assert_eq!(body, "some data");
    }

    /// A path-restricted handler leaves other paths to fall through to 404.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_simple_path_doesnt_respond_to_wrong_path() {
        let server = TestServer::builder()
            .handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
            .start()
            .await;
        // make sure a non-matching path fails
        let response = get(server.local_url_for_path("/other/path")).await.unwrap();
        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }

    /// With the `Hang` handler the request does not complete before the timeout.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_hang() {
        let server = TestServer::builder().handler(Hang).start().await;
        let outcome = get(server.local_url_for_path("ignored"))
            .on_timeout(HANG_TIMEOUT, || Err(anyhow!("timed out")))
            .await;
        assert_eq!(outcome.unwrap_err().to_string(), "timed out");
    }

    /// With the `HangBody` handler the body is not fully read before the timeout.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_hang_body() {
        let server = TestServer::builder().handler(HangBody::content_length(500)).start().await;
        let outcome = get_body_as_string(server.local_url_for_path("ignored"))
            .on_timeout(HANG_TIMEOUT, || Err(anyhow!("timed out")))
            .await;
        assert_eq!(outcome.unwrap_err().to_string(), "timed out");
    }
}
425}