http_client/
main.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use fidl::endpoints::ServerEnd;
7use fidl::prelude::*;
8use fuchsia_async::{self as fasync, TimeoutExt as _};
9use fuchsia_component::server::{Item, ServiceFs, ServiceFsDir};
10use fuchsia_runtime::{HandleInfo, HandleType};
11use futures::StreamExt;
12use futures::future::Either;
13use futures::prelude::*;
14use http_client_config::Config;
15use log::{debug, error, info, trace};
16use std::str::FromStr as _;
17use zx::{self as zx, AsHandleRef};
18use {
19    fidl_fuchsia_io as fio, fidl_fuchsia_net_http as net_http, fidl_fuchsia_pkg_http as fpkg_http,
20    fidl_fuchsia_process_lifecycle as flifecycle, fuchsia_hyper as fhyper,
21    fuchsia_inspect as finspect,
22};
23
24mod pkg;
25mod resuming_get;
26
27static MAX_REDIRECTS: u8 = 10;
28static DEFAULT_DEADLINE_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(15);
29
30fn to_status_line(version: hyper::Version, status: hyper::StatusCode) -> Vec<u8> {
31    match status.canonical_reason() {
32        None => format!("{:?} {}", version, status.as_str()),
33        Some(canonical_reason) => format!("{:?} {} {}", version, status.as_str(), canonical_reason),
34    }
35    .as_bytes()
36    .to_vec()
37}
38
39fn tcp_options() -> fhyper::TcpOptions {
40    let mut options: fhyper::TcpOptions = std::default::Default::default();
41
42    // Use TCP keepalive to notice stuck connections.
43    // After 60s with no data received send a probe every 15s.
44    options.keepalive_idle = Some(std::time::Duration::from_secs(60));
45    options.keepalive_interval = Some(std::time::Duration::from_secs(15));
46    // After 8 probes go unacknowledged treat the connection as dead.
47    options.keepalive_count = Some(8);
48
49    options
50}
51
52struct RedirectInfo {
53    url: Option<hyper::Uri>,
54    referrer: Option<hyper::Uri>,
55    method: hyper::Method,
56}
57
58fn redirect_info(
59    old_uri: &hyper::Uri,
60    method: &hyper::Method,
61    hyper_response: &hyper::Response<hyper::Body>,
62) -> Option<RedirectInfo> {
63    if hyper_response.status().is_redirection() {
64        Some(RedirectInfo {
65            url: hyper_response
66                .headers()
67                .get(hyper::header::LOCATION)
68                .and_then(|loc| calculate_redirect(old_uri, loc)),
69            referrer: hyper_response
70                .headers()
71                .get(hyper::header::REFERER)
72                .and_then(|loc| calculate_redirect(old_uri, loc)),
73            method: if hyper_response.status() == hyper::StatusCode::SEE_OTHER {
74                hyper::Method::GET
75            } else {
76                method.clone()
77            },
78        })
79    } else {
80        None
81    }
82}
83
84async fn to_success_response(
85    current_url: &hyper::Uri,
86    current_method: &hyper::Method,
87    mut hyper_response: hyper::Response<hyper::Body>,
88    scope: vfs::execution_scope::ExecutionScope,
89) -> net_http::Response {
90    let redirect_info = redirect_info(current_url, current_method, &hyper_response);
91    let headers = hyper_response
92        .headers()
93        .iter()
94        .map(|(name, value)| net_http::Header {
95            name: name.as_str().as_bytes().to_vec(),
96            value: value.as_bytes().to_vec(),
97        })
98        .collect();
99
100    let (tx, rx) = zx::Socket::create_stream();
101    let response = net_http::Response {
102        error: None,
103        body: Some(rx),
104        final_url: Some(current_url.to_string()),
105        status_code: Some(hyper_response.status().as_u16() as u32),
106        status_line: Some(to_status_line(hyper_response.version(), hyper_response.status())),
107        headers: Some(headers),
108        redirect: redirect_info.map(|info| net_http::RedirectTarget {
109            method: Some(info.method.to_string()),
110            url: info.url.map(|u| u.to_string()),
111            referrer: info.referrer.map(|r| r.to_string()),
112            ..Default::default()
113        }),
114        ..Default::default()
115    };
116
117    let _ = scope.spawn(async move {
118        let hyper_body = hyper_response.body_mut();
119        while let Some(chunk) = hyper_body.next().await {
120            if let Ok(chunk) = chunk {
121                let mut offset: usize = 0;
122                while offset < chunk.len() {
123                    let pending = match tx.wait_handle(
124                        zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
125                        zx::MonotonicInstant::INFINITE,
126                    ).to_result() {
127                        Err(status) => {
128                            error!("tx.wait() failed - status: {}", status);
129                            return;
130                        }
131                        Ok(pending) => pending,
132                    };
133                    if pending.contains(zx::Signals::SOCKET_PEER_CLOSED) {
134                        info!("tx.wait() saw signal SOCKET_PEER_CLOSED");
135                        return;
136                    }
137                    assert!(pending.contains(zx::Signals::SOCKET_WRITABLE));
138                    let written = match tx.write(&chunk[offset..]) {
139                        Err(status) => {
140                            // Because of the wait above, we shouldn't ever see SHOULD_WAIT here, but to avoid
141                            // brittle-ness, continue and wait again in that case.
142                            if status == zx::Status::SHOULD_WAIT {
143                                error!("Saw SHOULD_WAIT despite waiting first - expected now? - continuing");
144                                continue;
145                            }
146                            info!("tx.write() failed - status: {}", status);
147                            return;
148                        }
149                        Ok(written) => written,
150                    };
151                    offset += written;
152                }
153            }
154        }
155    });
156
157    response
158}
159
160fn to_fidl_error(error: &hyper::Error) -> net_http::Error {
161    #[allow(clippy::if_same_then_else)] // TODO(https://fxbug.dev/42176989)
162    if error.is_parse() {
163        net_http::Error::UnableToParse
164    } else if error.is_user() {
165        //TODO(zmbush): handle this case.
166        net_http::Error::Internal
167    } else if error.is_canceled() {
168        //TODO(zmbush): handle this case.
169        net_http::Error::Internal
170    } else if error.is_closed() {
171        net_http::Error::ChannelClosed
172    } else if error.is_connect() {
173        net_http::Error::Connect
174    } else if error.is_incomplete_message() {
175        //TODO(zmbush): handle this case.
176        net_http::Error::Internal
177    } else if error.is_body_write_aborted() {
178        //TODO(zmbush): handle this case.
179        net_http::Error::Internal
180    } else {
181        net_http::Error::Internal
182    }
183}
184
185fn to_error_response(error: net_http::Error) -> net_http::Response {
186    net_http::Response {
187        error: Some(error),
188        body: None,
189        final_url: None,
190        status_code: None,
191        status_line: None,
192        headers: None,
193        redirect: None,
194        ..Default::default()
195    }
196}
197
198struct Loader {
199    method: hyper::Method,
200    url: hyper::Uri,
201    headers: hyper::HeaderMap,
202    body: Vec<u8>,
203    deadline: fasync::MonotonicInstant,
204    scope: vfs::execution_scope::ExecutionScope,
205}
206
207impl Loader {
208    async fn new(
209        req: net_http::Request,
210        scope: vfs::execution_scope::ExecutionScope,
211    ) -> Result<Self, anyhow::Error> {
212        let net_http::Request { method, url, headers, body, deadline, .. } = req;
213        let method = method.as_ref().map(|method| hyper::Method::from_str(method)).transpose()?;
214        let method = method.unwrap_or(hyper::Method::GET);
215        if let Some(url) = url {
216            let url = hyper::Uri::try_from(url)?;
217            let headers = headers
218                .unwrap_or_else(|| vec![])
219                .into_iter()
220                .map(|net_http::Header { name, value }| {
221                    let name = hyper::header::HeaderName::from_bytes(&name)?;
222                    let value = hyper::header::HeaderValue::from_bytes(&value)?;
223                    Ok((name, value))
224                })
225                .collect::<Result<hyper::HeaderMap, anyhow::Error>>()?;
226
227            let body = match body {
228                Some(net_http::Body::Buffer(buffer)) => {
229                    let mut bytes = vec![0; buffer.size as usize];
230                    buffer.vmo.read(&mut bytes, 0)?;
231                    bytes
232                }
233                Some(net_http::Body::Stream(socket)) => {
234                    let mut stream = fasync::Socket::from_socket(socket)
235                        .into_datagram_stream()
236                        .map(|r| r.context("reading from datagram stream"));
237                    let mut bytes = Vec::new();
238                    while let Some(chunk) = stream.next().await {
239                        bytes.extend(chunk?);
240                    }
241                    bytes
242                }
243                None => Vec::new(),
244            };
245
246            let deadline = deadline
247                .map(|deadline| fasync::MonotonicInstant::from_nanos(deadline))
248                .unwrap_or_else(|| fasync::MonotonicInstant::after(DEFAULT_DEADLINE_DURATION));
249
250            trace!("Starting request {} {}", method, url);
251
252            Ok(Loader { method, url, headers, body, deadline, scope })
253        } else {
254            Err(anyhow::Error::msg("Request missing URL"))
255        }
256    }
257
258    fn build_request(&self) -> hyper::Request<hyper::Body> {
259        let Self { method, url, headers, body, deadline: _, scope: _ } = self;
260        let mut request = hyper::Request::new(body.clone().into());
261        *request.method_mut() = method.clone();
262        *request.uri_mut() = url.clone();
263        *request.headers_mut() = headers.clone();
264        request
265    }
266
267    async fn start(mut self, loader_client: net_http::LoaderClientProxy) -> Result<(), zx::Status> {
268        let client = fhyper::new_https_client_from_tcp_options(tcp_options());
269        loop {
270            break match client.request(self.build_request()).await {
271                Ok(hyper_response) => {
272                    let redirect = redirect_info(&self.url, &self.method, &hyper_response);
273                    if let Some(redirect) = redirect {
274                        if let Some(url) = redirect.url {
275                            self.url = url;
276                            self.method = redirect.method;
277                            trace!(
278                                "Reporting redirect to OnResponse: {} {}",
279                                self.method, self.url
280                            );
281                            let response = to_success_response(
282                                &self.url,
283                                &self.method,
284                                hyper_response,
285                                self.scope.clone(),
286                            )
287                            .await;
288                            match loader_client.on_response(response).await {
289                                Ok(()) => {}
290                                Err(e) => {
291                                    debug!("Not redirecting because: {}", e);
292                                    break Ok(());
293                                }
294                            };
295                            trace!("Redirect allowed to {} {}", self.method, self.url);
296                            continue;
297                        }
298                    }
299                    let response = to_success_response(
300                        &self.url,
301                        &self.method,
302                        hyper_response,
303                        self.scope.clone(),
304                    )
305                    .await;
306                    // We don't care if on_response returns an error since this is the last
307                    // callback.
308                    let _: Result<_, _> = loader_client.on_response(response).await;
309                    Ok(())
310                }
311                Err(error) => {
312                    info!("Received network level error from hyper: {}", error);
313                    // We don't care if on_response returns an error since this is the last
314                    // callback.
315                    let _: Result<_, _> =
316                        loader_client.on_response(to_error_response(to_fidl_error(&error))).await;
317                    Ok(())
318                }
319            };
320        }
321    }
322
323    async fn fetch(
324        mut self,
325    ) -> Result<(hyper::Response<hyper::Body>, hyper::Uri, hyper::Method), net_http::Error> {
326        let deadline = self.deadline;
327        if deadline < fasync::MonotonicInstant::now() {
328            return Err(net_http::Error::DeadlineExceeded);
329        }
330        let client = fhyper::new_https_client_from_tcp_options(tcp_options());
331
332        async move {
333            let mut redirects = 0;
334            loop {
335                break match client.request(self.build_request()).await {
336                    Ok(hyper_response) => {
337                        if redirects != MAX_REDIRECTS {
338                            let redirect = redirect_info(&self.url, &self.method, &hyper_response);
339                            if let Some(redirect) = redirect {
340                                if let Some(url) = redirect.url {
341                                    self.url = url;
342                                    self.method = redirect.method;
343                                    trace!("Redirecting to {} {}", self.method, self.url);
344                                    redirects += 1;
345                                    continue;
346                                }
347                            }
348                        }
349                        Ok((hyper_response, self.url, self.method))
350                    }
351                    Err(e) => {
352                        info!("Received network level error from hyper: {}", e);
353                        Err(to_fidl_error(&e))
354                    }
355                };
356            }
357        }
358        .on_timeout(deadline, || Err(net_http::Error::DeadlineExceeded))
359        .await
360    }
361}
362
363fn calculate_redirect(
364    old_url: &hyper::Uri,
365    location: &hyper::header::HeaderValue,
366) -> Option<hyper::Uri> {
367    let old_parts = old_url.clone().into_parts();
368    let mut new_parts = hyper::Uri::try_from(location.as_bytes()).ok()?.into_parts();
369    if new_parts.scheme.is_none() {
370        new_parts.scheme = old_parts.scheme;
371    }
372    if new_parts.authority.is_none() {
373        new_parts.authority = old_parts.authority;
374    }
375    Some(hyper::Uri::from_parts(new_parts).ok()?)
376}
377
378async fn loader_server(
379    stream: net_http::LoaderRequestStream,
380    idle_timeout: fasync::MonotonicDuration,
381) -> Result<(), anyhow::Error> {
382    let background_tasks = vfs::execution_scope::ExecutionScope::new();
383    let (stream, unbind_if_stalled) = detect_stall::until_stalled(stream, idle_timeout);
384
385    stream
386        .err_into::<anyhow::Error>()
387        .try_for_each_concurrent(None, |message| {
388            let scope = background_tasks.clone();
389            async move {
390                match message {
391                    net_http::LoaderRequest::Fetch { request, responder } => {
392                        debug!(
393                            "Fetch request received (url: {}): {:?}",
394                            request
395                                .url
396                                .as_ref()
397                                .and_then(|url| Some(url.as_str()))
398                                .unwrap_or_default(),
399                            request
400                        );
401                        let result = Loader::new(request, scope.clone()).await?.fetch().await;
402                        responder.send(match result {
403                            Ok((hyper_response, final_url, final_method)) => {
404                                to_success_response(
405                                    &final_url,
406                                    &final_method,
407                                    hyper_response,
408                                    scope.clone(),
409                                )
410                                .await
411                            }
412                            Err(error) => to_error_response(error),
413                        })?;
414                    }
415                    net_http::LoaderRequest::Start { request, client, control_handle } => {
416                        debug!(
417                            "Start request received (url: {}): {:?}",
418                            request
419                                .url
420                                .as_ref()
421                                .and_then(|url| Some(url.as_str()))
422                                .unwrap_or_default(),
423                            request
424                        );
425                        Loader::new(request, scope).await?.start(client.into_proxy()).await?;
426                        control_handle.shutdown();
427                    }
428                }
429                Ok(())
430            }
431        })
432        .await?;
433
434    background_tasks.wait().await;
435
436    // If the connection did not close or receive new messages within the timeout, send it
437    // over to component manager to wait for it on our behalf.
438    if let Ok(Some(server_end)) = unbind_if_stalled.await {
439        fuchsia_component::client::connect_channel_to_protocol_at::<net_http::LoaderMarker>(
440            server_end.into(),
441            "/escrow",
442        )?;
443    }
444
445    Ok(())
446}
447
448enum HttpServices {
449    Loader(net_http::LoaderRequestStream),
450    PkgClient(fpkg_http::ClientRequestStream),
451}
452
453#[fuchsia::main]
454pub async fn main() -> Result<(), anyhow::Error> {
455    log::info!("http-client starting");
456    fuchsia_trace_provider::trace_provider_create_with_fdio();
457    let inspector = finspect::Inspector::default();
458    let pkg_http_node = inspector.root().create_child("pkg-http");
459    let pkg_http_connections_node = pkg_http_node.create_child("connections");
460    let pkg_http_connection_count = std::sync::atomic::AtomicU64::new(0);
461    let _inspect_server_task =
462        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
463
464    // TODO(https://fxbug.dev/333080598): This is quite some boilerplate to escrow the outgoing dir.
465    // Design some library function to handle the lifecycle requests.
466    let lifecycle =
467        fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
468    let lifecycle: zx::Channel = lifecycle.into();
469    let lifecycle: ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
470    let (mut lifecycle_request_stream, lifecycle_control_handle) =
471        lifecycle.into_stream_and_control_handle();
472
473    let config = Config::take_from_startup_handle();
474    let idle_timeout = if config.stop_on_idle_timeout_millis >= 0 {
475        fasync::MonotonicDuration::from_millis(config.stop_on_idle_timeout_millis)
476    } else {
477        fasync::MonotonicDuration::INFINITE
478    };
479
480    let mut fs = ServiceFs::new();
481    let _: &mut ServiceFsDir<'_, _> = fs
482        .take_and_serve_directory_handle()?
483        .dir("svc")
484        .add_fidl_service(HttpServices::Loader)
485        .add_fidl_service(HttpServices::PkgClient);
486
487    let lifecycle_task = async move {
488        let Some(Ok(request)) = lifecycle_request_stream.next().await else {
489            return std::future::pending::<()>().await;
490        };
491        match request {
492            flifecycle::LifecycleRequest::Stop { .. } => {
493                // TODO(https://fxbug.dev/332341289): If the framework asks us to stop, we still
494                // end up dropping requests. If we teach the `ServiceFs` etc. libraries to skip
495                // the timeout when this happens, we can cleanly stop the component.
496                return;
497            }
498        }
499    };
500
501    let outgoing_dir_task = async move {
502        fs.until_stalled(idle_timeout)
503            .for_each_concurrent(None, |item| async {
504                match item {
505                    Item::Request(services, _active_guard) => match services {
506                        HttpServices::Loader(stream) => loader_server(stream, idle_timeout)
507                            .await
508                            .unwrap_or_else(|e: anyhow::Error| error!("{:?}", e)),
509                        HttpServices::PkgClient(stream) => pkg::serve_client_request_stream(
510                            stream,
511                            idle_timeout,
512                            pkg_http_connections_node.create_child(
513                                pkg_http_connection_count
514                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
515                                    .to_string(),
516                            ),
517                        )
518                        .await
519                        .unwrap_or_else(|e: anyhow::Error| error!("{e:#}")),
520                    },
521                    Item::Stalled(outgoing_directory) => {
522                        escrow_outgoing(lifecycle_control_handle.clone(), outgoing_directory.into())
523                    }
524                }
525            })
526            .await;
527    };
528
529    match futures::future::select(lifecycle_task.boxed_local(), outgoing_dir_task.boxed_local())
530        .await
531    {
532        Either::Left(_) => log::info!("http-client stopping because we are told to stop"),
533        Either::Right(_) => log::info!("http-client stopping because it is idle"),
534    }
535
536    Ok(())
537}
538
539/// Escrow the outgoing directory server endpoint to component manager, such that we will receive
540/// the same server endpoint on the next execution.
541fn escrow_outgoing(
542    lifecycle_control_handle: flifecycle::LifecycleControlHandle,
543    outgoing_dir: ServerEnd<fio::DirectoryMarker>,
544) {
545    let outgoing_dir = Some(outgoing_dir);
546    lifecycle_control_handle
547        .send_on_escrow(flifecycle::LifecycleOnEscrowRequest { outgoing_dir, ..Default::default() })
548        .unwrap();
549}