Skip to main content

fuchsia_hyper/
fuchsia.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7    HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8    parse_ip_addr,
9};
10use fidl::endpoints::{ClientEnd, create_endpoints};
11use fidl_connector::{Connect, ServiceReconnector};
12use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
13use fidl_fuchsia_posix_socket::{Domain, ProviderMarker, StreamSocketProtocol};
14use fuchsia_async::net;
15
16use futures::future::{Future, FutureExt};
17use futures::io;
18use futures::task::{Context, Poll};
19use http::uri::{Scheme, Uri};
20use hyper::service::Service;
21use rustls::RootCertStore;
22use std::convert::TryFrom as _;
23use std::net::SocketAddr;
24use std::num::TryFromIntError;
25use std::sync::{Arc, LazyLock};
26
27pub fn new_root_cert_store() -> Arc<RootCertStore> {
28    // It can be expensive to parse the certs, so cache them
29    static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
30        let mut root_store = rustls::RootCertStore::empty();
31
32        root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
33            rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
34                cert.subject,
35                cert.spki,
36                cert.name_constraints,
37            )
38        }));
39
40        Arc::new(root_store)
41    });
42
43    Arc::clone(&ROOT_STORE)
44}
45
46/// A Fuchsia-compatible implementation of hyper's `Connect` trait which allows
47/// creating a TcpStream to a particular destination.
48#[derive(Clone)]
49pub struct HyperConnector {
50    tcp_options: TcpOptions,
51    socket_options: SocketOptions,
52    provider: RealServiceConnector,
53}
54
55impl From<(TcpOptions, SocketOptions)> for HyperConnector {
56    fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
57        Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
58    }
59}
60
61impl HyperConnector {
62    pub fn new() -> Self {
63        Self::from_tcp_options(TcpOptions::default())
64    }
65
66    pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
67        Self {
68            tcp_options,
69            socket_options: SocketOptions::default(),
70            provider: RealServiceConnector::new(),
71        }
72    }
73}
74
75impl Service<Uri> for HyperConnector {
76    type Response = TcpStream;
77    type Error = std::io::Error;
78    type Future = HyperConnectorFuture;
79
80    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        // This connector is always ready, but others might not be.
82        Poll::Ready(Ok(()))
83    }
84
85    fn call(&mut self, dst: Uri) -> Self::Future {
86        let self_ = self.clone();
87        HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
88    }
89}
90
91impl HyperConnector {
92    async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
93        let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
94        let port = match dst.port() {
95            Some(port) => port.as_u16(),
96            None => {
97                if dst.scheme() == Some(&Scheme::HTTPS) {
98                    443
99                } else {
100                    80
101                }
102            }
103        };
104
105        let stream = connect_to_addr(
106            self.provider.clone(),
107            host,
108            port,
109            self.socket_options.bind_device.as_deref(),
110        )
111        .await?;
112        let () = self.tcp_options.apply(stream.std())?;
113
114        Ok(TcpStream { stream })
115    }
116}
117
118#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
119#[derive(Clone)]
120pub struct Executor;
121
122impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
123    fn execute(&self, fut: F) {
124        fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
125    }
126}
127
128#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
129#[derive(Clone)]
130pub struct LocalExecutor;
131
132impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
133    fn execute(&self, fut: F) {
134        fuchsia_async::Task::local(fut.map(drop)).detach()
135    }
136}
137
138pub(crate) trait ProviderConnector {
139    fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error>;
140}
141
142trait LookupConnector {
143    fn connect(&self) -> Result<LookupProxy, io::Error>;
144}
145
146#[derive(Clone)]
147struct RealServiceConnector {
148    name_lookup_connector: ServiceReconnector<LookupMarker>,
149}
150
151impl RealServiceConnector {
152    fn new() -> Self {
153        RealServiceConnector { name_lookup_connector: ServiceReconnector::<LookupMarker>::new() }
154    }
155}
156
157impl ProviderConnector for RealServiceConnector {
158    fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
159        let (client_end, server_end) = create_endpoints::<ProviderMarker>();
160        fuchsia_component::client::connect_channel_to_protocol::<ProviderMarker>(server_end.into())
161            .map_err(|err| {
162                io::Error::other(format!("failed to connect to socket provider service: {}", err))
163            })?;
164        Ok(client_end)
165    }
166}
167
168impl LookupConnector for RealServiceConnector {
169    fn connect(&self) -> Result<LookupProxy, io::Error> {
170        self.name_lookup_connector.connect().map_err(|err| {
171            io::Error::other(format!("failed to connect to name lookup service: {}", err))
172        })
173    }
174}
175
176async fn connect_to_addr<T: ProviderConnector + LookupConnector + 'static>(
177    provider: T,
178    host: &str,
179    port: u16,
180    bind_device: Option<&str>,
181) -> Result<net::TcpStream, io::Error> {
182    if let Some(addr) = parse_ip_addr_with_provider(&provider, host, port).await? {
183        return connect_and_bind_device(&provider, addr, bind_device)?.await;
184    }
185
186    happy_eyeballs::happy_eyeballs(
187        resolve_ip_addr(&provider, host, port).await?,
188        RealSocketConnector::new(provider),
189        happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
190        happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
191        bind_device,
192    )
193    .await
194}
195
196async fn resolve_ip_addr<T: LookupConnector>(
197    name_lookup: &T,
198    host: &str,
199    port: u16,
200) -> Result<impl Iterator<Item = SocketAddr> + use<T>, io::Error> {
201    let proxy = name_lookup.connect()?;
202    let LookupResult { addresses, .. } = proxy
203        .lookup_ip(
204            host,
205            &LookupIpOptions {
206                ipv4_lookup: Some(true),
207                ipv6_lookup: Some(true),
208                sort_addresses: Some(true),
209                ..Default::default()
210            },
211        )
212        .await
213        .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
214        .map_err(|err| {
215            // Match stdlib's behavior, which maps all GAI errors but EAI_SYSTEM
216            // to io::ErrorKind::Other.
217            io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
218        })?;
219
220    Ok(addresses
221        .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
222        .into_iter()
223        .map(move |addr| {
224            let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
225            SocketAddr::new(addr, port)
226        }))
227}
228
229async fn parse_ip_addr_with_provider(
230    provider: &impl ProviderConnector,
231    host: &str,
232    port: u16,
233) -> Result<Option<SocketAddr>, io::Error> {
234    parse_ip_addr(host, port, |zone_id| async {
235        let proxy = provider.connect()?.into_proxy();
236        let id = proxy
237            .interface_name_to_index(zone_id)
238            .await
239            .map_err(|err| {
240                io::Error::other(format!(
241                    "failed to get interface index from socket provider: {}",
242                    err
243                ))
244            })?
245            .map_err(|status| zx::Status::from_raw(status).into_io_error())?;
246
247        // SocketAddrV6 only works with 32 bit scope ids.
248        u32::try_from(id).map_err(|TryFromIntError { .. }| {
249            io::Error::other("interface index too large to convert to scope_id")
250        })
251    })
252    .await
253}
254
255pub(crate) fn stream_socket<T: ProviderConnector>(
256    provider: &T,
257    domain: Domain,
258    proto: StreamSocketProtocol,
259) -> io::Result<socket2::Socket> {
260    let socket_provider = provider.connect()?.into_sync_proxy();
261    let sock = socket_provider
262        .stream_socket(domain, proto, zx::MonotonicInstant::INFINITE)
263        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
264        .map_err(|e| std::io::Error::from_raw_os_error(e.into_primitive()))?;
265
266    Ok(fdio::create_fd(sock.into())?.into())
267}
268
269#[cfg(test)]
270mod test {
271    use super::*;
272    use crate::*;
273    use assert_matches::assert_matches;
274    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
275    use fidl_fuchsia_net_name::{LookupError, LookupRequest};
276    use fidl_fuchsia_posix_socket::ProviderRequest;
277    use fuchsia_async::net::TcpListener;
278    use fuchsia_async::{self as fasync, LocalExecutor};
279    use futures::prelude::*;
280    use std::cell::RefCell;
281
282    struct PanicConnector;
283
284    impl ProviderConnector for PanicConnector {
285        fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
286            panic!("should not be trying to talk to the Provider service")
287        }
288    }
289
290    #[test]
291    fn can_create_client() {
292        let _exec = LocalExecutor::default();
293        let _client = new_client();
294    }
295
296    #[test]
297    fn can_create_https_client() {
298        let _exec = LocalExecutor::default();
299        let _client = new_https_client();
300    }
301
302    #[fasync::run_singlethreaded(test)]
303    async fn hyper_connector_sets_tcp_options() {
304        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
305        let listener = TcpListener::bind(&addr).unwrap();
306        let addr = listener.local_addr().unwrap();
307
308        let idle = std::time::Duration::from_secs(36);
309        let interval = std::time::Duration::from_secs(47);
310        let count = 58;
311        let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap();
312        let (TcpStream { stream }, _server) = future::try_join(
313            HyperConnector::from_tcp_options(TcpOptions {
314                keepalive_idle: Some(idle),
315                keepalive_interval: Some(interval),
316                keepalive_count: Some(count),
317                ..Default::default()
318            })
319            .call(uri),
320            listener.accept_stream().try_next(),
321        )
322        .await
323        .unwrap();
324
325        let stream = socket2::SockRef::from(stream.std());
326
327        assert_matches!(stream.keepalive(), Ok(v) if v);
328        assert_matches!(stream.keepalive_time(), Ok(v) if v == idle);
329        assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval);
330        assert_matches!(stream.keepalive_retries(), Ok(v) if v == count);
331    }
332
333    #[fasync::run_singlethreaded(test)]
334    async fn test_parse_ipv6_addr_with_provider() {
335        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
336
337        assert_matches!(
338            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await,
339            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0))
340        );
341
342        assert_matches!(
343            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await,
344            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2))
345        );
346    }
347
348    #[fasync::run_singlethreaded(test)]
349    async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
350        let connector = RealServiceConnector::new();
351        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
352
353        assert_matches!(
354            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
355            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1))
356        );
357
358        assert_matches!(
359            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await,
360            Err(err) if err.kind() == io::ErrorKind::NotFound
361        );
362
363        assert_matches!(
364            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await,
365            Err(err) if err.kind() == io::ErrorKind::NotFound
366        );
367    }
368
369    #[fasync::run_singlethreaded(test)]
370    async fn test_parse_ipv6_addr_handles_connection_errors() {
371        struct ErrorConnector;
372
373        impl ProviderConnector for ErrorConnector {
374            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
375                Err(io::Error::other("something bad happened"))
376            }
377        }
378
379        assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await,
380            Err(err) if err.kind() == io::ErrorKind::Other);
381    }
382
383    #[fasync::run_singlethreaded(test)]
384    async fn test_parse_ipv6_addr_handles_large_interface_indices() {
385        let (client_end, mut stream) = create_request_stream::<ProviderMarker>();
386
387        let provider_fut = async move {
388            while let Some(req) = stream.try_next().await.unwrap_or(None) {
389                match req {
390                    ProviderRequest::InterfaceNameToIndex { name: _, responder } => {
391                        responder.send(Ok(u64::MAX)).unwrap()
392                    }
393                    _ => panic!("unexpected request"),
394                }
395            }
396        };
397
398        struct ErrorConnector {
399            client_end: RefCell<Option<ClientEnd<ProviderMarker>>>,
400        }
401
402        impl ProviderConnector for ErrorConnector {
403            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
404                let client_end = self.client_end.borrow_mut().take().unwrap();
405                Ok(client_end)
406            }
407        }
408
409        let connector = ErrorConnector { client_end: RefCell::new(Some(client_end)) };
410
411        let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);
412
413        // Join the two futures to make sure they both complete.
414        let ((), res) = future::join(provider_fut, parse_ip_fut).await;
415
416        assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other);
417    }
418
419    struct ProxyConnector<T> {
420        proxy: T,
421    }
422
423    impl LookupConnector for ProxyConnector<LookupProxy> {
424        fn connect(&self) -> Result<LookupProxy, io::Error> {
425            Ok(self.proxy.clone())
426        }
427    }
428
429    #[fasync::run_singlethreaded(test)]
430    async fn test_resolve_ip_addr() {
431        let (sender, receiver) =
432            futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
433        let (proxy, stream) = create_proxy_and_stream::<LookupMarker>();
434        const TEST_HOSTNAME: &'static str = "foobar.com";
435        let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req {
436            Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
437                assert_eq!(hostname.as_str(), TEST_HOSTNAME);
438                assert_eq!(
439                    options,
440                    LookupIpOptions {
441                        ipv4_lookup: Some(true),
442                        ipv6_lookup: Some(true),
443                        sort_addresses: Some(true),
444                        ..Default::default()
445                    }
446                );
447                let rsp = rsp.as_ref().map_err(|e| *e);
448                futures::future::ready(responder.send(rsp).expect("failed to send FIDL response"))
449            }
450            req => panic!("unexpected item in request stream {:?}", req),
451        });
452
453        let connector = ProxyConnector { proxy };
454
455        let ip_v4 = Ipv4Addr::LOCALHOST.into();
456        let ip_v6 = Ipv6Addr::LOCALHOST.into();
457        const PORT1: u16 = 1234;
458        const PORT2: u16 = 4321;
459
460        let test_fut = async move {
461            // Test expectation's error variant is a tuple of the lookup error
462            // to inject and the expected io error kind returned.
463            type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;
464            let test_resolve = |port, expect: Expectation| {
465                let fidl_response = expect
466                    .clone()
467                    .map(|addrs| LookupResult {
468                        addresses: Some(
469                            addrs
470                                .into_iter()
471                                .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
472                                .collect(),
473                        ),
474                        ..Default::default()
475                    })
476                    .map_err(|(fidl_err, _io_err)| fidl_err);
477                let expect = expect
478                    .map(|addrs| {
479                        addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect()
480                    })
481                    .map_err(|(_fidl_err, io_err)| io_err);
482                let () = sender.unbounded_send(fidl_response).expect("failed to send expectation");
483                resolve_ip_addr(&connector, TEST_HOSTNAME, port)
484                    .map_ok(Iterator::collect::<Vec<_>>)
485                    // Map IO error to kind so we can do equality.
486                    .map_err(|err| err.kind())
487                    .map(move |result| {
488                        assert_eq!(result, expect);
489                    })
490            };
491            let () = test_resolve(PORT1, Ok(vec![ip_v4])).await;
492            let () = test_resolve(PORT2, Ok(vec![ip_v6])).await;
493            let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await;
494            let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await;
495        };
496
497        let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await;
498    }
499}