1use anyhow::Context as _;
6use fidl::endpoints::ServerEnd;
7use fidl::prelude::*;
8use fuchsia_async::{self as fasync, TimeoutExt as _};
9use fuchsia_component::server::{Item, ServiceFs, ServiceFsDir};
10use fuchsia_runtime::{HandleInfo, HandleType};
11use futures::StreamExt;
12use futures::future::Either;
13use futures::prelude::*;
14use http_client_config::Config;
15use log::{debug, error, info, trace};
16use std::str::FromStr as _;
17use {
18 fidl_fuchsia_io as fio, fidl_fuchsia_net_http as net_http, fidl_fuchsia_pkg_http as fpkg_http,
19 fidl_fuchsia_process_lifecycle as flifecycle, fuchsia_hyper as fhyper,
20 fuchsia_inspect as finspect,
21};
22
23mod pkg;
24mod resuming_get;
25
/// Upper bound on the number of redirects `Loader::fetch` follows on its own; once this many
/// have been followed, the next response is returned to the caller even if it is a redirect.
static MAX_REDIRECTS: u8 = 10;
/// Deadline, measured from the moment the `Loader` is constructed, that is applied to requests
/// which do not carry an explicit deadline.
static DEFAULT_DEADLINE_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(15);
28
29fn to_status_line(version: hyper::Version, status: hyper::StatusCode) -> Vec<u8> {
30 match status.canonical_reason() {
31 None => format!("{:?} {}", version, status.as_str()),
32 Some(canonical_reason) => format!("{:?} {} {}", version, status.as_str(), canonical_reason),
33 }
34 .as_bytes()
35 .to_vec()
36}
37
38fn tcp_options() -> fhyper::TcpOptions {
39 let mut options: fhyper::TcpOptions = std::default::Default::default();
40
41 options.keepalive_idle = Some(std::time::Duration::from_secs(60));
44 options.keepalive_interval = Some(std::time::Duration::from_secs(15));
45 options.keepalive_count = Some(8);
47
48 options
49}
50
/// Describes where a redirect response points, as computed by `redirect_info`.
struct RedirectInfo {
    /// Redirect target taken from the response's `Location` header, with scheme and authority
    /// filled in from the request URL when absent. `None` if the header is missing or cannot
    /// be turned into a URI.
    url: Option<hyper::Uri>,
    /// Value of the response's `Referer` header, resolved the same way as `url`. `None` if the
    /// header is missing or cannot be turned into a URI.
    referrer: Option<hyper::Uri>,
    /// Method to use for the follow-up request: `GET` for a 303 (See Other) response,
    /// otherwise the method of the request that was redirected.
    method: hyper::Method,
}
56
57fn redirect_info(
58 old_uri: &hyper::Uri,
59 method: &hyper::Method,
60 hyper_response: &hyper::Response<hyper::Body>,
61) -> Option<RedirectInfo> {
62 if hyper_response.status().is_redirection() {
63 Some(RedirectInfo {
64 url: hyper_response
65 .headers()
66 .get(hyper::header::LOCATION)
67 .and_then(|loc| calculate_redirect(old_uri, loc)),
68 referrer: hyper_response
69 .headers()
70 .get(hyper::header::REFERER)
71 .and_then(|loc| calculate_redirect(old_uri, loc)),
72 method: if hyper_response.status() == hyper::StatusCode::SEE_OTHER {
73 hyper::Method::GET
74 } else {
75 method.clone()
76 },
77 })
78 } else {
79 None
80 }
81}
82
83async fn to_success_response(
84 current_url: &hyper::Uri,
85 current_method: &hyper::Method,
86 mut hyper_response: hyper::Response<hyper::Body>,
87 scope: vfs::execution_scope::ExecutionScope,
88) -> net_http::Response {
89 let redirect_info = redirect_info(current_url, current_method, &hyper_response);
90 let headers = hyper_response
91 .headers()
92 .iter()
93 .map(|(name, value)| net_http::Header {
94 name: name.as_str().as_bytes().to_vec(),
95 value: value.as_bytes().to_vec(),
96 })
97 .collect();
98
99 let (tx, rx) = zx::Socket::create_stream();
100 let response = net_http::Response {
101 error: None,
102 body: Some(rx),
103 final_url: Some(current_url.to_string()),
104 status_code: Some(hyper_response.status().as_u16() as u32),
105 status_line: Some(to_status_line(hyper_response.version(), hyper_response.status())),
106 headers: Some(headers),
107 redirect: redirect_info.map(|info| net_http::RedirectTarget {
108 method: Some(info.method.to_string()),
109 url: info.url.map(|u| u.to_string()),
110 referrer: info.referrer.map(|r| r.to_string()),
111 ..Default::default()
112 }),
113 ..Default::default()
114 };
115
116 let _ = scope.spawn(async move {
117 let hyper_body = hyper_response.body_mut();
118 while let Some(chunk) = hyper_body.next().await {
119 if let Ok(chunk) = chunk {
120 let mut offset: usize = 0;
121 while offset < chunk.len() {
122 let pending = match tx.wait_one(
123 zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
124 zx::MonotonicInstant::INFINITE,
125 ).to_result() {
126 Err(status) => {
127 error!("tx.wait() failed - status: {}", status);
128 return;
129 }
130 Ok(pending) => pending,
131 };
132 if pending.contains(zx::Signals::SOCKET_PEER_CLOSED) {
133 info!("tx.wait() saw signal SOCKET_PEER_CLOSED");
134 return;
135 }
136 assert!(pending.contains(zx::Signals::SOCKET_WRITABLE));
137 let written = match tx.write(&chunk[offset..]) {
138 Err(status) => {
139 if status == zx::Status::SHOULD_WAIT {
142 error!("Saw SHOULD_WAIT despite waiting first - expected now? - continuing");
143 continue;
144 }
145 info!("tx.write() failed - status: {}", status);
146 return;
147 }
148 Ok(written) => written,
149 };
150 offset += written;
151 }
152 }
153 }
154 });
155
156 response
157}
158
159fn to_fidl_error(error: &hyper::Error) -> net_http::Error {
160 #[allow(clippy::if_same_then_else)] if error.is_parse() {
162 net_http::Error::UnableToParse
163 } else if error.is_user() {
164 net_http::Error::Internal
166 } else if error.is_canceled() {
167 net_http::Error::Internal
169 } else if error.is_closed() {
170 net_http::Error::ChannelClosed
171 } else if error.is_connect() {
172 net_http::Error::Connect
173 } else if error.is_incomplete_message() {
174 net_http::Error::Internal
176 } else if error.is_body_write_aborted() {
177 net_http::Error::Internal
179 } else {
180 net_http::Error::Internal
181 }
182}
183
184fn to_error_response(error: net_http::Error) -> net_http::Response {
185 net_http::Response {
186 error: Some(error),
187 body: None,
188 final_url: None,
189 status_code: None,
190 status_line: None,
191 headers: None,
192 redirect: None,
193 ..Default::default()
194 }
195}
196
/// A validated HTTP request, ready to be issued through hyper.
///
/// Built from a `net_http::Request` by `Loader::new` and consumed by `Loader::start` or
/// `Loader::fetch`.
struct Loader {
    /// HTTP method; `GET` when the FIDL request did not specify one.
    method: hyper::Method,
    /// URL to request; replaced by the redirect target each time a redirect is followed.
    url: hyper::Uri,
    /// Headers sent with every attempt of this request.
    headers: hyper::HeaderMap,
    /// Request body, fully buffered in memory; cloned for every attempt.
    body: Vec<u8>,
    /// Instant after which `fetch` gives up with `DeadlineExceeded`.
    deadline: fasync::MonotonicInstant,
    /// Scope on which the tasks that stream response bodies are spawned.
    scope: vfs::execution_scope::ExecutionScope,
}
205
206impl Loader {
207 async fn new(
208 req: net_http::Request,
209 scope: vfs::execution_scope::ExecutionScope,
210 ) -> Result<Self, anyhow::Error> {
211 let net_http::Request { method, url, headers, body, deadline, .. } = req;
212 let method = method.as_ref().map(|method| hyper::Method::from_str(method)).transpose()?;
213 let method = method.unwrap_or(hyper::Method::GET);
214 if let Some(url) = url {
215 let url = hyper::Uri::try_from(url)?;
216 let headers = headers
217 .unwrap_or_else(|| vec![])
218 .into_iter()
219 .map(|net_http::Header { name, value }| {
220 let name = hyper::header::HeaderName::from_bytes(&name)?;
221 let value = hyper::header::HeaderValue::from_bytes(&value)?;
222 Ok((name, value))
223 })
224 .collect::<Result<hyper::HeaderMap, anyhow::Error>>()?;
225
226 let body = match body {
227 Some(net_http::Body::Buffer(buffer)) => {
228 let mut bytes = vec![0; buffer.size as usize];
229 buffer.vmo.read(&mut bytes, 0)?;
230 bytes
231 }
232 Some(net_http::Body::Stream(socket)) => {
233 let mut stream = fasync::Socket::from_socket(socket)
234 .into_datagram_stream()
235 .map(|r| r.context("reading from datagram stream"));
236 let mut bytes = Vec::new();
237 while let Some(chunk) = stream.next().await {
238 bytes.extend(chunk?);
239 }
240 bytes
241 }
242 None => Vec::new(),
243 };
244
245 let deadline = deadline
246 .map(|deadline| fasync::MonotonicInstant::from_nanos(deadline))
247 .unwrap_or_else(|| fasync::MonotonicInstant::after(DEFAULT_DEADLINE_DURATION));
248
249 trace!("Starting request {} {}", method, url);
250
251 Ok(Loader { method, url, headers, body, deadline, scope })
252 } else {
253 Err(anyhow::Error::msg("Request missing URL"))
254 }
255 }
256
257 fn build_request(&self) -> hyper::Request<hyper::Body> {
258 let Self { method, url, headers, body, deadline: _, scope: _ } = self;
259 let mut request = hyper::Request::new(body.clone().into());
260 *request.method_mut() = method.clone();
261 *request.uri_mut() = url.clone();
262 *request.headers_mut() = headers.clone();
263 request
264 }
265
266 async fn start(mut self, loader_client: net_http::LoaderClientProxy) -> Result<(), zx::Status> {
267 let client = fhyper::new_https_client_from_tcp_options(tcp_options());
268 loop {
269 break match client.request(self.build_request()).await {
270 Ok(hyper_response) => {
271 let redirect = redirect_info(&self.url, &self.method, &hyper_response);
272 if let Some(redirect) = redirect {
273 if let Some(url) = redirect.url {
274 self.url = url;
275 self.method = redirect.method;
276 trace!(
277 "Reporting redirect to OnResponse: {} {}",
278 self.method, self.url
279 );
280 let response = to_success_response(
281 &self.url,
282 &self.method,
283 hyper_response,
284 self.scope.clone(),
285 )
286 .await;
287 match loader_client.on_response(response).await {
288 Ok(()) => {}
289 Err(e) => {
290 debug!("Not redirecting because: {}", e);
291 break Ok(());
292 }
293 };
294 trace!("Redirect allowed to {} {}", self.method, self.url);
295 continue;
296 }
297 }
298 let response = to_success_response(
299 &self.url,
300 &self.method,
301 hyper_response,
302 self.scope.clone(),
303 )
304 .await;
305 let _: Result<_, _> = loader_client.on_response(response).await;
308 Ok(())
309 }
310 Err(error) => {
311 info!("Received network level error from hyper: {}", error);
312 let _: Result<_, _> =
315 loader_client.on_response(to_error_response(to_fidl_error(&error))).await;
316 Ok(())
317 }
318 };
319 }
320 }
321
322 async fn fetch(
323 mut self,
324 ) -> Result<(hyper::Response<hyper::Body>, hyper::Uri, hyper::Method), net_http::Error> {
325 let deadline = self.deadline;
326 if deadline < fasync::MonotonicInstant::now() {
327 return Err(net_http::Error::DeadlineExceeded);
328 }
329 let client = fhyper::new_https_client_from_tcp_options(tcp_options());
330
331 async move {
332 let mut redirects = 0;
333 loop {
334 break match client.request(self.build_request()).await {
335 Ok(hyper_response) => {
336 if redirects != MAX_REDIRECTS {
337 let redirect = redirect_info(&self.url, &self.method, &hyper_response);
338 if let Some(redirect) = redirect {
339 if let Some(url) = redirect.url {
340 self.url = url;
341 self.method = redirect.method;
342 trace!("Redirecting to {} {}", self.method, self.url);
343 redirects += 1;
344 continue;
345 }
346 }
347 }
348 Ok((hyper_response, self.url, self.method))
349 }
350 Err(e) => {
351 info!("Received network level error from hyper: {}", e);
352 Err(to_fidl_error(&e))
353 }
354 };
355 }
356 }
357 .on_timeout(deadline, || Err(net_http::Error::DeadlineExceeded))
358 .await
359 }
360}
361
362fn calculate_redirect(
363 old_url: &hyper::Uri,
364 location: &hyper::header::HeaderValue,
365) -> Option<hyper::Uri> {
366 let old_parts = old_url.clone().into_parts();
367 let mut new_parts = hyper::Uri::try_from(location.as_bytes()).ok()?.into_parts();
368 if new_parts.scheme.is_none() {
369 new_parts.scheme = old_parts.scheme;
370 }
371 if new_parts.authority.is_none() {
372 new_parts.authority = old_parts.authority;
373 }
374 Some(hyper::Uri::from_parts(new_parts).ok()?)
375}
376
377async fn loader_server(
378 stream: net_http::LoaderRequestStream,
379 idle_timeout: fasync::MonotonicDuration,
380) -> Result<(), anyhow::Error> {
381 let background_tasks = vfs::execution_scope::ExecutionScope::new();
382 let (stream, unbind_if_stalled) = detect_stall::until_stalled(stream, idle_timeout);
383
384 stream
385 .err_into::<anyhow::Error>()
386 .try_for_each_concurrent(None, |message| {
387 let scope = background_tasks.clone();
388 async move {
389 match message {
390 net_http::LoaderRequest::Fetch { request, responder } => {
391 debug!(
392 "Fetch request received (url: {}): {:?}",
393 request
394 .url
395 .as_ref()
396 .and_then(|url| Some(url.as_str()))
397 .unwrap_or_default(),
398 request
399 );
400 let result = Loader::new(request, scope.clone()).await?.fetch().await;
401 responder.send(match result {
402 Ok((hyper_response, final_url, final_method)) => {
403 to_success_response(
404 &final_url,
405 &final_method,
406 hyper_response,
407 scope.clone(),
408 )
409 .await
410 }
411 Err(error) => to_error_response(error),
412 })?;
413 }
414 net_http::LoaderRequest::Start { request, client, control_handle } => {
415 debug!(
416 "Start request received (url: {}): {:?}",
417 request
418 .url
419 .as_ref()
420 .and_then(|url| Some(url.as_str()))
421 .unwrap_or_default(),
422 request
423 );
424 Loader::new(request, scope).await?.start(client.into_proxy()).await?;
425 control_handle.shutdown();
426 }
427 }
428 Ok(())
429 }
430 })
431 .await?;
432
433 background_tasks.wait().await;
434
435 if let Ok(Some(server_end)) = unbind_if_stalled.await {
438 fuchsia_component::client::connect_channel_to_protocol_at::<net_http::LoaderMarker>(
439 server_end.into(),
440 "/escrow",
441 )?;
442 }
443
444 Ok(())
445}
446
/// Incoming connections to the protocols this component exposes under `svc`.
enum HttpServices {
    /// A connection to the `fuchsia.net.http` Loader protocol, served by `loader_server`.
    Loader(net_http::LoaderRequestStream),
    /// A connection to the `fuchsia.pkg.http` Client protocol, served by
    /// `pkg::serve_client_request_stream`.
    PkgClient(fpkg_http::ClientRequestStream),
}
451
/// Component entry point.
///
/// Publishes inspect data, serves the Loader and pkg-http Client protocols from the `svc`
/// directory of the outgoing directory, and runs until either a lifecycle `Stop` request
/// arrives or the outgoing-directory task completes.
///
/// # Panics
///
/// Panics if the lifecycle startup handle is not available.
#[fuchsia::main]
pub async fn main() -> Result<(), anyhow::Error> {
    log::info!("http-client starting");
    fuchsia_trace_provider::trace_provider_create_with_fdio();
    // Inspect hierarchy: root/pkg-http/connections/<n>, one child per pkg-http connection.
    let inspector = finspect::Inspector::default();
    let pkg_http_node = inspector.root().create_child("pkg-http");
    let pkg_http_connections_node = pkg_http_node.create_child("connections");
    // Source of the per-connection inspect node names.
    let pkg_http_connection_count = std::sync::atomic::AtomicU64::new(0);
    let _inspect_server_task =
        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());

    // Take the lifecycle channel from the startup handles and turn it into a request stream
    // (to receive `Stop`) plus a control handle (used to send the escrow event).
    let lifecycle =
        fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
    let lifecycle: zx::Channel = lifecycle.into();
    let lifecycle: ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
    let (mut lifecycle_request_stream, lifecycle_control_handle) =
        lifecycle.into_stream_and_control_handle();

    // A negative configured timeout means "never consider the component idle".
    let config = Config::take_from_startup_handle();
    let idle_timeout = if config.stop_on_idle_timeout_millis >= 0 {
        fasync::MonotonicDuration::from_millis(config.stop_on_idle_timeout_millis)
    } else {
        fasync::MonotonicDuration::INFINITE
    };

    let mut fs = ServiceFs::new();
    let _: &mut ServiceFsDir<'_, _> = fs
        .take_and_serve_directory_handle()?
        .dir("svc")
        .add_fidl_service(HttpServices::Loader)
        .add_fidl_service(HttpServices::PkgClient);

    // Completes only when a `Stop` request is received. If the lifecycle stream ends or yields
    // an error instead, this future stays pending forever so it can never win the select below.
    let lifecycle_task = async move {
        let Some(Ok(request)) = lifecycle_request_stream.next().await else {
            return std::future::pending::<()>().await;
        };
        match request {
            flifecycle::LifecycleRequest::Stop { .. } => {
                return;
            }
        }
    };

    // Serves every incoming connection concurrently. When the outgoing directory is reported
    // as stalled, its server end is handed over through the lifecycle escrow event.
    let outgoing_dir_task = async move {
        fs.until_stalled(idle_timeout)
            .for_each_concurrent(None, |item| async {
                match item {
                    Item::Request(services, _active_guard) => match services {
                        // Connection-level failures are logged and do not stop the component.
                        HttpServices::Loader(stream) => loader_server(stream, idle_timeout)
                            .await
                            .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
                        HttpServices::PkgClient(stream) => pkg::serve_client_request_stream(
                            stream,
                            idle_timeout,
                            // Each connection gets its own inspect node named by a running
                            // counter.
                            pkg_http_connections_node.create_child(
                                pkg_http_connection_count
                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                                    .to_string(),
                            ),
                        )
                        .await
                        .unwrap_or_else(|e: anyhow::Error| error!("{e:#}")),
                    },
                    Item::Stalled(outgoing_directory) => {
                        escrow_outgoing(lifecycle_control_handle.clone(), outgoing_directory.into())
                    }
                }
            })
            .await;
    };

    // Whichever task finishes first decides why the component exits.
    match futures::future::select(lifecycle_task.boxed_local(), outgoing_dir_task.boxed_local())
        .await
    {
        Either::Left(_) => log::info!("http-client stopping because we are told to stop"),
        Either::Right(_) => log::info!("http-client stopping because it is idle"),
    }

    Ok(())
}
537
538fn escrow_outgoing(
541 lifecycle_control_handle: flifecycle::LifecycleControlHandle,
542 outgoing_dir: ServerEnd<fio::DirectoryMarker>,
543) {
544 let outgoing_dir = Some(outgoing_dir);
545 lifecycle_control_handle
546 .send_on_escrow(flifecycle::LifecycleOnEscrowRequest { outgoing_dir, ..Default::default() })
547 .unwrap();
548}