1use crate::repo::Repository;
8use anyhow::{Context as _, Error, bail, format_err};
9use chrono::Utc;
10use fidl_fuchsia_pkg_ext::{
11 MirrorConfig, MirrorConfigBuilder, RepositoryConfig, RepositoryStorageType,
12};
13use fuchsia_async::net::TcpListener;
14use fuchsia_async::{self as fasync, Task};
15use fuchsia_url::RepositoryUrl;
16use futures::future::BoxFuture;
17use futures::prelude::*;
18use http::Uri;
19use http_sse::{Event, EventSender, SseResponseCreator};
20use hyper::server::Server;
21use hyper::server::accept::from_stream;
22use hyper::service::{make_service_fn, service_fn};
23use hyper::{Body, Method, Request, Response, StatusCode, header};
24use std::convert::{Infallible, TryInto as _};
25use std::io::{Cursor, Read as _, Seek as _};
26use std::net::{IpAddr, Ipv6Addr, SocketAddr};
27use std::path::{Path, PathBuf};
28use std::pin::Pin;
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::Duration;
32
33pub mod responder;
34
35trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
36impl<T> AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}
37
38pub enum Domain {
40 TestFuchsiaCom,
42 WildcardFuchsiaUpdatesGoogleusercontentCom,
44}
45
46pub struct ServedRepositoryBuilder {
48 repo: Arc<Repository>,
49 response_overriders: Vec<Arc<dyn HttpResponder>>,
50 bind_addr: IpAddr,
51 bind_port: u16,
52 https_domain: Option<Domain>,
53}
54
55pub trait HttpResponder: 'static + Send + Sync {
58 fn respond<'a>(
60 &'a self,
61 request: &'a Request<Body>,
62 response: Response<Body>,
63 ) -> BoxFuture<'a, Response<Body>>;
64}
65
66impl ServedRepositoryBuilder {
67 pub(crate) fn new(repo: Arc<Repository>) -> Self {
68 ServedRepositoryBuilder {
69 repo,
70 response_overriders: vec![],
71 bind_addr: Ipv6Addr::UNSPECIFIED.into(),
72 bind_port: 0,
73 https_domain: None,
74 }
75 }
76
77 pub fn response_overrider(mut self, responder: impl HttpResponder) -> Self {
82 self.response_overriders.push(Arc::new(responder));
83 self
84 }
85
86 pub fn use_https_domain(mut self, domain: Domain) -> Self {
89 self.https_domain = Some(domain);
90 self
91 }
92
93 pub fn bind_to_addr(mut self, addr: impl Into<IpAddr>) -> Self {
96 self.bind_addr = addr.into();
97 self
98 }
99
100 pub fn bind_to_port(mut self, port: u16) -> Self {
103 self.bind_port = port;
104 self
105 }
106
107 pub fn start(self) -> Result<ServedRepository, Error> {
109 let (listener, addr) = {
110 let addr = SocketAddr::new(self.bind_addr, self.bind_port);
111 let listener = TcpListener::bind(&addr).context("bind")?;
112 let local_addr = listener.local_addr().context("local_addr")?;
113 (listener, local_addr)
114 };
115
116 let listener = listener
117 .accept_stream()
118 .map_err(Error::from)
119 .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });
120
121 let connection_attempts = Arc::new(AtomicU64::new(0));
122 #[allow(clippy::type_complexity)]
123 let connections: Pin<
124 Box<dyn Stream<Item = Result<Pin<Box<dyn AsyncReadWrite>>, Error>> + Send>,
125 > = if let Some(ref https_domain) = self.https_domain {
126 let (certs, key) = match https_domain {
128 Domain::TestFuchsiaCom => (
129 parse_cert_chain(&include_bytes!("../certs/test.fuchsia.com.certchain")[..]),
130 parse_private_key(&include_bytes!("../certs/test.fuchsia.com.rsa")[..]),
131 ),
132 Domain::WildcardFuchsiaUpdatesGoogleusercontentCom => (
133 parse_cert_chain(
134 &include_bytes!(
135 "../certs/wildcard.fuchsia-updates.googleusercontent.com.certchain"
136 )[..],
137 ),
138 parse_private_key(
139 &include_bytes!(
140 "../certs/wildcard.fuchsia-updates.googleusercontent.com.rsa"
141 )[..],
142 ),
143 ),
144 };
145 let mut tls_config = rustls::ServerConfig::builder()
146 .with_safe_defaults()
147 .with_no_client_auth()
148 .with_single_cert(certs, key)
149 .unwrap();
150
151 tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
153
154 let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
155 let connection_attempts = Arc::clone(&connection_attempts);
156
157 listener
159 .and_then(move |conn| {
160 connection_attempts.fetch_add(1, Ordering::SeqCst);
161 tls_acceptor.accept(conn).map(|res| match res {
162 Ok(conn) => Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>),
163 Err(e) => Err(Error::from(e)),
164 })
165 })
166 .boxed()
167 } else {
168 let connection_attempts = Arc::clone(&connection_attempts);
169 listener
170 .map_ok(move |conn| {
171 connection_attempts.fetch_add(1, Ordering::SeqCst);
172 Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>
173 })
174 .boxed()
175 };
176
177 let root = self.repo.path();
178 let response_overriders = Arc::new(self.response_overriders);
179
180 let (auto_response_creator, auto_event_sender) =
181 SseResponseCreator::with_additional_buffer_size(10);
182 let auto_response_creator = Arc::new(auto_response_creator);
183
184 let make_svc = make_service_fn(move |_socket| {
185 let root = root.clone();
186 let response_overriders = Arc::clone(&response_overriders);
187 let auto_response_creator = Arc::clone(&auto_response_creator);
188
189 async move {
190 Ok::<_, Infallible>(service_fn(move |req| {
191 let method = req.method().to_owned();
192 let path = req.uri().path().to_owned();
193 let headers = req.headers().clone();
194 ServedRepository::handle_tuf_repo_request_infallible(
195 root.clone(),
196 Arc::clone(&response_overriders),
197 Arc::clone(&auto_response_creator),
198 req,
199 )
200 .inspect(move |x| {
201 println!(
202 "{} [http repo] {} {} {:?} => {}",
203 Utc::now().format("%T.%6f"),
204 method,
205 path,
206 headers,
207 x.status()
208 )
209 })
210 .map(Ok::<_, Infallible>)
211 }))
212 }
213 });
214
215 let (stop, rx_stop) = futures::channel::oneshot::channel();
216
217 let server = Server::builder(from_stream(connections))
218 .executor(fuchsia_hyper::Executor)
219 .serve(make_svc)
220 .with_graceful_shutdown(rx_stop.map(|res| res.unwrap_or(())))
221 .unwrap_or_else(|e| panic!("error serving repo over http: {e}"));
222
223 let server = Task::spawn(server);
224
225 Ok(ServedRepository {
226 repo: self.repo,
227 stop,
228 server,
229 addr,
230 https_domain: self.https_domain,
231 auto_event_sender,
232 connection_attempts,
233 })
234 }
235}
236
237fn parse_cert_chain(mut bytes: &[u8]) -> Vec<rustls::Certificate> {
238 rustls_pemfile::certs(&mut bytes)
239 .expect("certs to parse")
240 .into_iter()
241 .map(rustls::Certificate)
242 .collect()
243}
244
245fn parse_private_key(mut bytes: &[u8]) -> rustls::PrivateKey {
246 let keys = rustls_pemfile::read_all(&mut bytes).expect("private keys to parse");
247 assert_eq!(keys.len(), 1, "expecting a single private key");
248 match keys.into_iter().next().unwrap() {
249 rustls_pemfile::Item::RSAKey(key) => rustls::PrivateKey(key),
250 _ => panic!("expected an RSA private key"),
251 }
252}
253
254pub struct ServedRepository {
256 repo: Arc<Repository>,
257 stop: futures::channel::oneshot::Sender<()>,
258 server: Task<()>,
259 addr: SocketAddr,
260 auto_event_sender: EventSender,
261 connection_attempts: Arc<AtomicU64>,
262 https_domain: Option<Domain>,
263}
264
265impl ServedRepository {
266 fn scheme(&self) -> &'static str {
267 if self.https_domain.is_some() { "https" } else { "http" }
268 }
269 pub async fn get(&self, path: impl AsRef<str>) -> Result<Vec<u8>, Error> {
271 let url = format!("{}/{}", self.local_url(), path.as_ref());
272 get(url).await
273 }
274
275 pub fn local_url(&self) -> String {
277 format!("{}://localhost:{}", self.scheme(), self.addr.port())
278 }
279
280 pub async fn list_packages(&self) -> Result<Vec<crate::repo::PackageEntry>, Error> {
282 let targets_json = self.get("targets.json").await?;
283 let mut packages = crate::repo::iter_packages(Cursor::new(targets_json))?
284 .collect::<Result<Vec<_>, _>>()?;
285 packages.sort_unstable();
286 Ok(packages)
287 }
288
289 pub fn get_mirror_config_builder(&self) -> MirrorConfigBuilder {
291 MirrorConfigBuilder::new(self.local_url().parse::<Uri>().unwrap()).unwrap()
292 }
293
294 fn get_mirror_config(&self, subscribe: bool) -> MirrorConfig {
296 self.get_mirror_config_builder().subscribe(subscribe).build()
297 }
298
299 pub fn make_repo_config(&self, url: RepositoryUrl) -> RepositoryConfig {
302 self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), false)
303 }
304
305 pub fn make_repo_config_with_subscribe(&self, url: RepositoryUrl) -> RepositoryConfig {
308 self.repo.make_repo_config(url, Some(self.get_mirror_config(true)), false)
309 }
310
311 pub fn make_repo_config_with_local_mirror(&self, url: RepositoryUrl) -> RepositoryConfig {
315 self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), true)
316 }
317
318 pub fn make_repo_config_with_persistent_storage(&self, url: RepositoryUrl) -> RepositoryConfig {
320 self.repo
321 .make_repo_config_builder(url)
322 .add_mirror(self.get_mirror_config(false))
323 .use_local_mirror(false)
324 .repo_storage_type(RepositoryStorageType::Persistent)
325 .build()
326 }
327
328 pub async fn send_auto_event(&self, event: &Event) {
330 self.auto_event_sender.send(event).await
331 }
332
333 pub async fn wait_for_n_connected_auto_clients(&self, n: usize) {
336 loop {
337 let connected = self.auto_event_sender.client_count().await;
338 match connected.cmp(&n) {
339 std::cmp::Ordering::Equal => {
340 break;
341 }
342 std::cmp::Ordering::Greater => {
343 panic!("ServedRepository too many auto clients connected.");
344 }
345 _ => {}
346 }
347 fasync::Timer::new(Duration::from_millis(10)).await;
348 }
349 }
350
351 pub async fn drop_all_auto_clients(&self) {
353 self.auto_event_sender.drop_all_clients().await
354 }
355
356 pub fn stop(self) -> impl Future<Output = ()> {
358 self.stop.send(()).expect("remote end to still be open");
359 self.server
360 }
361
362 pub fn connection_attempts(&self) -> u64 {
364 self.connection_attempts.load(Ordering::SeqCst)
365 }
366
367 async fn handle_tuf_repo_request_infallible(
368 repo: PathBuf,
369 response_overriders: Arc<Vec<Arc<dyn HttpResponder>>>,
370 auto_response_creator: Arc<SseResponseCreator>,
371 req: Request<Body>,
372 ) -> Response<Body> {
373 let mut response = Self::handle_tuf_repo_request(repo, auto_response_creator, &req)
374 .await
375 .unwrap_or_else(|e| {
376 eprintln!("hyper tuf server error creating response for request {req:?}: {e:#}");
377 Response::builder()
378 .status(StatusCode::INTERNAL_SERVER_ERROR)
379 .body(Body::from("Error creating response".to_owned().into_bytes()))
380 .unwrap()
381 });
382
383 for responder in response_overriders.iter() {
384 response = responder.respond(&req, response).await
385 }
386
387 response
388 }
389
390 async fn handle_tuf_repo_request(
391 repo: PathBuf,
392 auto_response_creator: Arc<SseResponseCreator>,
393 req: &Request<Body>,
394 ) -> Result<Response<Body>, Error> {
395 let fail =
396 |status: StatusCode| Response::builder().status(status).body(Body::empty()).unwrap();
397 if *req.method() != Method::GET {
398 return Ok(fail(StatusCode::NOT_FOUND));
399 } else if req.uri().query().is_some() {
400 return Ok(fail(StatusCode::BAD_REQUEST));
401 }
402
403 let uri_path = Path::new(req.uri().path());
404
405 if uri_path.components().any(|component| component == std::path::Component::ParentDir) {
407 return Ok(fail(StatusCode::NOT_FOUND));
408 }
409
410 let response = if uri_path.to_str() == Some("/auto") {
411 auto_response_creator.create().await
412 } else {
413 let fs_path = repo.join(uri_path.strip_prefix("/").unwrap_or(uri_path));
414 let mut file = match std::fs::File::open(fs_path) {
416 Ok(file) => file,
417 Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => {
418 return Ok(Response::builder()
419 .status(StatusCode::NOT_FOUND)
420 .body(Body::from("File did not exist".to_owned().into_bytes()))
421 .unwrap());
422 }
423 Err(e) => Err(e).context("opening file")?,
424 };
425
426 if let Some(range) = req.headers().get(http::header::RANGE) {
427 make_range_response(file, range).context("error making range response")?
428 } else {
429 let mut body = vec![];
430 file.read_to_end(&mut body).context("reading repo file")?;
431 Response::builder()
432 .status(StatusCode::OK)
433 .header(header::CONTENT_LENGTH, body.len())
434 .body(Body::from(body))
435 .unwrap()
436 }
437 };
438
439 Ok(response)
440 }
441}
442
443fn make_range_response(
446 mut file: std::fs::File,
447 range: &http::HeaderValue,
448) -> Result<Response<Body>, Error> {
449 let HttpRange { first_byte_pos, last_byte_pos } =
450 range.try_into().context("parse range header")?;
451 let file_size = file.metadata().context("file metadata")?.len();
452 file.seek(std::io::SeekFrom::Start(first_byte_pos)).context("seeking file")?;
454 let mut data = vec![0; 1 + last_byte_pos as usize - first_byte_pos as usize];
455 file.read_exact(&mut data).context("reading file for range request")?;
456
457 Ok(Response::builder()
458 .status(StatusCode::PARTIAL_CONTENT)
459 .header(header::CONTENT_LENGTH, data.len())
460 .header(
461 header::CONTENT_RANGE,
462 format!("bytes {first_byte_pos}-{last_byte_pos}/{file_size}"),
463 )
464 .body(Body::from(data))
465 .unwrap())
466}
467
468pub struct HttpRange {
470 first_byte_pos: u64,
471 last_byte_pos: u64,
472}
473
474impl HttpRange {
475 pub fn first_byte_pos(&self) -> u64 {
477 self.first_byte_pos
478 }
479
480 pub fn last_byte_pos(&self) -> u64 {
482 self.last_byte_pos
483 }
484}
485
486impl TryFrom<&http::HeaderValue> for HttpRange {
488 type Error = anyhow::Error;
489
490 fn try_from(range: &http::HeaderValue) -> Result<Self, Self::Error> {
491 let range = range.to_str().context("range header should be ascii")?;
492 let range = if let Some(range) = range.strip_prefix("bytes=") {
493 range
494 } else {
495 bail!("range header should start with 'bytes='");
496 };
497 let dash =
498 range.find('-').ok_or_else(|| anyhow::anyhow!("range header should have dash"))?;
499 let (first, last) = range.split_at(dash);
500 if last.len() < 2 {
501 bail!("range header last_byte_pos empty");
502 }
503 let first = first.parse().context("valid range first_byte_pos")?;
504 let last = last[1..].parse().context("valid range last_byte_pos")?;
505
506 if first > last {
507 bail!("first_byte_pos {} > last_byte_pos {}", first, last);
508 }
509
510 Ok(HttpRange { first_byte_pos: first, last_byte_pos: last })
511 }
512}
513
514async fn get(url: impl AsRef<str>) -> Result<Vec<u8>, Error> {
515 let request = Request::get(url.as_ref()).body(Body::empty()).map_err(Error::from)?;
516 let client = fuchsia_hyper::new_client();
517 let response = client.request(request).await?;
518
519 if response.status() != StatusCode::OK {
520 return Err(format_err!("unexpected status code: {:?}", response.status()));
521 }
522
523 let body = hyper::body::to_bytes(response).await?;
524
525 Ok(body.to_vec())
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531 use crate::package::PackageBuilder;
532 use crate::repo::RepositoryBuilder;
533 use assert_matches::assert_matches;
534
535 #[fuchsia_async::run_singlethreaded(test)]
536 #[ignore]
537 async fn test_serve_empty_hangs_on_last_get() {
538 let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap());
539 let served_repo = repo.server().start().unwrap();
540
541 let packages = served_repo.list_packages().await.unwrap();
543 assert_eq!(packages, vec![]);
544
545 assert_matches!(served_repo.get("blobs/../root.json").await, Err(_));
547
548 let bytes = served_repo.get("targets.json").await.unwrap();
550 assert_ne!(bytes, Vec::<u8>::new());
551
552 let url = format!("{}/targets.json", served_repo.local_url());
554 let also_bytes = get(&url).await.unwrap();
555 assert_eq!(bytes, also_bytes);
556
557 served_repo.stop().await;
559
560 assert_matches!(get(url).await, Err(_));
562 }
563
564 #[fuchsia_async::run_singlethreaded(test)]
565 async fn test_serve_empty() {
566 let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap());
567 let served_repo = repo.server().start().unwrap();
568
569 let packages = served_repo.list_packages().await.unwrap();
571 assert_eq!(packages, vec![]);
572
573 assert_matches!(served_repo.get("blobs/../root.json").await, Err(_));
575
576 let bytes = served_repo.get("targets.json").await.unwrap();
578 assert_ne!(bytes, Vec::<u8>::new());
579
580 let url = format!("{}/targets.json", served_repo.local_url());
582 let also_bytes = get(&url).await.unwrap();
583 assert_eq!(bytes, also_bytes);
584 }
585
586 #[fuchsia_async::run_singlethreaded(test)]
587 async fn test_serve_packages() {
588 let same_contents = "same contents";
589 let repo = RepositoryBuilder::new()
590 .add_package(
591 PackageBuilder::new("rolldice")
592 .add_resource_at("bin/rolldice", "#!/boot/bin/sh\necho 4\n".as_bytes())
593 .add_resource_at(
594 "meta/rolldice.cml",
595 r#"{"program":{"binary":"bin/rolldice"}}"#.as_bytes(),
596 )
597 .add_resource_at("data/duplicate_a", "same contents".as_bytes())
598 .build()
599 .await
600 .unwrap(),
601 )
602 .add_package(
603 PackageBuilder::new("fortune")
604 .add_resource_at(
605 "bin/fortune",
606 "#!/boot/bin/sh\necho ask again later\n".as_bytes(),
607 )
608 .add_resource_at(
609 "meta/fortune.cml",
610 r#"{"program":{"binary":"bin/fortune"}}"#.as_bytes(),
611 )
612 .add_resource_at("data/duplicate_b", same_contents.as_bytes())
613 .add_resource_at("data/duplicate_c", same_contents.as_bytes())
614 .build()
615 .await
616 .unwrap(),
617 )
618 .build()
619 .await
620 .unwrap();
621 let repo = Arc::new(repo);
622
623 let local_packages = repo.list_packages().unwrap();
624
625 let served_repository = repo.server().start().unwrap();
626 let served_packages = served_repository.list_packages().await.unwrap();
627 assert_eq!(local_packages, served_packages);
628 }
629}