Skip to main content

fuchsia_hyper/
fuchsia.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7    HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8    parse_ip_addr,
9};
10use fidl::endpoints::{ClientEnd, create_endpoints};
11use fidl_connector::{Connect, ServiceReconnector};
12use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
13use fidl_fuchsia_posix_socket::{Domain, ProviderMarker, StreamSocketProtocol};
14use fuchsia_async::net;
15
16use futures::future::{Future, FutureExt};
17use futures::io;
18use futures::task::{Context, Poll};
19use http::uri::{Scheme, Uri};
20use hyper::service::Service;
21use rustls::RootCertStore;
22use std::convert::TryFrom as _;
23use std::net::SocketAddr;
24use std::num::TryFromIntError;
25use std::sync::{Arc, LazyLock};
26use zx::StatusExt;
27
28pub fn new_root_cert_store() -> Arc<RootCertStore> {
29    // It can be expensive to parse the certs, so cache them
30    static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
31        let mut root_store = rustls::RootCertStore::empty();
32
33        root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
34            rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
35                cert.subject,
36                cert.spki,
37                cert.name_constraints,
38            )
39        }));
40
41        Arc::new(root_store)
42    });
43
44    Arc::clone(&ROOT_STORE)
45}
46
/// A Fuchsia-compatible implementation of hyper's `Connect` trait which allows
/// creating a TcpStream to a particular destination.
#[derive(Clone)]
pub struct HyperConnector {
    /// TCP options applied to every stream once it has been connected.
    tcp_options: TcpOptions,
    /// Socket-level options; its `bind_device` is forwarded when connecting.
    socket_options: SocketOptions,
    /// Connector handed to the connection routine to reach the socket
    /// provider and name lookup services.
    provider: RealServiceConnector,
}
55
56impl From<(TcpOptions, SocketOptions)> for HyperConnector {
57    fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
58        Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
59    }
60}
61
62impl HyperConnector {
63    pub fn new() -> Self {
64        Self::from_tcp_options(TcpOptions::default())
65    }
66
67    pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
68        Self {
69            tcp_options,
70            socket_options: SocketOptions::default(),
71            provider: RealServiceConnector::new(),
72        }
73    }
74}
75
76impl Service<Uri> for HyperConnector {
77    type Response = TcpStream;
78    type Error = std::io::Error;
79    type Future = HyperConnectorFuture;
80
81    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        // This connector is always ready, but others might not be.
83        Poll::Ready(Ok(()))
84    }
85
86    fn call(&mut self, dst: Uri) -> Self::Future {
87        let self_ = self.clone();
88        HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
89    }
90}
91
92impl HyperConnector {
93    async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
94        let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
95        let port = match dst.port() {
96            Some(port) => port.as_u16(),
97            None => {
98                if dst.scheme() == Some(&Scheme::HTTPS) {
99                    443
100                } else {
101                    80
102                }
103            }
104        };
105
106        let stream = connect_to_addr(
107            self.provider.clone(),
108            host,
109            port,
110            self.socket_options.bind_device.as_deref(),
111        )
112        .await?;
113        let () = self.tcp_options.apply(stream.std())?;
114
115        Ok(TcpStream { stream })
116    }
117}
118
119#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
120#[derive(Clone)]
121pub struct Executor;
122
123impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
124    fn execute(&self, fut: F) {
125        fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
126    }
127}
128
129#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
130#[derive(Clone)]
131pub struct LocalExecutor;
132
133impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
134    fn execute(&self, fut: F) {
135        fuchsia_async::Task::local(fut.map(drop)).detach()
136    }
137}
138
/// Something that can hand out client connections to the socket provider
/// protocol (`ProviderMarker`).
pub(crate) trait ProviderConnector {
    /// Returns a client end for the socket provider protocol, or an I/O error
    /// if the connection could not be set up.
    fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error>;
}
142
/// Something that can hand out proxies to the name lookup protocol.
trait LookupConnector {
    /// Returns a `LookupProxy`, or an I/O error if the connection could not be
    /// set up.
    fn connect(&self) -> Result<LookupProxy, io::Error>;
}
146
147#[derive(Clone)]
148struct RealServiceConnector {
149    name_lookup_connector: ServiceReconnector<LookupMarker>,
150}
151
152impl RealServiceConnector {
153    fn new() -> Self {
154        RealServiceConnector { name_lookup_connector: ServiceReconnector::<LookupMarker>::new() }
155    }
156}
157
158impl ProviderConnector for RealServiceConnector {
159    fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
160        let (client_end, server_end) = create_endpoints::<ProviderMarker>();
161        fuchsia_component::client::connect_channel_to_protocol::<ProviderMarker>(server_end.into())
162            .map_err(|err| {
163                io::Error::other(format!("failed to connect to socket provider service: {}", err))
164            })?;
165        Ok(client_end)
166    }
167}
168
169impl LookupConnector for RealServiceConnector {
170    fn connect(&self) -> Result<LookupProxy, io::Error> {
171        self.name_lookup_connector.connect().map_err(|err| {
172            io::Error::other(format!("failed to connect to name lookup service: {}", err))
173        })
174    }
175}
176
177async fn connect_to_addr<T: ProviderConnector + LookupConnector + 'static>(
178    provider: T,
179    host: &str,
180    port: u16,
181    bind_device: Option<&str>,
182) -> Result<net::TcpStream, io::Error> {
183    if let Some(addr) = parse_ip_addr_with_provider(&provider, host, port).await? {
184        return connect_and_bind_device(&provider, addr, bind_device)?.await;
185    }
186
187    happy_eyeballs::happy_eyeballs(
188        resolve_ip_addr(&provider, host, port).await?,
189        RealSocketConnector::new(provider),
190        happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
191        happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
192        bind_device,
193    )
194    .await
195}
196
197async fn resolve_ip_addr<T: LookupConnector>(
198    name_lookup: &T,
199    host: &str,
200    port: u16,
201) -> Result<impl Iterator<Item = SocketAddr> + use<T>, io::Error> {
202    let proxy = name_lookup.connect()?;
203    let LookupResult { addresses, .. } = proxy
204        .lookup_ip(
205            host,
206            &LookupIpOptions {
207                ipv4_lookup: Some(true),
208                ipv6_lookup: Some(true),
209                sort_addresses: Some(true),
210                ..Default::default()
211            },
212        )
213        .await
214        .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
215        .map_err(|err| {
216            // Match stdlib's behavior, which maps all GAI errors but EAI_SYSTEM
217            // to io::ErrorKind::Other.
218            io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
219        })?;
220
221    Ok(addresses
222        .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
223        .into_iter()
224        .map(move |addr| {
225            let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
226            SocketAddr::new(addr, port)
227        }))
228}
229
230async fn parse_ip_addr_with_provider(
231    provider: &impl ProviderConnector,
232    host: &str,
233    port: u16,
234) -> Result<Option<SocketAddr>, io::Error> {
235    parse_ip_addr(host, port, |zone_id| async {
236        let proxy = provider.connect()?.into_proxy();
237        let id = proxy
238            .interface_name_to_index(zone_id)
239            .await
240            .map_err(|err| {
241                io::Error::other(format!(
242                    "failed to get interface index from socket provider: {}",
243                    err
244                ))
245            })?
246            .map_err(|status| zx::Status::from_raw(status).into_io_error())?;
247
248        // SocketAddrV6 only works with 32 bit scope ids.
249        u32::try_from(id).map_err(|TryFromIntError { .. }| {
250            io::Error::other("interface index too large to convert to scope_id")
251        })
252    })
253    .await
254}
255
256pub(crate) fn stream_socket<T: ProviderConnector>(
257    provider: &T,
258    domain: Domain,
259    proto: StreamSocketProtocol,
260) -> io::Result<socket2::Socket> {
261    let socket_provider = provider.connect()?.into_sync_proxy();
262    let sock = socket_provider
263        .stream_socket(domain, proto, zx::MonotonicInstant::INFINITE)
264        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
265        .map_err(|e| std::io::Error::from_raw_os_error(e.into_primitive()))?;
266
267    Ok(fdio::create_fd(sock.into()).map_err(|s| s.into_io_error())?.into())
268}
269
#[cfg(test)]
mod test {
    use super::*;
    use crate::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
    use fidl_fuchsia_net_name::{LookupError, LookupRequest};
    use fidl_fuchsia_posix_socket::ProviderRequest;
    use fuchsia_async::net::TcpListener;
    use fuchsia_async::{self as fasync, LocalExecutor};
    use futures::prelude::*;
    use std::cell::RefCell;

    /// Provider connector for tests that must never reach the socket
    /// provider: any connection attempt fails the test.
    struct PanicConnector;

    impl ProviderConnector for PanicConnector {
        fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
            panic!("should not be trying to talk to the Provider service")
        }
    }

    #[test]
    fn can_create_client() {
        let _executor = LocalExecutor::default();
        let _http_client = new_client();
    }

    #[test]
    fn can_create_https_client() {
        let _executor = LocalExecutor::default();
        let _https_client = new_https_client();
    }

    #[fasync::run_singlethreaded(test)]
    async fn hyper_connector_sets_tcp_options() {
        // Listen on an ephemeral loopback port and point the connector at it.
        let listener = TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
        let server_addr = listener.local_addr().unwrap();
        let uri = format!("https://{}", server_addr).parse::<hyper::Uri>().unwrap();

        let keepalive_idle = std::time::Duration::from_secs(36);
        let keepalive_interval = std::time::Duration::from_secs(47);
        let keepalive_count = 58;
        let tcp_options = TcpOptions {
            keepalive_idle: Some(keepalive_idle),
            keepalive_interval: Some(keepalive_interval),
            keepalive_count: Some(keepalive_count),
            ..Default::default()
        };

        // Drive the client connect and the server accept together.
        let (TcpStream { stream }, _server) = future::try_join(
            HyperConnector::from_tcp_options(tcp_options).call(uri),
            listener.accept_stream().try_next(),
        )
        .await
        .unwrap();

        let sock = socket2::SockRef::from(stream.std());

        assert_matches!(sock.keepalive(), Ok(enabled) if enabled);
        assert_matches!(sock.keepalive_time(), Ok(time) if time == keepalive_idle);
        assert_matches!(sock.keepalive_interval(), Ok(time) if time == keepalive_interval);
        assert_matches!(sock.keepalive_retries(), Ok(retries) if retries == keepalive_count);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_with_provider() {
        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();

        // Numeric zone ids become the scope id without contacting the
        // provider; `PanicConnector` would fail the test otherwise.
        for (host, scope_id) in [("[fe80::1:2:3:4%250]", 0), ("[fe80::1:2:3:4%252]", 2)] {
            let want = SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, scope_id));
            assert_matches!(
                parse_ip_addr_with_provider(&PanicConnector, host, 8080).await,
                Ok(Some(addr)) if addr == want
            );
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
        let connector = RealServiceConnector::new();
        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();

        // The interface named `lo` is expected to resolve to index 1.
        let loopback = SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1));
        assert_matches!(
            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
            Ok(Some(addr)) if addr == loopback
        );

        // An empty or unknown interface name is reported as `NotFound`.
        for host in ["[fe80::1:2:3:4%25]", "[fe80::1:2:3:4%25unknownif]"] {
            assert_matches!(
                parse_ip_addr_with_provider(&connector, host, 8080).await,
                Err(err) if err.kind() == io::ErrorKind::NotFound
            );
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_handles_connection_errors() {
        /// Provider connector whose every connection attempt fails.
        struct ErrorConnector;

        impl ProviderConnector for ErrorConnector {
            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
                Err(io::Error::other("something bad happened"))
            }
        }

        let result =
            parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await;

        assert_matches!(result, Err(err) if err.kind() == io::ErrorKind::Other);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_handles_large_interface_indices() {
        /// Hands out one pre-made client end; a second `connect` panics.
        struct OneShotConnector {
            client_end: RefCell<Option<ClientEnd<ProviderMarker>>>,
        }

        impl ProviderConnector for OneShotConnector {
            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
                let handed_out = self.client_end.borrow_mut().take().unwrap();
                Ok(handed_out)
            }
        }

        let (client_end, mut requests) = create_request_stream::<ProviderMarker>();

        // Fake provider that answers every index lookup with a value that
        // cannot fit in a 32-bit scope id. Serving stops when the stream ends
        // or yields an error.
        let serve_provider = async move {
            while let Ok(Some(request)) = requests.try_next().await {
                let ProviderRequest::InterfaceNameToIndex { name: _, responder } = request else {
                    panic!("unexpected request")
                };
                responder.send(Ok(u64::MAX)).unwrap();
            }
        };

        let connector = OneShotConnector { client_end: RefCell::new(Some(client_end)) };
        let parse = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);

        // Join the two futures to make sure they both complete.
        let ((), result) = future::join(serve_provider, parse).await;

        assert_matches!(result, Err(err) if err.kind() == io::ErrorKind::Other);
    }

    /// Connector that returns clones of an already-connected proxy.
    struct ProxyConnector<T> {
        proxy: T,
    }

    impl LookupConnector for ProxyConnector<LookupProxy> {
        fn connect(&self) -> Result<LookupProxy, io::Error> {
            Ok(self.proxy.clone())
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_resolve_ip_addr() {
        const TEST_HOSTNAME: &str = "foobar.com";
        const PORT1: u16 = 1234;
        const PORT2: u16 = 4321;

        // Each queued item is the response the fake lookup server gives to the
        // next request it receives.
        let (response_tx, response_rx) =
            futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
        let (proxy, request_stream) = create_proxy_and_stream::<LookupMarker>();

        let lookup_server =
            request_stream.zip(response_rx).for_each(|(request, response)| match request {
                Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
                    assert_eq!(hostname.as_str(), TEST_HOSTNAME);
                    let expected_options = LookupIpOptions {
                        ipv4_lookup: Some(true),
                        ipv6_lookup: Some(true),
                        sort_addresses: Some(true),
                        ..Default::default()
                    };
                    assert_eq!(options, expected_options);
                    let response = response.as_ref().map_err(|e| *e);
                    futures::future::ready(
                        responder.send(response).expect("failed to send FIDL response"),
                    )
                }
                other => panic!("unexpected item in request stream {:?}", other),
            });

        let connector = ProxyConnector { proxy };
        let ip_v4: std::net::IpAddr = Ipv4Addr::LOCALHOST.into();
        let ip_v6: std::net::IpAddr = Ipv6Addr::LOCALHOST.into();

        let client = async move {
            // The error variant pairs the lookup error to inject with the io
            // error kind that `resolve_ip_addr` is expected to report for it.
            type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;

            let check_resolve = |port: u16, expectation: Expectation| {
                let canned_response = match expectation.clone() {
                    Ok(addrs) => Ok(LookupResult {
                        addresses: Some(
                            addrs
                                .into_iter()
                                .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
                                .collect(),
                        ),
                        ..Default::default()
                    }),
                    Err((lookup_err, _io_kind)) => Err(lookup_err),
                };
                let expected = expectation
                    .map(|addrs| {
                        addrs
                            .into_iter()
                            .map(|addr| SocketAddr::new(addr, port))
                            .collect::<Vec<_>>()
                    })
                    .map_err(|(_lookup_err, io_kind)| io_kind);

                let () = response_tx
                    .unbounded_send(canned_response)
                    .expect("failed to send expectation");

                resolve_ip_addr(&connector, TEST_HOSTNAME, port)
                    .map_ok(Iterator::collect::<Vec<_>>)
                    // Compare by error kind: `io::Error` has no equality.
                    .map_err(|err| err.kind())
                    .map(move |result| {
                        assert_eq!(result, expected);
                    })
            };

            let cases: [(u16, Expectation); 4] = [
                (PORT1, Ok(vec![ip_v4])),
                (PORT2, Ok(vec![ip_v6])),
                (PORT1, Ok(vec![ip_v4, ip_v6])),
                (PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))),
            ];
            for (port, expectation) in cases {
                let () = check_resolve(port, expectation).await;
            }
        };

        let ((), ()) = futures::future::join(lookup_server, client).await;
    }
}