http_client/
main.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use fidl::endpoints::ServerEnd;
7use fidl::prelude::*;
8use fuchsia_async::{self as fasync, TimeoutExt as _};
9use fuchsia_component::server::{Item, ServiceFs, ServiceFsDir};
10use fuchsia_runtime::{HandleInfo, HandleType};
11use futures::StreamExt;
12use futures::future::Either;
13use futures::prelude::*;
14use http_client_config::Config;
15use log::{debug, error, info, trace};
16use std::str::FromStr as _;
17use {
18    fidl_fuchsia_io as fio, fidl_fuchsia_net_http as net_http, fidl_fuchsia_pkg_http as fpkg_http,
19    fidl_fuchsia_process_lifecycle as flifecycle, fuchsia_hyper as fhyper,
20    fuchsia_inspect as finspect,
21};
22
23mod pkg;
24mod resuming_get;
25
/// Maximum number of redirects `Loader::fetch` follows on its own; once this many have been
/// followed, the next redirect response is returned to the caller as-is.
static MAX_REDIRECTS: u8 = 10;
/// Deadline applied by `Loader::new` to requests that do not carry one, measured from the moment
/// the loader is created.
static DEFAULT_DEADLINE_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(15);
28
29fn to_status_line(version: hyper::Version, status: hyper::StatusCode) -> Vec<u8> {
30    match status.canonical_reason() {
31        None => format!("{:?} {}", version, status.as_str()),
32        Some(canonical_reason) => format!("{:?} {} {}", version, status.as_str(), canonical_reason),
33    }
34    .as_bytes()
35    .to_vec()
36}
37
38fn tcp_options() -> fhyper::TcpOptions {
39    let mut options: fhyper::TcpOptions = std::default::Default::default();
40
41    // Use TCP keepalive to notice stuck connections.
42    // After 60s with no data received send a probe every 15s.
43    options.keepalive_idle = Some(std::time::Duration::from_secs(60));
44    options.keepalive_interval = Some(std::time::Duration::from_secs(15));
45    // After 8 probes go unacknowledged treat the connection as dead.
46    options.keepalive_count = Some(8);
47
48    options
49}
50
/// Redirect details extracted from a redirection response by `redirect_info`.
struct RedirectInfo {
    /// Target resolved from the response's `Location` header; `None` when the header is missing
    /// or cannot be turned into a URI.
    url: Option<hyper::Uri>,
    /// URI resolved from the response's `Referer` header, if one is present and parseable.
    referrer: Option<hyper::Uri>,
    /// Method for the follow-up request: `GET` after a `303 See Other`, otherwise the method of
    /// the request that was redirected.
    method: hyper::Method,
}
56
57fn redirect_info(
58    old_uri: &hyper::Uri,
59    method: &hyper::Method,
60    hyper_response: &hyper::Response<hyper::Body>,
61) -> Option<RedirectInfo> {
62    if hyper_response.status().is_redirection() {
63        Some(RedirectInfo {
64            url: hyper_response
65                .headers()
66                .get(hyper::header::LOCATION)
67                .and_then(|loc| calculate_redirect(old_uri, loc)),
68            referrer: hyper_response
69                .headers()
70                .get(hyper::header::REFERER)
71                .and_then(|loc| calculate_redirect(old_uri, loc)),
72            method: if hyper_response.status() == hyper::StatusCode::SEE_OTHER {
73                hyper::Method::GET
74            } else {
75                method.clone()
76            },
77        })
78    } else {
79        None
80    }
81}
82
83async fn to_success_response(
84    current_url: &hyper::Uri,
85    current_method: &hyper::Method,
86    mut hyper_response: hyper::Response<hyper::Body>,
87    scope: vfs::execution_scope::ExecutionScope,
88) -> net_http::Response {
89    let redirect_info = redirect_info(current_url, current_method, &hyper_response);
90    let headers = hyper_response
91        .headers()
92        .iter()
93        .map(|(name, value)| net_http::Header {
94            name: name.as_str().as_bytes().to_vec(),
95            value: value.as_bytes().to_vec(),
96        })
97        .collect();
98
99    let (tx, rx) = zx::Socket::create_stream();
100    let response = net_http::Response {
101        error: None,
102        body: Some(rx),
103        final_url: Some(current_url.to_string()),
104        status_code: Some(hyper_response.status().as_u16() as u32),
105        status_line: Some(to_status_line(hyper_response.version(), hyper_response.status())),
106        headers: Some(headers),
107        redirect: redirect_info.map(|info| net_http::RedirectTarget {
108            method: Some(info.method.to_string()),
109            url: info.url.map(|u| u.to_string()),
110            referrer: info.referrer.map(|r| r.to_string()),
111            ..Default::default()
112        }),
113        ..Default::default()
114    };
115
116    let _ = scope.spawn(async move {
117        let hyper_body = hyper_response.body_mut();
118        while let Some(chunk) = hyper_body.next().await {
119            if let Ok(chunk) = chunk {
120                let mut offset: usize = 0;
121                while offset < chunk.len() {
122                    let pending = match tx.wait_one(
123                        zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
124                        zx::MonotonicInstant::INFINITE,
125                    ).to_result() {
126                        Err(status) => {
127                            error!("tx.wait() failed - status: {}", status);
128                            return;
129                        }
130                        Ok(pending) => pending,
131                    };
132                    if pending.contains(zx::Signals::SOCKET_PEER_CLOSED) {
133                        info!("tx.wait() saw signal SOCKET_PEER_CLOSED");
134                        return;
135                    }
136                    assert!(pending.contains(zx::Signals::SOCKET_WRITABLE));
137                    let written = match tx.write(&chunk[offset..]) {
138                        Err(status) => {
139                            // Because of the wait above, we shouldn't ever see SHOULD_WAIT here, but to avoid
140                            // brittle-ness, continue and wait again in that case.
141                            if status == zx::Status::SHOULD_WAIT {
142                                error!("Saw SHOULD_WAIT despite waiting first - expected now? - continuing");
143                                continue;
144                            }
145                            info!("tx.write() failed - status: {}", status);
146                            return;
147                        }
148                        Ok(written) => written,
149                    };
150                    offset += written;
151                }
152            }
153        }
154    });
155
156    response
157}
158
159fn to_fidl_error(error: &hyper::Error) -> net_http::Error {
160    #[allow(clippy::if_same_then_else)] // TODO(https://fxbug.dev/42176989)
161    if error.is_parse() {
162        net_http::Error::UnableToParse
163    } else if error.is_user() {
164        //TODO(zmbush): handle this case.
165        net_http::Error::Internal
166    } else if error.is_canceled() {
167        //TODO(zmbush): handle this case.
168        net_http::Error::Internal
169    } else if error.is_closed() {
170        net_http::Error::ChannelClosed
171    } else if error.is_connect() {
172        net_http::Error::Connect
173    } else if error.is_incomplete_message() {
174        //TODO(zmbush): handle this case.
175        net_http::Error::Internal
176    } else if error.is_body_write_aborted() {
177        //TODO(zmbush): handle this case.
178        net_http::Error::Internal
179    } else {
180        net_http::Error::Internal
181    }
182}
183
184fn to_error_response(error: net_http::Error) -> net_http::Response {
185    net_http::Response {
186        error: Some(error),
187        body: None,
188        final_url: None,
189        status_code: None,
190        status_line: None,
191        headers: None,
192        redirect: None,
193        ..Default::default()
194    }
195}
196
/// A single HTTP request, parsed from a FIDL `net_http::Request`, plus the state needed to
/// execute it (possibly several times while following redirects).
struct Loader {
    /// Method of the next request to send; updated when a redirect is followed.
    method: hyper::Method,
    /// URL of the next request to send; updated when a redirect is followed.
    url: hyper::Uri,
    /// Headers sent with every request built by `build_request`.
    headers: hyper::HeaderMap,
    /// Request body, fully read into memory by `new`.
    body: Vec<u8>,
    /// Point in time after which `fetch` gives up with `DeadlineExceeded`.
    deadline: fasync::MonotonicInstant,
    /// Scope on which response-body streaming tasks are spawned.
    scope: vfs::execution_scope::ExecutionScope,
}
205
206impl Loader {
207    async fn new(
208        req: net_http::Request,
209        scope: vfs::execution_scope::ExecutionScope,
210    ) -> Result<Self, anyhow::Error> {
211        let net_http::Request { method, url, headers, body, deadline, .. } = req;
212        let method = method.as_ref().map(|method| hyper::Method::from_str(method)).transpose()?;
213        let method = method.unwrap_or(hyper::Method::GET);
214        if let Some(url) = url {
215            let url = hyper::Uri::try_from(url)?;
216            let headers = headers
217                .unwrap_or_else(|| vec![])
218                .into_iter()
219                .map(|net_http::Header { name, value }| {
220                    let name = hyper::header::HeaderName::from_bytes(&name)?;
221                    let value = hyper::header::HeaderValue::from_bytes(&value)?;
222                    Ok((name, value))
223                })
224                .collect::<Result<hyper::HeaderMap, anyhow::Error>>()?;
225
226            let body = match body {
227                Some(net_http::Body::Buffer(buffer)) => {
228                    let mut bytes = vec![0; buffer.size as usize];
229                    buffer.vmo.read(&mut bytes, 0)?;
230                    bytes
231                }
232                Some(net_http::Body::Stream(socket)) => {
233                    let mut stream = fasync::Socket::from_socket(socket)
234                        .into_datagram_stream()
235                        .map(|r| r.context("reading from datagram stream"));
236                    let mut bytes = Vec::new();
237                    while let Some(chunk) = stream.next().await {
238                        bytes.extend(chunk?);
239                    }
240                    bytes
241                }
242                None => Vec::new(),
243            };
244
245            let deadline = deadline
246                .map(|deadline| fasync::MonotonicInstant::from_nanos(deadline))
247                .unwrap_or_else(|| fasync::MonotonicInstant::after(DEFAULT_DEADLINE_DURATION));
248
249            trace!("Starting request {} {}", method, url);
250
251            Ok(Loader { method, url, headers, body, deadline, scope })
252        } else {
253            Err(anyhow::Error::msg("Request missing URL"))
254        }
255    }
256
257    fn build_request(&self) -> hyper::Request<hyper::Body> {
258        let Self { method, url, headers, body, deadline: _, scope: _ } = self;
259        let mut request = hyper::Request::new(body.clone().into());
260        *request.method_mut() = method.clone();
261        *request.uri_mut() = url.clone();
262        *request.headers_mut() = headers.clone();
263        request
264    }
265
266    async fn start(mut self, loader_client: net_http::LoaderClientProxy) -> Result<(), zx::Status> {
267        let client = fhyper::new_https_client_from_tcp_options(tcp_options());
268        loop {
269            break match client.request(self.build_request()).await {
270                Ok(hyper_response) => {
271                    let redirect = redirect_info(&self.url, &self.method, &hyper_response);
272                    if let Some(redirect) = redirect {
273                        if let Some(url) = redirect.url {
274                            self.url = url;
275                            self.method = redirect.method;
276                            trace!(
277                                "Reporting redirect to OnResponse: {} {}",
278                                self.method, self.url
279                            );
280                            let response = to_success_response(
281                                &self.url,
282                                &self.method,
283                                hyper_response,
284                                self.scope.clone(),
285                            )
286                            .await;
287                            match loader_client.on_response(response).await {
288                                Ok(()) => {}
289                                Err(e) => {
290                                    debug!("Not redirecting because: {}", e);
291                                    break Ok(());
292                                }
293                            };
294                            trace!("Redirect allowed to {} {}", self.method, self.url);
295                            continue;
296                        }
297                    }
298                    let response = to_success_response(
299                        &self.url,
300                        &self.method,
301                        hyper_response,
302                        self.scope.clone(),
303                    )
304                    .await;
305                    // We don't care if on_response returns an error since this is the last
306                    // callback.
307                    let _: Result<_, _> = loader_client.on_response(response).await;
308                    Ok(())
309                }
310                Err(error) => {
311                    info!("Received network level error from hyper: {}", error);
312                    // We don't care if on_response returns an error since this is the last
313                    // callback.
314                    let _: Result<_, _> =
315                        loader_client.on_response(to_error_response(to_fidl_error(&error))).await;
316                    Ok(())
317                }
318            };
319        }
320    }
321
322    async fn fetch(
323        mut self,
324    ) -> Result<(hyper::Response<hyper::Body>, hyper::Uri, hyper::Method), net_http::Error> {
325        let deadline = self.deadline;
326        if deadline < fasync::MonotonicInstant::now() {
327            return Err(net_http::Error::DeadlineExceeded);
328        }
329        let client = fhyper::new_https_client_from_tcp_options(tcp_options());
330
331        async move {
332            let mut redirects = 0;
333            loop {
334                break match client.request(self.build_request()).await {
335                    Ok(hyper_response) => {
336                        if redirects != MAX_REDIRECTS {
337                            let redirect = redirect_info(&self.url, &self.method, &hyper_response);
338                            if let Some(redirect) = redirect {
339                                if let Some(url) = redirect.url {
340                                    self.url = url;
341                                    self.method = redirect.method;
342                                    trace!("Redirecting to {} {}", self.method, self.url);
343                                    redirects += 1;
344                                    continue;
345                                }
346                            }
347                        }
348                        Ok((hyper_response, self.url, self.method))
349                    }
350                    Err(e) => {
351                        info!("Received network level error from hyper: {}", e);
352                        Err(to_fidl_error(&e))
353                    }
354                };
355            }
356        }
357        .on_timeout(deadline, || Err(net_http::Error::DeadlineExceeded))
358        .await
359    }
360}
361
362fn calculate_redirect(
363    old_url: &hyper::Uri,
364    location: &hyper::header::HeaderValue,
365) -> Option<hyper::Uri> {
366    let old_parts = old_url.clone().into_parts();
367    let mut new_parts = hyper::Uri::try_from(location.as_bytes()).ok()?.into_parts();
368    if new_parts.scheme.is_none() {
369        new_parts.scheme = old_parts.scheme;
370    }
371    if new_parts.authority.is_none() {
372        new_parts.authority = old_parts.authority;
373    }
374    Some(hyper::Uri::from_parts(new_parts).ok()?)
375}
376
377async fn loader_server(
378    stream: net_http::LoaderRequestStream,
379    idle_timeout: fasync::MonotonicDuration,
380) -> Result<(), anyhow::Error> {
381    let background_tasks = vfs::execution_scope::ExecutionScope::new();
382    let (stream, unbind_if_stalled) = detect_stall::until_stalled(stream, idle_timeout);
383
384    stream
385        .err_into::<anyhow::Error>()
386        .try_for_each_concurrent(None, |message| {
387            let scope = background_tasks.clone();
388            async move {
389                match message {
390                    net_http::LoaderRequest::Fetch { request, responder } => {
391                        debug!(
392                            "Fetch request received (url: {}): {:?}",
393                            request
394                                .url
395                                .as_ref()
396                                .and_then(|url| Some(url.as_str()))
397                                .unwrap_or_default(),
398                            request
399                        );
400                        let result = Loader::new(request, scope.clone()).await?.fetch().await;
401                        responder.send(match result {
402                            Ok((hyper_response, final_url, final_method)) => {
403                                to_success_response(
404                                    &final_url,
405                                    &final_method,
406                                    hyper_response,
407                                    scope.clone(),
408                                )
409                                .await
410                            }
411                            Err(error) => to_error_response(error),
412                        })?;
413                    }
414                    net_http::LoaderRequest::Start { request, client, control_handle } => {
415                        debug!(
416                            "Start request received (url: {}): {:?}",
417                            request
418                                .url
419                                .as_ref()
420                                .and_then(|url| Some(url.as_str()))
421                                .unwrap_or_default(),
422                            request
423                        );
424                        Loader::new(request, scope).await?.start(client.into_proxy()).await?;
425                        control_handle.shutdown();
426                    }
427                }
428                Ok(())
429            }
430        })
431        .await?;
432
433    background_tasks.wait().await;
434
435    // If the connection did not close or receive new messages within the timeout, send it
436    // over to component manager to wait for it on our behalf.
437    if let Ok(Some(server_end)) = unbind_if_stalled.await {
438        fuchsia_component::client::connect_channel_to_protocol_at::<net_http::LoaderMarker>(
439            server_end.into(),
440            "/escrow",
441        )?;
442    }
443
444    Ok(())
445}
446
/// Incoming connections to the protocols this component exposes under `svc`.
enum HttpServices {
    /// A `fuchsia.net.http` Loader connection, served by `loader_server`.
    Loader(net_http::LoaderRequestStream),
    /// A `fuchsia.pkg.http` Client connection, served by `pkg::serve_client_request_stream`.
    PkgClient(fpkg_http::ClientRequestStream),
}
451
/// Component entry point.
///
/// Publishes inspect data, serves the `Loader` and `PkgClient` protocols from the outgoing
/// directory, and runs until either the framework sends a lifecycle `Stop` request or the
/// outgoing directory stalls, in which case the directory is escrowed before exiting.
#[fuchsia::main]
pub async fn main() -> Result<(), anyhow::Error> {
    log::info!("http-client starting");
    fuchsia_trace_provider::trace_provider_create_with_fdio();
    let inspector = finspect::Inspector::default();
    let pkg_http_node = inspector.root().create_child("pkg-http");
    let pkg_http_connections_node = pkg_http_node.create_child("connections");
    // Monotonic counter used to give each pkg-http connection its own inspect child node.
    let pkg_http_connection_count = std::sync::atomic::AtomicU64::new(0);
    let _inspect_server_task =
        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());

    // TODO(https://fxbug.dev/333080598): This is quite some boilerplate to escrow the outgoing dir.
    // Design some library function to handle the lifecycle requests.
    // Panics if the component was started without a lifecycle startup handle.
    let lifecycle =
        fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
    let lifecycle: zx::Channel = lifecycle.into();
    let lifecycle: ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
    let (mut lifecycle_request_stream, lifecycle_control_handle) =
        lifecycle.into_stream_and_control_handle();

    // A negative configured timeout disables idle detection.
    let config = Config::take_from_startup_handle();
    let idle_timeout = if config.stop_on_idle_timeout_millis >= 0 {
        fasync::MonotonicDuration::from_millis(config.stop_on_idle_timeout_millis)
    } else {
        fasync::MonotonicDuration::INFINITE
    };

    let mut fs = ServiceFs::new();
    let _: &mut ServiceFsDir<'_, _> = fs
        .take_and_serve_directory_handle()?
        .dir("svc")
        .add_fidl_service(HttpServices::Loader)
        .add_fidl_service(HttpServices::PkgClient);

    // Completes when a `Stop` request arrives. If the lifecycle stream ends or yields an error
    // instead, this future never completes, leaving the outgoing-directory task to decide when
    // to exit.
    let lifecycle_task = async move {
        let Some(Ok(request)) = lifecycle_request_stream.next().await else {
            return std::future::pending::<()>().await;
        };
        match request {
            flifecycle::LifecycleRequest::Stop { .. } => {
                // TODO(https://fxbug.dev/332341289): If the framework asks us to stop, we still
                // end up dropping requests. If we teach the `ServiceFs` etc. libraries to skip
                // the timeout when this happens, we can cleanly stop the component.
                return;
            }
        }
    };

    // Serves every incoming connection concurrently; per-connection errors are logged, not
    // propagated. When the outgoing directory stalls it is escrowed via the lifecycle channel.
    let outgoing_dir_task = async move {
        fs.until_stalled(idle_timeout)
            .for_each_concurrent(None, |item| async {
                match item {
                    Item::Request(services, _active_guard) => match services {
                        HttpServices::Loader(stream) => loader_server(stream, idle_timeout)
                            .await
                            .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
                        HttpServices::PkgClient(stream) => pkg::serve_client_request_stream(
                            stream,
                            idle_timeout,
                            pkg_http_connections_node.create_child(
                                pkg_http_connection_count
                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                                    .to_string(),
                            ),
                        )
                        .await
                        .unwrap_or_else(|e: anyhow::Error| error!("{e:#}")),
                    },
                    Item::Stalled(outgoing_directory) => {
                        escrow_outgoing(lifecycle_control_handle.clone(), outgoing_directory.into())
                    }
                }
            })
            .await;
    };

    // Exit as soon as either task finishes; the other one is dropped.
    match futures::future::select(lifecycle_task.boxed_local(), outgoing_dir_task.boxed_local())
        .await
    {
        Either::Left(_) => log::info!("http-client stopping because we are told to stop"),
        Either::Right(_) => log::info!("http-client stopping because it is idle"),
    }

    Ok(())
}
537
538/// Escrow the outgoing directory server endpoint to component manager, such that we will receive
539/// the same server endpoint on the next execution.
540fn escrow_outgoing(
541    lifecycle_control_handle: flifecycle::LifecycleControlHandle,
542    outgoing_dir: ServerEnd<fio::DirectoryMarker>,
543) {
544    let outgoing_dir = Some(outgoing_dir);
545    lifecycle_control_handle
546        .send_on_escrow(flifecycle::LifecycleOnEscrowRequest { outgoing_dir, ..Default::default() })
547        .unwrap();
548}