1use anyhow::Context as _;
6use fidl::endpoints::ServerEnd;
7use fidl::prelude::*;
8use fuchsia_async::{self as fasync, TimeoutExt as _};
9use fuchsia_component::server::{Item, ServiceFs, ServiceFsDir};
10use fuchsia_runtime::{HandleInfo, HandleType};
11use futures::StreamExt;
12use futures::future::Either;
13use futures::prelude::*;
14use http_client_config::Config;
15use log::{debug, error, info, trace};
16use std::str::FromStr as _;
17use zx::{self as zx, AsHandleRef};
18use {
19 fidl_fuchsia_io as fio, fidl_fuchsia_net_http as net_http, fidl_fuchsia_pkg_http as fpkg_http,
20 fidl_fuchsia_process_lifecycle as flifecycle, fuchsia_hyper as fhyper,
21 fuchsia_inspect as finspect,
22};
23
24mod pkg;
25mod resuming_get;
26
/// Maximum number of redirects that `Loader::fetch` follows on its own before
/// returning the redirect response as-is.
static MAX_REDIRECTS: u8 = 10;
/// Deadline, relative to the moment the request is accepted, used when the
/// incoming request does not carry an explicit deadline.
static DEFAULT_DEADLINE_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(15);
29
30fn to_status_line(version: hyper::Version, status: hyper::StatusCode) -> Vec<u8> {
31 match status.canonical_reason() {
32 None => format!("{:?} {}", version, status.as_str()),
33 Some(canonical_reason) => format!("{:?} {} {}", version, status.as_str(), canonical_reason),
34 }
35 .as_bytes()
36 .to_vec()
37}
38
39fn tcp_options() -> fhyper::TcpOptions {
40 let mut options: fhyper::TcpOptions = std::default::Default::default();
41
42 options.keepalive_idle = Some(std::time::Duration::from_secs(60));
45 options.keepalive_interval = Some(std::time::Duration::from_secs(15));
46 options.keepalive_count = Some(8);
48
49 options
50}
51
/// What a redirection response asks the client to do next.
struct RedirectInfo {
    /// Target of the redirect, resolved from the `Location` header against the
    /// URL that was requested. `None` when the header is absent or unusable.
    url: Option<hyper::Uri>,
    /// Value of the response's `Referer` header resolved the same way, if any.
    referrer: Option<hyper::Uri>,
    /// Method to use for the follow-up request.
    method: hyper::Method,
}
57
58fn redirect_info(
59 old_uri: &hyper::Uri,
60 method: &hyper::Method,
61 hyper_response: &hyper::Response<hyper::Body>,
62) -> Option<RedirectInfo> {
63 if hyper_response.status().is_redirection() {
64 Some(RedirectInfo {
65 url: hyper_response
66 .headers()
67 .get(hyper::header::LOCATION)
68 .and_then(|loc| calculate_redirect(old_uri, loc)),
69 referrer: hyper_response
70 .headers()
71 .get(hyper::header::REFERER)
72 .and_then(|loc| calculate_redirect(old_uri, loc)),
73 method: if hyper_response.status() == hyper::StatusCode::SEE_OTHER {
74 hyper::Method::GET
75 } else {
76 method.clone()
77 },
78 })
79 } else {
80 None
81 }
82}
83
84async fn to_success_response(
85 current_url: &hyper::Uri,
86 current_method: &hyper::Method,
87 mut hyper_response: hyper::Response<hyper::Body>,
88 scope: vfs::execution_scope::ExecutionScope,
89) -> net_http::Response {
90 let redirect_info = redirect_info(current_url, current_method, &hyper_response);
91 let headers = hyper_response
92 .headers()
93 .iter()
94 .map(|(name, value)| net_http::Header {
95 name: name.as_str().as_bytes().to_vec(),
96 value: value.as_bytes().to_vec(),
97 })
98 .collect();
99
100 let (tx, rx) = zx::Socket::create_stream();
101 let response = net_http::Response {
102 error: None,
103 body: Some(rx),
104 final_url: Some(current_url.to_string()),
105 status_code: Some(hyper_response.status().as_u16() as u32),
106 status_line: Some(to_status_line(hyper_response.version(), hyper_response.status())),
107 headers: Some(headers),
108 redirect: redirect_info.map(|info| net_http::RedirectTarget {
109 method: Some(info.method.to_string()),
110 url: info.url.map(|u| u.to_string()),
111 referrer: info.referrer.map(|r| r.to_string()),
112 ..Default::default()
113 }),
114 ..Default::default()
115 };
116
117 let _ = scope.spawn(async move {
118 let hyper_body = hyper_response.body_mut();
119 while let Some(chunk) = hyper_body.next().await {
120 if let Ok(chunk) = chunk {
121 let mut offset: usize = 0;
122 while offset < chunk.len() {
123 let pending = match tx.wait_handle(
124 zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
125 zx::MonotonicInstant::INFINITE,
126 ).to_result() {
127 Err(status) => {
128 error!("tx.wait() failed - status: {}", status);
129 return;
130 }
131 Ok(pending) => pending,
132 };
133 if pending.contains(zx::Signals::SOCKET_PEER_CLOSED) {
134 info!("tx.wait() saw signal SOCKET_PEER_CLOSED");
135 return;
136 }
137 assert!(pending.contains(zx::Signals::SOCKET_WRITABLE));
138 let written = match tx.write(&chunk[offset..]) {
139 Err(status) => {
140 if status == zx::Status::SHOULD_WAIT {
143 error!("Saw SHOULD_WAIT despite waiting first - expected now? - continuing");
144 continue;
145 }
146 info!("tx.write() failed - status: {}", status);
147 return;
148 }
149 Ok(written) => written,
150 };
151 offset += written;
152 }
153 }
154 }
155 });
156
157 response
158}
159
160fn to_fidl_error(error: &hyper::Error) -> net_http::Error {
161 #[allow(clippy::if_same_then_else)] if error.is_parse() {
163 net_http::Error::UnableToParse
164 } else if error.is_user() {
165 net_http::Error::Internal
167 } else if error.is_canceled() {
168 net_http::Error::Internal
170 } else if error.is_closed() {
171 net_http::Error::ChannelClosed
172 } else if error.is_connect() {
173 net_http::Error::Connect
174 } else if error.is_incomplete_message() {
175 net_http::Error::Internal
177 } else if error.is_body_write_aborted() {
178 net_http::Error::Internal
180 } else {
181 net_http::Error::Internal
182 }
183}
184
185fn to_error_response(error: net_http::Error) -> net_http::Response {
186 net_http::Response {
187 error: Some(error),
188 body: None,
189 final_url: None,
190 status_code: None,
191 status_line: None,
192 headers: None,
193 redirect: None,
194 ..Default::default()
195 }
196}
197
/// State of a single HTTP request being served on behalf of a FIDL client.
struct Loader {
    /// Method of the next request to send; updated when a redirect is followed.
    method: hyper::Method,
    /// URL of the next request to send; updated when a redirect is followed.
    url: hyper::Uri,
    /// Headers sent with every request issued by this loader.
    headers: hyper::HeaderMap,
    /// Request body, fully buffered in memory and cloned for every request.
    body: Vec<u8>,
    /// Point in time after which `fetch` gives up with `DeadlineExceeded`.
    deadline: fasync::MonotonicInstant,
    /// Scope on which the response body copy tasks are spawned.
    scope: vfs::execution_scope::ExecutionScope,
}
206
207impl Loader {
208 async fn new(
209 req: net_http::Request,
210 scope: vfs::execution_scope::ExecutionScope,
211 ) -> Result<Self, anyhow::Error> {
212 let net_http::Request { method, url, headers, body, deadline, .. } = req;
213 let method = method.as_ref().map(|method| hyper::Method::from_str(method)).transpose()?;
214 let method = method.unwrap_or(hyper::Method::GET);
215 if let Some(url) = url {
216 let url = hyper::Uri::try_from(url)?;
217 let headers = headers
218 .unwrap_or_else(|| vec![])
219 .into_iter()
220 .map(|net_http::Header { name, value }| {
221 let name = hyper::header::HeaderName::from_bytes(&name)?;
222 let value = hyper::header::HeaderValue::from_bytes(&value)?;
223 Ok((name, value))
224 })
225 .collect::<Result<hyper::HeaderMap, anyhow::Error>>()?;
226
227 let body = match body {
228 Some(net_http::Body::Buffer(buffer)) => {
229 let mut bytes = vec![0; buffer.size as usize];
230 buffer.vmo.read(&mut bytes, 0)?;
231 bytes
232 }
233 Some(net_http::Body::Stream(socket)) => {
234 let mut stream = fasync::Socket::from_socket(socket)
235 .into_datagram_stream()
236 .map(|r| r.context("reading from datagram stream"));
237 let mut bytes = Vec::new();
238 while let Some(chunk) = stream.next().await {
239 bytes.extend(chunk?);
240 }
241 bytes
242 }
243 None => Vec::new(),
244 };
245
246 let deadline = deadline
247 .map(|deadline| fasync::MonotonicInstant::from_nanos(deadline))
248 .unwrap_or_else(|| fasync::MonotonicInstant::after(DEFAULT_DEADLINE_DURATION));
249
250 trace!("Starting request {} {}", method, url);
251
252 Ok(Loader { method, url, headers, body, deadline, scope })
253 } else {
254 Err(anyhow::Error::msg("Request missing URL"))
255 }
256 }
257
258 fn build_request(&self) -> hyper::Request<hyper::Body> {
259 let Self { method, url, headers, body, deadline: _, scope: _ } = self;
260 let mut request = hyper::Request::new(body.clone().into());
261 *request.method_mut() = method.clone();
262 *request.uri_mut() = url.clone();
263 *request.headers_mut() = headers.clone();
264 request
265 }
266
267 async fn start(mut self, loader_client: net_http::LoaderClientProxy) -> Result<(), zx::Status> {
268 let client = fhyper::new_https_client_from_tcp_options(tcp_options());
269 loop {
270 break match client.request(self.build_request()).await {
271 Ok(hyper_response) => {
272 let redirect = redirect_info(&self.url, &self.method, &hyper_response);
273 if let Some(redirect) = redirect {
274 if let Some(url) = redirect.url {
275 self.url = url;
276 self.method = redirect.method;
277 trace!(
278 "Reporting redirect to OnResponse: {} {}",
279 self.method, self.url
280 );
281 let response = to_success_response(
282 &self.url,
283 &self.method,
284 hyper_response,
285 self.scope.clone(),
286 )
287 .await;
288 match loader_client.on_response(response).await {
289 Ok(()) => {}
290 Err(e) => {
291 debug!("Not redirecting because: {}", e);
292 break Ok(());
293 }
294 };
295 trace!("Redirect allowed to {} {}", self.method, self.url);
296 continue;
297 }
298 }
299 let response = to_success_response(
300 &self.url,
301 &self.method,
302 hyper_response,
303 self.scope.clone(),
304 )
305 .await;
306 let _: Result<_, _> = loader_client.on_response(response).await;
309 Ok(())
310 }
311 Err(error) => {
312 info!("Received network level error from hyper: {}", error);
313 let _: Result<_, _> =
316 loader_client.on_response(to_error_response(to_fidl_error(&error))).await;
317 Ok(())
318 }
319 };
320 }
321 }
322
323 async fn fetch(
324 mut self,
325 ) -> Result<(hyper::Response<hyper::Body>, hyper::Uri, hyper::Method), net_http::Error> {
326 let deadline = self.deadline;
327 if deadline < fasync::MonotonicInstant::now() {
328 return Err(net_http::Error::DeadlineExceeded);
329 }
330 let client = fhyper::new_https_client_from_tcp_options(tcp_options());
331
332 async move {
333 let mut redirects = 0;
334 loop {
335 break match client.request(self.build_request()).await {
336 Ok(hyper_response) => {
337 if redirects != MAX_REDIRECTS {
338 let redirect = redirect_info(&self.url, &self.method, &hyper_response);
339 if let Some(redirect) = redirect {
340 if let Some(url) = redirect.url {
341 self.url = url;
342 self.method = redirect.method;
343 trace!("Redirecting to {} {}", self.method, self.url);
344 redirects += 1;
345 continue;
346 }
347 }
348 }
349 Ok((hyper_response, self.url, self.method))
350 }
351 Err(e) => {
352 info!("Received network level error from hyper: {}", e);
353 Err(to_fidl_error(&e))
354 }
355 };
356 }
357 }
358 .on_timeout(deadline, || Err(net_http::Error::DeadlineExceeded))
359 .await
360 }
361}
362
363fn calculate_redirect(
364 old_url: &hyper::Uri,
365 location: &hyper::header::HeaderValue,
366) -> Option<hyper::Uri> {
367 let old_parts = old_url.clone().into_parts();
368 let mut new_parts = hyper::Uri::try_from(location.as_bytes()).ok()?.into_parts();
369 if new_parts.scheme.is_none() {
370 new_parts.scheme = old_parts.scheme;
371 }
372 if new_parts.authority.is_none() {
373 new_parts.authority = old_parts.authority;
374 }
375 Some(hyper::Uri::from_parts(new_parts).ok()?)
376}
377
378async fn loader_server(
379 stream: net_http::LoaderRequestStream,
380 idle_timeout: fasync::MonotonicDuration,
381) -> Result<(), anyhow::Error> {
382 let background_tasks = vfs::execution_scope::ExecutionScope::new();
383 let (stream, unbind_if_stalled) = detect_stall::until_stalled(stream, idle_timeout);
384
385 stream
386 .err_into::<anyhow::Error>()
387 .try_for_each_concurrent(None, |message| {
388 let scope = background_tasks.clone();
389 async move {
390 match message {
391 net_http::LoaderRequest::Fetch { request, responder } => {
392 debug!(
393 "Fetch request received (url: {}): {:?}",
394 request
395 .url
396 .as_ref()
397 .and_then(|url| Some(url.as_str()))
398 .unwrap_or_default(),
399 request
400 );
401 let result = Loader::new(request, scope.clone()).await?.fetch().await;
402 responder.send(match result {
403 Ok((hyper_response, final_url, final_method)) => {
404 to_success_response(
405 &final_url,
406 &final_method,
407 hyper_response,
408 scope.clone(),
409 )
410 .await
411 }
412 Err(error) => to_error_response(error),
413 })?;
414 }
415 net_http::LoaderRequest::Start { request, client, control_handle } => {
416 debug!(
417 "Start request received (url: {}): {:?}",
418 request
419 .url
420 .as_ref()
421 .and_then(|url| Some(url.as_str()))
422 .unwrap_or_default(),
423 request
424 );
425 Loader::new(request, scope).await?.start(client.into_proxy()).await?;
426 control_handle.shutdown();
427 }
428 }
429 Ok(())
430 }
431 })
432 .await?;
433
434 background_tasks.wait().await;
435
436 if let Ok(Some(server_end)) = unbind_if_stalled.await {
439 fuchsia_component::client::connect_channel_to_protocol_at::<net_http::LoaderMarker>(
440 server_end.into(),
441 "/escrow",
442 )?;
443 }
444
445 Ok(())
446}
447
/// Incoming connections to the protocols published in the outgoing `svc`
/// directory.
enum HttpServices {
    /// A connection to the `fuchsia.net.http` loader protocol.
    Loader(net_http::LoaderRequestStream),
    /// A connection to the `fuchsia.pkg.http` client protocol.
    PkgClient(fpkg_http::ClientRequestStream),
}
452
453#[fuchsia::main]
454pub async fn main() -> Result<(), anyhow::Error> {
455 log::info!("http-client starting");
456 fuchsia_trace_provider::trace_provider_create_with_fdio();
457 let inspector = finspect::Inspector::default();
458 let pkg_http_node = inspector.root().create_child("pkg-http");
459 let pkg_http_connections_node = pkg_http_node.create_child("connections");
460 let pkg_http_connection_count = std::sync::atomic::AtomicU64::new(0);
461 let _inspect_server_task =
462 inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
463
464 let lifecycle =
467 fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
468 let lifecycle: zx::Channel = lifecycle.into();
469 let lifecycle: ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
470 let (mut lifecycle_request_stream, lifecycle_control_handle) =
471 lifecycle.into_stream_and_control_handle();
472
473 let config = Config::take_from_startup_handle();
474 let idle_timeout = if config.stop_on_idle_timeout_millis >= 0 {
475 fasync::MonotonicDuration::from_millis(config.stop_on_idle_timeout_millis)
476 } else {
477 fasync::MonotonicDuration::INFINITE
478 };
479
480 let mut fs = ServiceFs::new();
481 let _: &mut ServiceFsDir<'_, _> = fs
482 .take_and_serve_directory_handle()?
483 .dir("svc")
484 .add_fidl_service(HttpServices::Loader)
485 .add_fidl_service(HttpServices::PkgClient);
486
487 let lifecycle_task = async move {
488 let Some(Ok(request)) = lifecycle_request_stream.next().await else {
489 return std::future::pending::<()>().await;
490 };
491 match request {
492 flifecycle::LifecycleRequest::Stop { .. } => {
493 return;
497 }
498 }
499 };
500
501 let outgoing_dir_task = async move {
502 fs.until_stalled(idle_timeout)
503 .for_each_concurrent(None, |item| async {
504 match item {
505 Item::Request(services, _active_guard) => match services {
506 HttpServices::Loader(stream) => loader_server(stream, idle_timeout)
507 .await
508 .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
509 HttpServices::PkgClient(stream) => pkg::serve_client_request_stream(
510 stream,
511 idle_timeout,
512 pkg_http_connections_node.create_child(
513 pkg_http_connection_count
514 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
515 .to_string(),
516 ),
517 )
518 .await
519 .unwrap_or_else(|e: anyhow::Error| error!("{e:#}")),
520 },
521 Item::Stalled(outgoing_directory) => {
522 escrow_outgoing(lifecycle_control_handle.clone(), outgoing_directory.into())
523 }
524 }
525 })
526 .await;
527 };
528
529 match futures::future::select(lifecycle_task.boxed_local(), outgoing_dir_task.boxed_local())
530 .await
531 {
532 Either::Left(_) => log::info!("http-client stopping because we are told to stop"),
533 Either::Right(_) => log::info!("http-client stopping because it is idle"),
534 }
535
536 Ok(())
537}
538
539fn escrow_outgoing(
542 lifecycle_control_handle: flifecycle::LifecycleControlHandle,
543 outgoing_dir: ServerEnd<fio::DirectoryMarker>,
544) {
545 let outgoing_dir = Some(outgoing_dir);
546 lifecycle_control_handle
547 .send_on_escrow(flifecycle::LifecycleOnEscrowRequest { outgoing_dir, ..Default::default() })
548 .unwrap();
549}