fuchsia_pkg_testing/
serve.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Test tools for serving TUF repositories.
6
7use crate::repo::Repository;
8use anyhow::{Context as _, Error, bail, format_err};
9use chrono::Utc;
10use fidl_fuchsia_pkg_ext::{
11    MirrorConfig, MirrorConfigBuilder, RepositoryConfig, RepositoryStorageType,
12};
13use fuchsia_async::net::TcpListener;
14use fuchsia_async::{self as fasync, Task};
15use fuchsia_url::RepositoryUrl;
16use futures::future::BoxFuture;
17use futures::prelude::*;
18use http::Uri;
19use http_sse::{Event, EventSender, SseResponseCreator};
20use hyper::server::Server;
21use hyper::server::accept::from_stream;
22use hyper::service::{make_service_fn, service_fn};
23use hyper::{Body, Method, Request, Response, StatusCode, header};
24use std::convert::{Infallible, TryInto as _};
25use std::io::{Cursor, Read as _, Seek as _};
26use std::net::{IpAddr, Ipv6Addr, SocketAddr};
27use std::path::{Path, PathBuf};
28use std::pin::Pin;
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::Duration;
32
33pub mod responder;
34
35trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
36impl<T> AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
37
/// Domains with known keys and certificates for testing.
///
/// Each variant selects a certificate chain and private key that are embedded in this crate and
/// used when a repository is served over HTTPS.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Domain {
    /// `test.fuchsia.com` and `localhost`.
    TestFuchsiaCom,
    /// `*.fuchsia-updates.googleusercontent.com`.
    WildcardFuchsiaUpdatesGoogleusercontentCom,
}
45
/// A builder to construct a test repository server.
///
/// Configure the builder with its chained setter methods, then call `start` to bind a listener
/// and spawn the server.
pub struct ServedRepositoryBuilder {
    // Repository whose on-disk contents are served.
    repo: Arc<Repository>,
    // Responders applied, in insertion order, to every response before it is returned.
    response_overriders: Vec<Arc<dyn HttpResponder>>,
    // IP address the TCP listener binds to.
    bind_addr: IpAddr,
    // TCP port the listener binds to; 0 lets the system choose.
    bind_port: u16,
    // When set, the server speaks HTTPS using the test certificate for this domain.
    https_domain: Option<Domain>,
}
54
/// Override how a `ServedRepository` responds to requests.
/// Useful for injecting failures.
///
/// Responders are chained: each one receives the response produced by the previous responder
/// (or by the server itself, for the first responder) and returns the response to use instead.
pub trait HttpResponder: 'static + Send + Sync {
    /// `response` is what the server would have responded with.
    ///
    /// `request` is the incoming request being answered. The returned future resolves to the
    /// response that replaces `response`.
    fn respond<'a>(
        &'a self,
        request: &'a Request<Body>,
        response: Response<Body>,
    ) -> BoxFuture<'a, Response<Body>>;
}
65
66impl ServedRepositoryBuilder {
67    pub(crate) fn new(repo: Arc<Repository>) -> Self {
68        ServedRepositoryBuilder {
69            repo,
70            response_overriders: vec![],
71            bind_addr: Ipv6Addr::UNSPECIFIED.into(),
72            bind_port: 0,
73            https_domain: None,
74        }
75    }
76
77    /// Override how the `ServedRepositoryBuilder` responds to requests.
78    ///
79    /// Requests are passed through responders in the order in which the responders are added to
80    /// this builder.
81    pub fn response_overrider(mut self, responder: impl HttpResponder) -> Self {
82        self.response_overriders.push(Arc::new(responder));
83        self
84    }
85
86    /// Serve the repository over https via a domain known by test certificates
87    /// and keys.
88    pub fn use_https_domain(mut self, domain: Domain) -> Self {
89        self.https_domain = Some(domain);
90        self
91    }
92
93    /// Bind the tcp listener to the provided ip address. Binds to Ipv6Addr::UNSPECIFIED by
94    /// default.
95    pub fn bind_to_addr(mut self, addr: impl Into<IpAddr>) -> Self {
96        self.bind_addr = addr.into();
97        self
98    }
99
100    /// Bind the tcp listener to the provided port. Binds to 0 (allowing system to select a port) by
101    /// default.
102    pub fn bind_to_port(mut self, port: u16) -> Self {
103        self.bind_port = port;
104        self
105    }
106
    /// Spawn the server on the current executor, returning a handle to manage the server.
    ///
    /// # Errors
    ///
    /// Returns an error if the TCP listener cannot be bound or its local address cannot be read.
    ///
    /// # Panics
    ///
    /// Panics if the embedded test certificate or key fails to parse or is rejected by rustls.
    /// The spawned server task panics if hyper reports an error while serving.
    pub fn start(self) -> Result<ServedRepository, Error> {
        // Bind first and read back the local address so that a requested port of 0 is resolved
        // to the port actually chosen.
        let (listener, addr) = {
            let addr = SocketAddr::new(self.bind_addr, self.bind_port);
            let listener = TcpListener::bind(&addr).context("bind")?;
            let local_addr = listener.local_addr().context("local_addr")?;
            (listener, local_addr)
        };

        // Adapt accepted connections into the stream type hyper is given below.
        let listener = listener
            .accept_stream()
            .map_err(Error::from)
            .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });

        // Shared counter, incremented once per accepted TCP connection (before any TLS handshake).
        let connection_attempts = Arc::new(AtomicU64::new(0));
        #[allow(clippy::type_complexity)]
        let connections: Pin<
            Box<dyn Stream<Item = Result<Pin<Box<dyn AsyncReadWrite>>, Error>> + Send>,
        > = if let Some(ref https_domain) = self.https_domain {
            // build a server configuration using a test CA and cert chain
            let (certs, key) = match https_domain {
                Domain::TestFuchsiaCom => (
                    parse_cert_chain(&include_bytes!("../certs/test.fuchsia.com.certchain")[..]),
                    parse_private_key(&include_bytes!("../certs/test.fuchsia.com.rsa")[..]),
                ),
                Domain::WildcardFuchsiaUpdatesGoogleusercontentCom => (
                    parse_cert_chain(
                        &include_bytes!(
                            "../certs/wildcard.fuchsia-updates.googleusercontent.com.certchain"
                        )[..],
                    ),
                    parse_private_key(
                        &include_bytes!(
                            "../certs/wildcard.fuchsia-updates.googleusercontent.com.rsa"
                        )[..],
                    ),
                ),
            };
            let mut tls_config = rustls::ServerConfig::builder()
                .with_safe_defaults()
                .with_no_client_auth()
                .with_single_cert(certs, key)
                .unwrap();

            // Configure ALPN and prefer H2 over HTTP/1.1.
            tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];

            let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
            let connection_attempts = Arc::clone(&connection_attempts);

            // wrap incoming tcp streams
            listener
                .and_then(move |conn| {
                    connection_attempts.fetch_add(1, Ordering::SeqCst);
                    tls_acceptor.accept(conn).map(|res| match res {
                        Ok(conn) => Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>),
                        Err(e) => Err(Error::from(e)),
                    })
                })
                .boxed()
        } else {
            // Plain HTTP: count the connection and box the TCP stream as-is.
            let connection_attempts = Arc::clone(&connection_attempts);
            listener
                .map_ok(move |conn| {
                    connection_attempts.fetch_add(1, Ordering::SeqCst);
                    Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>
                })
                .boxed()
        };

        let root = self.repo.path();
        let response_overriders = Arc::new(self.response_overriders);

        // Pair used for the `/auto` endpoint: the creator builds SSE responses for clients and
        // the sender (kept on the returned handle) pushes events to them.
        let (auto_response_creator, auto_event_sender) =
            SseResponseCreator::with_additional_buffer_size(10);
        let auto_response_creator = Arc::new(auto_response_creator);

        // Invoked once per connection; each resulting service gets its own clones of the shared
        // state.
        let make_svc = make_service_fn(move |_socket| {
            let root = root.clone();
            let response_overriders = Arc::clone(&response_overriders);
            let auto_response_creator = Arc::clone(&auto_response_creator);

            async move {
                Ok::<_, Infallible>(service_fn(move |req| {
                    // Capture the request details up front for logging, because `req` is moved
                    // into the handler.
                    let method = req.method().to_owned();
                    let path = req.uri().path().to_owned();
                    let headers = req.headers().clone();
                    ServedRepository::handle_tuf_repo_request_infallible(
                        root.clone(),
                        Arc::clone(&response_overriders),
                        Arc::clone(&auto_response_creator),
                        req,
                    )
                    .inspect(move |x| {
                        println!(
                            "{} [http repo] {} {} {:?} => {}",
                            Utc::now().format("%T.%6f"),
                            method,
                            path,
                            headers,
                            x.status()
                        )
                    })
                    .map(Ok::<_, Infallible>)
                }))
            }
        });

        // Sending on (or dropping) `stop` resolves `rx_stop`, which triggers graceful shutdown.
        let (stop, rx_stop) = futures::channel::oneshot::channel();

        let server = Server::builder(from_stream(connections))
            .executor(fuchsia_hyper::Executor)
            .serve(make_svc)
            .with_graceful_shutdown(rx_stop.map(|res| res.unwrap_or(())))
            .unwrap_or_else(|e| panic!("error serving repo over http: {e}"));

        let server = Task::spawn(server);

        Ok(ServedRepository {
            repo: self.repo,
            stop,
            server,
            addr,
            https_domain: self.https_domain,
            auto_event_sender,
            connection_attempts,
        })
    }
235}
236
237fn parse_cert_chain(mut bytes: &[u8]) -> Vec<rustls::Certificate> {
238    rustls_pemfile::certs(&mut bytes)
239        .expect("certs to parse")
240        .into_iter()
241        .map(rustls::Certificate)
242        .collect()
243}
244
245fn parse_private_key(mut bytes: &[u8]) -> rustls::PrivateKey {
246    let keys = rustls_pemfile::read_all(&mut bytes).expect("private keys to parse");
247    assert_eq!(keys.len(), 1, "expecting a single private key");
248    match keys.into_iter().next().unwrap() {
249        rustls_pemfile::Item::RSAKey(key) => rustls::PrivateKey(key),
250        _ => panic!("expected an RSA private key"),
251    }
252}
253
/// A [`Repository`] being served over HTTP.
///
/// Returned by `ServedRepositoryBuilder::start`; call `stop` to shut the server down gracefully.
pub struct ServedRepository {
    // Repository whose contents are being served.
    repo: Arc<Repository>,
    // Sending on this channel asks the server to shut down gracefully.
    stop: futures::channel::oneshot::Sender<()>,
    // Task running the HTTP server.
    server: Task<()>,
    // Local address the listener is bound to.
    addr: SocketAddr,
    // Sends SSE events to clients connected to `/auto`.
    auto_event_sender: EventSender,
    // Number of TCP connections accepted so far.
    connection_attempts: Arc<AtomicU64>,
    // `Some` when the repository is served over HTTPS.
    https_domain: Option<Domain>,
}
264
265impl ServedRepository {
266    fn scheme(&self) -> &'static str {
267        if self.https_domain.is_some() { "https" } else { "http" }
268    }
269    /// Request the given path served by the repository over HTTP.
270    pub async fn get(&self, path: impl AsRef<str>) -> Result<Vec<u8>, Error> {
271        let url = format!("{}/{}", self.local_url(), path.as_ref());
272        get(url).await
273    }
274
275    /// Returns the URL that can be used to connect to this repository from this device.
276    pub fn local_url(&self) -> String {
277        format!("{}://localhost:{}", self.scheme(), self.addr.port())
278    }
279
280    /// Returns a sorted vector of all packages contained in this repository.
281    pub async fn list_packages(&self) -> Result<Vec<crate::repo::PackageEntry>, Error> {
282        let targets_json = self.get("targets.json").await?;
283        let mut packages = crate::repo::iter_packages(Cursor::new(targets_json))?
284            .collect::<Result<Vec<_>, _>>()?;
285        packages.sort_unstable();
286        Ok(packages)
287    }
288
289    /// Generates a [`MirrorConfigBuilder`] that points to this served repository.
290    pub fn get_mirror_config_builder(&self) -> MirrorConfigBuilder {
291        MirrorConfigBuilder::new(self.local_url().parse::<Uri>().unwrap()).unwrap()
292    }
293
294    /// Generates a [`MirrorConfig`] that points to this served repository.
295    fn get_mirror_config(&self, subscribe: bool) -> MirrorConfig {
296        self.get_mirror_config_builder().subscribe(subscribe).build()
297    }
298
299    /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this
300    /// served repository.
301    pub fn make_repo_config(&self, url: RepositoryUrl) -> RepositoryConfig {
302        self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), false)
303    }
304
305    /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this
306    /// served repository. Set subscribe on the mirror configs to true.
307    pub fn make_repo_config_with_subscribe(&self, url: RepositoryUrl) -> RepositoryConfig {
308        self.repo.make_repo_config(url, Some(self.get_mirror_config(true)), false)
309    }
310
311    /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this
312    /// served repository with local mirroring enabled.
313    // TODO(https://fxbug.dev/42137938) delete this method once pkg-resolver can fetch metadata from a LocalMirror.
314    pub fn make_repo_config_with_local_mirror(&self, url: RepositoryUrl) -> RepositoryConfig {
315        self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), true)
316    }
317
318    /// Generate a [`RepositoryConfig`] that permits persisting metadata.
319    pub fn make_repo_config_with_persistent_storage(&self, url: RepositoryUrl) -> RepositoryConfig {
320        self.repo
321            .make_repo_config_builder(url)
322            .add_mirror(self.get_mirror_config(false))
323            .use_local_mirror(false)
324            .repo_storage_type(RepositoryStorageType::Persistent)
325            .build()
326    }
327
328    /// Send an SSE event to all clients subscribed to /auto.
329    pub async fn send_auto_event(&self, event: &Event) {
330        self.auto_event_sender.send(event).await
331    }
332
333    /// Waits until `send_auto_event` would attempt to send an `Event` to exactly
334    /// `n` clients. Panics if extra clients are connected.
335    pub async fn wait_for_n_connected_auto_clients(&self, n: usize) {
336        loop {
337            let connected = self.auto_event_sender.client_count().await;
338            match connected.cmp(&n) {
339                std::cmp::Ordering::Equal => {
340                    break;
341                }
342                std::cmp::Ordering::Greater => {
343                    panic!("ServedRepository too many auto clients connected.");
344                }
345                _ => {}
346            }
347            fasync::Timer::new(Duration::from_millis(10)).await;
348        }
349    }
350
351    /// Errors all currently existing /auto `Response<Body>` streams.
352    pub async fn drop_all_auto_clients(&self) {
353        self.auto_event_sender.drop_all_clients().await
354    }
355
356    /// Gracefully signal the server to stop and returns a future that resolves when it terminates.
357    pub fn stop(self) -> impl Future<Output = ()> {
358        self.stop.send(()).expect("remote end to still be open");
359        self.server
360    }
361
362    /// Number of connection attempts.
363    pub fn connection_attempts(&self) -> u64 {
364        self.connection_attempts.load(Ordering::SeqCst)
365    }
366
367    async fn handle_tuf_repo_request_infallible(
368        repo: PathBuf,
369        response_overriders: Arc<Vec<Arc<dyn HttpResponder>>>,
370        auto_response_creator: Arc<SseResponseCreator>,
371        req: Request<Body>,
372    ) -> Response<Body> {
373        let mut response = Self::handle_tuf_repo_request(repo, auto_response_creator, &req)
374            .await
375            .unwrap_or_else(|e| {
376                eprintln!("hyper tuf server error creating response for request {req:?}: {e:#}");
377                Response::builder()
378                    .status(StatusCode::INTERNAL_SERVER_ERROR)
379                    .body(Body::from("Error creating response".to_owned().into_bytes()))
380                    .unwrap()
381            });
382
383        for responder in response_overriders.iter() {
384            response = responder.respond(&req, response).await
385        }
386
387        response
388    }
389
390    async fn handle_tuf_repo_request(
391        repo: PathBuf,
392        auto_response_creator: Arc<SseResponseCreator>,
393        req: &Request<Body>,
394    ) -> Result<Response<Body>, Error> {
395        let fail =
396            |status: StatusCode| Response::builder().status(status).body(Body::empty()).unwrap();
397        if *req.method() != Method::GET {
398            return Ok(fail(StatusCode::NOT_FOUND));
399        } else if req.uri().query().is_some() {
400            return Ok(fail(StatusCode::BAD_REQUEST));
401        }
402
403        let uri_path = Path::new(req.uri().path());
404
405        // don't let queries escape the repo root.
406        if uri_path.components().any(|component| component == std::path::Component::ParentDir) {
407            return Ok(fail(StatusCode::NOT_FOUND));
408        }
409
410        let response = if uri_path.to_str() == Some("/auto") {
411            auto_response_creator.create().await
412        } else {
413            let fs_path = repo.join(uri_path.strip_prefix("/").unwrap_or(uri_path));
414            // TODO(https://fxbug.dev/42150717) synchronous IO in an async context.
415            let mut file = match std::fs::File::open(fs_path) {
416                Ok(file) => file,
417                Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => {
418                    return Ok(Response::builder()
419                        .status(StatusCode::NOT_FOUND)
420                        .body(Body::from("File did not exist".to_owned().into_bytes()))
421                        .unwrap());
422                }
423                Err(e) => Err(e).context("opening file")?,
424            };
425
426            if let Some(range) = req.headers().get(http::header::RANGE) {
427                make_range_response(file, range).context("error making range response")?
428            } else {
429                let mut body = vec![];
430                file.read_to_end(&mut body).context("reading repo file")?;
431                Response::builder()
432                    .status(StatusCode::OK)
433                    .header(header::CONTENT_LENGTH, body.len())
434                    .body(Body::from(body))
435                    .unwrap()
436            }
437        };
438
439        Ok(response)
440    }
441}
442
443// TODO(https://fxbug.dev/42150593) use specific HTTP status codes for errors instead of mapping everything to
444// INTERNAL_SERVER_ERROR.
445fn make_range_response(
446    mut file: std::fs::File,
447    range: &http::HeaderValue,
448) -> Result<Response<Body>, Error> {
449    let HttpRange { first_byte_pos, last_byte_pos } =
450        range.try_into().context("parse range header")?;
451    let file_size = file.metadata().context("file metadata")?.len();
452    // TODO(https://fxbug.dev/42150593) return 416 if the range is invalid
453    file.seek(std::io::SeekFrom::Start(first_byte_pos)).context("seeking file")?;
454    let mut data = vec![0; 1 + last_byte_pos as usize - first_byte_pos as usize];
455    file.read_exact(&mut data).context("reading file for range request")?;
456
457    Ok(Response::builder()
458        .status(StatusCode::PARTIAL_CONTENT)
459        .header(header::CONTENT_LENGTH, data.len())
460        .header(
461            header::CONTENT_RANGE,
462            format!("bytes {first_byte_pos}-{last_byte_pos}/{file_size}"),
463        )
464        .body(Body::from(data))
465        .unwrap())
466}
467
/// Parsed value of an HTTP request "Range" header of the form `bytes=<first>-<last>`.
///
/// Both positions are inclusive byte offsets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HttpRange {
    first_byte_pos: u64,
    last_byte_pos: u64,
}

impl HttpRange {
    /// The first byte pos of the Range header.
    pub fn first_byte_pos(&self) -> u64 {
        self.first_byte_pos
    }

    /// The last byte pos of the Range header.
    pub fn last_byte_pos(&self) -> u64 {
        self.last_byte_pos
    }
}
485
486// TODO(https://fxbug.dev/42150593) use a spec compliant parser
487impl TryFrom<&http::HeaderValue> for HttpRange {
488    type Error = anyhow::Error;
489
490    fn try_from(range: &http::HeaderValue) -> Result<Self, Self::Error> {
491        let range = range.to_str().context("range header should be ascii")?;
492        let range = if let Some(range) = range.strip_prefix("bytes=") {
493            range
494        } else {
495            bail!("range header should start with 'bytes='");
496        };
497        let dash =
498            range.find('-').ok_or_else(|| anyhow::anyhow!("range header should have dash"))?;
499        let (first, last) = range.split_at(dash);
500        if last.len() < 2 {
501            bail!("range header last_byte_pos empty");
502        }
503        let first = first.parse().context("valid range first_byte_pos")?;
504        let last = last[1..].parse().context("valid range last_byte_pos")?;
505
506        if first > last {
507            bail!("first_byte_pos {} > last_byte_pos {}", first, last);
508        }
509
510        Ok(HttpRange { first_byte_pos: first, last_byte_pos: last })
511    }
512}
513
514async fn get(url: impl AsRef<str>) -> Result<Vec<u8>, Error> {
515    let request = Request::get(url.as_ref()).body(Body::empty()).map_err(Error::from)?;
516    let client = fuchsia_hyper::new_client();
517    let response = client.request(request).await?;
518
519    if response.status() != StatusCode::OK {
520        return Err(format_err!("unexpected status code: {:?}", response.status()));
521    }
522
523    let body = hyper::body::to_bytes(response).await?;
524
525    Ok(body.to_vec())
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use crate::package::PackageBuilder;
532    use crate::repo::RepositoryBuilder;
533    use assert_matches::assert_matches;
534
535    #[fuchsia_async::run_singlethreaded(test)]
536    #[ignore]
537    async fn test_serve_empty_hangs_on_last_get() {
538        let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap());
539        let served_repo = repo.server().start().unwrap();
540
541        // contains no packages.
542        let packages = served_repo.list_packages().await.unwrap();
543        assert_eq!(packages, vec![]);
544
545        // no '..' allowed.
546        assert_matches!(served_repo.get("blobs/../root.json").await, Err(_));
547
548        // getting a known file fetches something.
549        let bytes = served_repo.get("targets.json").await.unwrap();
550        assert_ne!(bytes, Vec::<u8>::new());
551
552        // even if it doesn't go through the helper function.
553        let url = format!("{}/targets.json", served_repo.local_url());
554        let also_bytes = get(&url).await.unwrap();
555        assert_eq!(bytes, also_bytes);
556
557        // requests fail after stopping the server
558        served_repo.stop().await;
559
560        // FIXME(49247): this often flakes by hanging
561        assert_matches!(get(url).await, Err(_));
562    }
563
564    #[fuchsia_async::run_singlethreaded(test)]
565    async fn test_serve_empty() {
566        let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap());
567        let served_repo = repo.server().start().unwrap();
568
569        // contains no packages.
570        let packages = served_repo.list_packages().await.unwrap();
571        assert_eq!(packages, vec![]);
572
573        // no '..' allowed.
574        assert_matches!(served_repo.get("blobs/../root.json").await, Err(_));
575
576        // getting a known file fetches something.
577        let bytes = served_repo.get("targets.json").await.unwrap();
578        assert_ne!(bytes, Vec::<u8>::new());
579
580        // even if it doesn't go through the helper function.
581        let url = format!("{}/targets.json", served_repo.local_url());
582        let also_bytes = get(&url).await.unwrap();
583        assert_eq!(bytes, also_bytes);
584    }
585
586    #[fuchsia_async::run_singlethreaded(test)]
587    async fn test_serve_packages() {
588        let same_contents = "same contents";
589        let repo = RepositoryBuilder::new()
590            .add_package(
591                PackageBuilder::new("rolldice")
592                    .add_resource_at("bin/rolldice", "#!/boot/bin/sh\necho 4\n".as_bytes())
593                    .add_resource_at(
594                        "meta/rolldice.cml",
595                        r#"{"program":{"binary":"bin/rolldice"}}"#.as_bytes(),
596                    )
597                    .add_resource_at("data/duplicate_a", "same contents".as_bytes())
598                    .build()
599                    .await
600                    .unwrap(),
601            )
602            .add_package(
603                PackageBuilder::new("fortune")
604                    .add_resource_at(
605                        "bin/fortune",
606                        "#!/boot/bin/sh\necho ask again later\n".as_bytes(),
607                    )
608                    .add_resource_at(
609                        "meta/fortune.cml",
610                        r#"{"program":{"binary":"bin/fortune"}}"#.as_bytes(),
611                    )
612                    .add_resource_at("data/duplicate_b", same_contents.as_bytes())
613                    .add_resource_at("data/duplicate_c", same_contents.as_bytes())
614                    .build()
615                    .await
616                    .unwrap(),
617            )
618            .build()
619            .await
620            .unwrap();
621        let repo = Arc::new(repo);
622
623        let local_packages = repo.list_packages().unwrap();
624
625        let served_repository = repo.server().start().unwrap();
626        let served_packages = served_repository.list_packages().await.unwrap();
627        assert_eq!(local_packages, served_packages);
628    }
629}