fuchsia_hyper/
fuchsia.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7    HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8    parse_ip_addr,
9};
10use fidl_connector::{Connect, ServiceReconnector};
11use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
12use fidl_fuchsia_posix_socket::{ProviderMarker, ProviderProxy};
13use fuchsia_async::net;
14
15use futures::future::{Future, FutureExt};
16use futures::io;
17use futures::task::{Context, Poll};
18use http::uri::{Scheme, Uri};
19use hyper::service::Service;
20use rustls::RootCertStore;
21use std::convert::TryFrom as _;
22use std::net::SocketAddr;
23use std::num::TryFromIntError;
24use std::sync::{Arc, LazyLock};
25
26pub fn new_root_cert_store() -> Arc<RootCertStore> {
27    // It can be expensive to parse the certs, so cache them
28    static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
29        let mut root_store = rustls::RootCertStore::empty();
30
31        root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
32            rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
33                cert.subject,
34                cert.spki,
35                cert.name_constraints,
36            )
37        }));
38
39        Arc::new(root_store)
40    });
41
42    Arc::clone(&ROOT_STORE)
43}
44
45/// A Fuchsia-compatible implementation of hyper's `Connect` trait which allows
46/// creating a TcpStream to a particular destination.
47#[derive(Clone)]
48pub struct HyperConnector {
49    tcp_options: TcpOptions,
50    socket_options: SocketOptions,
51    provider: RealServiceConnector,
52}
53
54impl From<(TcpOptions, SocketOptions)> for HyperConnector {
55    fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
56        Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
57    }
58}
59
60impl HyperConnector {
61    pub fn new() -> Self {
62        Self::from_tcp_options(TcpOptions::default())
63    }
64
65    pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
66        Self {
67            tcp_options,
68            socket_options: SocketOptions::default(),
69            provider: RealServiceConnector::new(),
70        }
71    }
72}
73
74impl Service<Uri> for HyperConnector {
75    type Response = TcpStream;
76    type Error = std::io::Error;
77    type Future = HyperConnectorFuture;
78
79    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        // This connector is always ready, but others might not be.
81        Poll::Ready(Ok(()))
82    }
83
84    fn call(&mut self, dst: Uri) -> Self::Future {
85        let self_ = self.clone();
86        HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
87    }
88}
89
90impl HyperConnector {
91    async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
92        let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
93        let port = match dst.port() {
94            Some(port) => port.as_u16(),
95            None => {
96                if dst.scheme() == Some(&Scheme::HTTPS) {
97                    443
98                } else {
99                    80
100                }
101            }
102        };
103
104        let stream =
105            connect_to_addr(&self.provider, host, port, self.socket_options.bind_device.as_deref())
106                .await?;
107        let () = self.tcp_options.apply(stream.std())?;
108
109        Ok(TcpStream { stream })
110    }
111}
112
113#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
114#[derive(Clone)]
115pub struct Executor;
116
117impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
118    fn execute(&self, fut: F) {
119        fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
120    }
121}
122
123#[allow(dead_code)] // TODO(https://fxbug.dev/421409340)
124#[derive(Clone)]
125pub struct LocalExecutor;
126
127impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
128    fn execute(&self, fut: F) {
129        fuchsia_async::Task::local(fut.map(drop)).detach()
130    }
131}
132
133trait ProviderConnector {
134    fn connect(&self) -> Result<ProviderProxy, io::Error>;
135}
136
137trait LookupConnector {
138    fn connect(&self) -> Result<LookupProxy, io::Error>;
139}
140
141#[derive(Clone)]
142struct RealServiceConnector {
143    socket_provider_connector: ServiceReconnector<ProviderMarker>,
144    name_lookup_connector: ServiceReconnector<LookupMarker>,
145}
146
147impl RealServiceConnector {
148    fn new() -> Self {
149        RealServiceConnector {
150            socket_provider_connector: ServiceReconnector::<ProviderMarker>::new(),
151            name_lookup_connector: ServiceReconnector::<LookupMarker>::new(),
152        }
153    }
154}
155
156impl ProviderConnector for RealServiceConnector {
157    fn connect(&self) -> Result<ProviderProxy, io::Error> {
158        self.socket_provider_connector.connect().map_err(|err| {
159            io::Error::other(format!("failed to connect to socket provider service: {}", err))
160        })
161    }
162}
163
164impl LookupConnector for RealServiceConnector {
165    fn connect(&self) -> Result<LookupProxy, io::Error> {
166        self.name_lookup_connector.connect().map_err(|err| {
167            io::Error::other(format!("failed to connect to name lookup service: {}", err))
168        })
169    }
170}
171
172async fn connect_to_addr<T: ProviderConnector + LookupConnector>(
173    provider: &T,
174    host: &str,
175    port: u16,
176    bind_device: Option<&str>,
177) -> Result<net::TcpStream, io::Error> {
178    if let Some(addr) = parse_ip_addr_with_provider(provider, host, port).await? {
179        return connect_and_bind_device(addr, bind_device)?.await;
180    }
181
182    happy_eyeballs::happy_eyeballs(
183        resolve_ip_addr(provider, host, port).await?,
184        RealSocketConnector,
185        happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
186        happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
187        bind_device,
188    )
189    .await
190}
191
192async fn resolve_ip_addr(
193    name_lookup: &impl LookupConnector,
194    host: &str,
195    port: u16,
196) -> Result<impl Iterator<Item = SocketAddr>, io::Error> {
197    let proxy = name_lookup.connect()?;
198    let LookupResult { addresses, .. } = proxy
199        .lookup_ip(
200            host,
201            &LookupIpOptions {
202                ipv4_lookup: Some(true),
203                ipv6_lookup: Some(true),
204                sort_addresses: Some(true),
205                ..Default::default()
206            },
207        )
208        .await
209        .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
210        .map_err(|err| {
211            // Match stdlib's behavior, which maps all GAI errors but EAI_SYSTEM
212            // to io::ErrorKind::Other.
213            io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
214        })?;
215
216    Ok(addresses
217        .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
218        .into_iter()
219        .map(move |addr| {
220            let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
221            SocketAddr::new(addr, port)
222        }))
223}
224
225async fn parse_ip_addr_with_provider(
226    provider: &impl ProviderConnector,
227    host: &str,
228    port: u16,
229) -> Result<Option<SocketAddr>, io::Error> {
230    parse_ip_addr(host, port, |zone_id| async {
231        let proxy = provider.connect()?;
232        let id = proxy
233            .interface_name_to_index(zone_id)
234            .await
235            .map_err(|err| {
236                io::Error::other(format!(
237                    "failed to get interface index from socket provider: {}",
238                    err
239                ))
240            })?
241            .map_err(|status| zx::Status::from_raw(status).into_io_error())?;
242
243        // SocketAddrV6 only works with 32 bit scope ids.
244        u32::try_from(id).map_err(|TryFromIntError { .. }| {
245            io::Error::other("interface index too large to convert to scope_id")
246        })
247    })
248    .await
249}
250
251#[cfg(test)]
252mod test {
253    use super::*;
254    use crate::*;
255    use assert_matches::assert_matches;
256    use fidl::endpoints::create_proxy_and_stream;
257    use fidl_fuchsia_net_name::{LookupError, LookupRequest};
258    use fidl_fuchsia_posix_socket::ProviderRequest;
259    use fuchsia_async::net::TcpListener;
260    use fuchsia_async::{self as fasync, LocalExecutor};
261    use futures::prelude::*;
262    use std::cell::RefCell;
263
264    struct PanicConnector;
265
266    impl ProviderConnector for PanicConnector {
267        fn connect(&self) -> Result<ProviderProxy, io::Error> {
268            panic!("should not be trying to talk to the Provider service")
269        }
270    }
271
272    #[test]
273    fn can_create_client() {
274        let _exec = LocalExecutor::default();
275        let _client = new_client();
276    }
277
278    #[test]
279    fn can_create_https_client() {
280        let _exec = LocalExecutor::default();
281        let _client = new_https_client();
282    }
283
284    #[fasync::run_singlethreaded(test)]
285    async fn hyper_connector_sets_tcp_options() {
286        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
287        let listener = TcpListener::bind(&addr).unwrap();
288        let addr = listener.local_addr().unwrap();
289
290        let idle = std::time::Duration::from_secs(36);
291        let interval = std::time::Duration::from_secs(47);
292        let count = 58;
293        let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap();
294        let (TcpStream { stream }, _server) = future::try_join(
295            HyperConnector::from_tcp_options(TcpOptions {
296                keepalive_idle: Some(idle),
297                keepalive_interval: Some(interval),
298                keepalive_count: Some(count),
299                ..Default::default()
300            })
301            .call(uri),
302            listener.accept_stream().try_next(),
303        )
304        .await
305        .unwrap();
306
307        let stream = socket2::SockRef::from(stream.std());
308
309        assert_matches!(stream.keepalive(), Ok(v) if v);
310        assert_matches!(stream.keepalive_time(), Ok(v) if v == idle);
311        assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval);
312        assert_matches!(stream.keepalive_retries(), Ok(v) if v == count);
313    }
314
315    #[fasync::run_singlethreaded(test)]
316    async fn test_parse_ipv6_addr_with_provider() {
317        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
318
319        assert_matches!(
320            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await,
321            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0))
322        );
323
324        assert_matches!(
325            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await,
326            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2))
327        );
328    }
329
330    #[fasync::run_singlethreaded(test)]
331    async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
332        let connector = RealServiceConnector::new();
333        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
334
335        assert_matches!(
336            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
337            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1))
338        );
339
340        assert_matches!(
341            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await,
342            Err(err) if err.kind() == io::ErrorKind::NotFound
343        );
344
345        assert_matches!(
346            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await,
347            Err(err) if err.kind() == io::ErrorKind::NotFound
348        );
349    }
350
351    #[fasync::run_singlethreaded(test)]
352    async fn test_parse_ipv6_addr_handles_connection_errors() {
353        struct ErrorConnector;
354
355        impl ProviderConnector for ErrorConnector {
356            fn connect(&self) -> Result<ProviderProxy, io::Error> {
357                Err(io::Error::other("something bad happened"))
358            }
359        }
360
361        assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await,
362            Err(err) if err.kind() == io::ErrorKind::Other);
363    }
364
365    #[fasync::run_singlethreaded(test)]
366    async fn test_parse_ipv6_addr_handles_large_interface_indices() {
367        let (proxy, mut stream) = create_proxy_and_stream::<ProviderMarker>();
368
369        let provider_fut = async move {
370            while let Some(req) = stream.try_next().await.unwrap_or(None) {
371                match req {
372                    ProviderRequest::InterfaceNameToIndex { name: _, responder } => {
373                        responder.send(Ok(u64::MAX)).unwrap()
374                    }
375                    _ => panic!("unexpected request"),
376                }
377            }
378        };
379
380        struct ErrorConnector {
381            proxy: RefCell<Option<ProviderProxy>>,
382        }
383
384        impl ProviderConnector for ErrorConnector {
385            fn connect(&self) -> Result<ProviderProxy, io::Error> {
386                let proxy = self.proxy.borrow_mut().take().unwrap();
387                Ok(proxy)
388            }
389        }
390
391        let connector = ErrorConnector { proxy: RefCell::new(Some(proxy)) };
392
393        let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);
394
395        // Join the two futures to make sure they both complete.
396        let ((), res) = future::join(provider_fut, parse_ip_fut).await;
397
398        assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other);
399    }
400
401    struct ProxyConnector<T> {
402        proxy: T,
403    }
404
405    impl LookupConnector for ProxyConnector<LookupProxy> {
406        fn connect(&self) -> Result<LookupProxy, io::Error> {
407            Ok(self.proxy.clone())
408        }
409    }
410
411    #[fasync::run_singlethreaded(test)]
412    async fn test_resolve_ip_addr() {
413        let (sender, receiver) =
414            futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
415        let (proxy, stream) = create_proxy_and_stream::<LookupMarker>();
416        const TEST_HOSTNAME: &'static str = "foobar.com";
417        let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req {
418            Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
419                assert_eq!(hostname.as_str(), TEST_HOSTNAME);
420                assert_eq!(
421                    options,
422                    LookupIpOptions {
423                        ipv4_lookup: Some(true),
424                        ipv6_lookup: Some(true),
425                        sort_addresses: Some(true),
426                        ..Default::default()
427                    }
428                );
429                let rsp = rsp.as_ref().map_err(|e| *e);
430                futures::future::ready(responder.send(rsp).expect("failed to send FIDL response"))
431            }
432            req => panic!("unexpected item in request stream {:?}", req),
433        });
434
435        let connector = ProxyConnector { proxy };
436
437        let ip_v4 = Ipv4Addr::LOCALHOST.into();
438        let ip_v6 = Ipv6Addr::LOCALHOST.into();
439        const PORT1: u16 = 1234;
440        const PORT2: u16 = 4321;
441
442        let test_fut = async move {
443            // Test expectation's error variant is a tuple of the lookup error
444            // to inject and the expected io error kind returned.
445            type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;
446            let test_resolve = |port, expect: Expectation| {
447                let fidl_response = expect
448                    .clone()
449                    .map(|addrs| LookupResult {
450                        addresses: Some(
451                            addrs
452                                .into_iter()
453                                .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
454                                .collect(),
455                        ),
456                        ..Default::default()
457                    })
458                    .map_err(|(fidl_err, _io_err)| fidl_err);
459                let expect = expect
460                    .map(|addrs| {
461                        addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect()
462                    })
463                    .map_err(|(_fidl_err, io_err)| io_err);
464                let () = sender.unbounded_send(fidl_response).expect("failed to send expectation");
465                resolve_ip_addr(&connector, TEST_HOSTNAME, port)
466                    .map_ok(Iterator::collect::<Vec<_>>)
467                    // Map IO error to kind so we can do equality.
468                    .map_err(|err| err.kind())
469                    .map(move |result| {
470                        assert_eq!(result, expect);
471                    })
472            };
473            let () = test_resolve(PORT1, Ok(vec![ip_v4])).await;
474            let () = test_resolve(PORT2, Ok(vec![ip_v6])).await;
475            let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await;
476            let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await;
477        };
478
479        let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await;
480    }
481}