1use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7 HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8 parse_ip_addr,
9};
10use fidl::endpoints::{ClientEnd, create_endpoints};
11use fidl_connector::{Connect, ServiceReconnector};
12use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
13use fidl_fuchsia_posix_socket::{Domain, ProviderMarker, StreamSocketProtocol};
14use fuchsia_async::net;
15
16use futures::future::{Future, FutureExt};
17use futures::io;
18use futures::task::{Context, Poll};
19use http::uri::{Scheme, Uri};
20use hyper::service::Service;
21use rustls::RootCertStore;
22use std::convert::TryFrom as _;
23use std::net::SocketAddr;
24use std::num::TryFromIntError;
25use std::sync::{Arc, LazyLock};
26use zx::StatusExt;
27
28pub fn new_root_cert_store() -> Arc<RootCertStore> {
29 static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
31 let mut root_store = rustls::RootCertStore::empty();
32
33 root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
34 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
35 cert.subject,
36 cert.spki,
37 cert.name_constraints,
38 )
39 }));
40
41 Arc::new(root_store)
42 });
43
44 Arc::clone(&ROOT_STORE)
45}
46
/// Connector that opens TCP connections for hyper.
///
/// It implements `hyper::service::Service<Uri>` in this file, producing a `TcpStream` for the
/// host and port named by the URI.
#[derive(Clone)]
pub struct HyperConnector {
    /// TCP options applied to each stream once it is connected.
    tcp_options: TcpOptions,
    /// Socket-level options; `bind_device` is consulted when connecting.
    socket_options: SocketOptions,
    /// Source of connections to the socket provider and name lookup services.
    provider: RealServiceConnector,
}
55
56impl From<(TcpOptions, SocketOptions)> for HyperConnector {
57 fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
58 Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
59 }
60}
61
62impl HyperConnector {
63 pub fn new() -> Self {
64 Self::from_tcp_options(TcpOptions::default())
65 }
66
67 pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
68 Self {
69 tcp_options,
70 socket_options: SocketOptions::default(),
71 provider: RealServiceConnector::new(),
72 }
73 }
74}
75
76impl Service<Uri> for HyperConnector {
77 type Response = TcpStream;
78 type Error = std::io::Error;
79 type Future = HyperConnectorFuture;
80
81 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 Poll::Ready(Ok(()))
84 }
85
86 fn call(&mut self, dst: Uri) -> Self::Future {
87 let self_ = self.clone();
88 HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
89 }
90}
91
92impl HyperConnector {
93 async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
94 let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
95 let port = match dst.port() {
96 Some(port) => port.as_u16(),
97 None => {
98 if dst.scheme() == Some(&Scheme::HTTPS) {
99 443
100 } else {
101 80
102 }
103 }
104 };
105
106 let stream = connect_to_addr(
107 self.provider.clone(),
108 host,
109 port,
110 self.socket_options.bind_device.as_deref(),
111 )
112 .await?;
113 let () = self.tcp_options.apply(stream.std())?;
114
115 Ok(TcpStream { stream })
116 }
117}
118
/// Unit type that implements `hyper::rt::Executor` (see the impl in this file) by spawning
/// futures with `fuchsia_async::Task::spawn`.
// NOTE(review): the reason `dead_code` is allowed is not visible from this file — confirm it is
// still needed.
#[allow(dead_code)]
#[derive(Clone)]
pub struct Executor;
122
123impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
124 fn execute(&self, fut: F) {
125 fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
126 }
127}
128
/// Unit type that implements `hyper::rt::Executor` (see the impl in this file) by spawning
/// futures with `fuchsia_async::Task::local`; unlike `Executor`, its impl does not require the
/// future to be `Send`.
// NOTE(review): the reason `dead_code` is allowed is not visible from this file — confirm it is
// still needed.
#[allow(dead_code)]
#[derive(Clone)]
pub struct LocalExecutor;
132
133impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
134 fn execute(&self, fut: F) {
135 fuchsia_async::Task::local(fut.map(drop)).detach()
136 }
137}
138
/// Source of client connections to the socket provider protocol (`ProviderMarker`).
///
/// Abstracted behind a trait so callers can be generic over the connector; the tests in this
/// file substitute fake implementations.
pub(crate) trait ProviderConnector {
    /// Returns a new client end for the socket provider, or an I/O error if connecting failed.
    fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error>;
}
142
/// Source of proxies to the name lookup protocol (`LookupProxy`).
///
/// Abstracted behind a trait so callers can be generic over the connector; the tests in this
/// file substitute a fake implementation.
trait LookupConnector {
    /// Returns a proxy for the name lookup service, or an I/O error if connecting failed.
    fn connect(&self) -> Result<LookupProxy, io::Error>;
}
146
/// Connector used outside of tests; it implements both `ProviderConnector` and
/// `LookupConnector` in this file.
#[derive(Clone)]
struct RealServiceConnector {
    /// Hands out proxies to the name lookup service.
    name_lookup_connector: ServiceReconnector<LookupMarker>,
}
151
152impl RealServiceConnector {
153 fn new() -> Self {
154 RealServiceConnector { name_lookup_connector: ServiceReconnector::<LookupMarker>::new() }
155 }
156}
157
158impl ProviderConnector for RealServiceConnector {
159 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
160 let (client_end, server_end) = create_endpoints::<ProviderMarker>();
161 fuchsia_component::client::connect_channel_to_protocol::<ProviderMarker>(server_end.into())
162 .map_err(|err| {
163 io::Error::other(format!("failed to connect to socket provider service: {}", err))
164 })?;
165 Ok(client_end)
166 }
167}
168
169impl LookupConnector for RealServiceConnector {
170 fn connect(&self) -> Result<LookupProxy, io::Error> {
171 self.name_lookup_connector.connect().map_err(|err| {
172 io::Error::other(format!("failed to connect to name lookup service: {}", err))
173 })
174 }
175}
176
177async fn connect_to_addr<T: ProviderConnector + LookupConnector + 'static>(
178 provider: T,
179 host: &str,
180 port: u16,
181 bind_device: Option<&str>,
182) -> Result<net::TcpStream, io::Error> {
183 if let Some(addr) = parse_ip_addr_with_provider(&provider, host, port).await? {
184 return connect_and_bind_device(&provider, addr, bind_device)?.await;
185 }
186
187 happy_eyeballs::happy_eyeballs(
188 resolve_ip_addr(&provider, host, port).await?,
189 RealSocketConnector::new(provider),
190 happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
191 happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
192 bind_device,
193 )
194 .await
195}
196
197async fn resolve_ip_addr<T: LookupConnector>(
198 name_lookup: &T,
199 host: &str,
200 port: u16,
201) -> Result<impl Iterator<Item = SocketAddr> + use<T>, io::Error> {
202 let proxy = name_lookup.connect()?;
203 let LookupResult { addresses, .. } = proxy
204 .lookup_ip(
205 host,
206 &LookupIpOptions {
207 ipv4_lookup: Some(true),
208 ipv6_lookup: Some(true),
209 sort_addresses: Some(true),
210 ..Default::default()
211 },
212 )
213 .await
214 .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
215 .map_err(|err| {
216 io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
219 })?;
220
221 Ok(addresses
222 .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
223 .into_iter()
224 .map(move |addr| {
225 let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
226 SocketAddr::new(addr, port)
227 }))
228}
229
/// Runs `parse_ip_addr` on `host` and `port`, supplying a callback that turns a zone identifier
/// into an interface index by asking the socket provider.
///
/// `connect_to_addr` treats an `Ok(None)` result as "no address parsed" and falls back to name
/// resolution. The tests in this file show that purely numeric zone identifiers are handled
/// without contacting the provider.
///
/// # Errors
///
/// Besides whatever `parse_ip_addr` itself reports, the callback fails if the provider cannot
/// be reached, if the FIDL call fails, if the provider returns an error status, or if the
/// returned index does not fit in a `u32`.
async fn parse_ip_addr_with_provider(
    provider: &impl ProviderConnector,
    host: &str,
    port: u16,
) -> Result<Option<SocketAddr>, io::Error> {
    parse_ip_addr(host, port, |zone_id| async {
        // Look the interface name up through the socket provider.
        let proxy = provider.connect()?.into_proxy();
        let id = proxy
            .interface_name_to_index(zone_id)
            .await
            // Outer error: the FIDL call itself failed.
            .map_err(|err| {
                io::Error::other(format!(
                    "failed to get interface index from socket provider: {}",
                    err
                ))
            })?
            // Inner error: the provider answered with a raw status code.
            .map_err(|status| zx::Status::from_raw(status).into_io_error())?;

        // The index is narrowed to `u32` here; the error text calls the result a `scope_id`.
        u32::try_from(id).map_err(|TryFromIntError { .. }| {
            io::Error::other("interface index too large to convert to scope_id")
        })
    })
    .await
}
255
256pub(crate) fn stream_socket<T: ProviderConnector>(
257 provider: &T,
258 domain: Domain,
259 proto: StreamSocketProtocol,
260) -> io::Result<socket2::Socket> {
261 let socket_provider = provider.connect()?.into_sync_proxy();
262 let sock = socket_provider
263 .stream_socket(domain, proto, zx::MonotonicInstant::INFINITE)
264 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
265 .map_err(|e| std::io::Error::from_raw_os_error(e.into_primitive()))?;
266
267 Ok(fdio::create_fd(sock.into()).map_err(|s| s.into_io_error())?.into())
268}
269
#[cfg(test)]
mod test {
    use super::*;
    use crate::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
    use fidl_fuchsia_net_name::{LookupError, LookupRequest};
    use fidl_fuchsia_posix_socket::ProviderRequest;
    use fuchsia_async::net::TcpListener;
    // This explicit import of `fuchsia_async::LocalExecutor` takes precedence over the
    // `LocalExecutor` brought in by the `super::*` glob.
    use fuchsia_async::{self as fasync, LocalExecutor};
    use futures::prelude::*;
    use std::cell::RefCell;

    /// Provider connector that panics when used, for tests that must not contact the provider.
    struct PanicConnector;

    impl ProviderConnector for PanicConnector {
        fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
            panic!("should not be trying to talk to the Provider service")
        }
    }

    /// Constructing a client with an executor in scope must not panic.
    #[test]
    fn can_create_client() {
        let _exec = LocalExecutor::default();
        let _client = new_client();
    }

    /// Constructing an HTTPS client with an executor in scope must not panic.
    #[test]
    fn can_create_https_client() {
        let _exec = LocalExecutor::default();
        let _client = new_https_client();
    }

    /// The keepalive settings given to `HyperConnector::from_tcp_options` must be observable on
    /// the connected socket.
    #[fasync::run_singlethreaded(test)]
    async fn hyper_connector_sets_tcp_options() {
        // Bind with port 0, then read back the address the listener actually got.
        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
        let listener = TcpListener::bind(&addr).unwrap();
        let addr = listener.local_addr().unwrap();

        let idle = std::time::Duration::from_secs(36);
        let interval = std::time::Duration::from_secs(47);
        let count = 58;
        let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap();
        // Drive the client connect and the server accept together.
        let (TcpStream { stream }, _server) = future::try_join(
            HyperConnector::from_tcp_options(TcpOptions {
                keepalive_idle: Some(idle),
                keepalive_interval: Some(interval),
                keepalive_count: Some(count),
                ..Default::default()
            })
            .call(uri),
            listener.accept_stream().try_next(),
        )
        .await
        .unwrap();

        let stream = socket2::SockRef::from(stream.std());

        assert_matches!(stream.keepalive(), Ok(v) if v);
        assert_matches!(stream.keepalive_time(), Ok(v) if v == idle);
        assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval);
        assert_matches!(stream.keepalive_retries(), Ok(v) if v == count);
    }

    /// Numeric zone identifiers become the scope id directly; `PanicConnector` proves the
    /// provider is never contacted. (`%25` is the percent-encoded form of `%`.)
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_with_provider() {
        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();

        assert_matches!(
            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await,
            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0))
        );

        assert_matches!(
            parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await,
            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2))
        );
    }

    /// With the real connector, the interface name `lo` is expected to map to scope id 1, while
    /// an empty or unknown name is expected to fail with `NotFound`.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
        let connector = RealServiceConnector::new();
        let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();

        assert_matches!(
            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
            Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1))
        );

        assert_matches!(
            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await,
            Err(err) if err.kind() == io::ErrorKind::NotFound
        );

        assert_matches!(
            parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await,
            Err(err) if err.kind() == io::ErrorKind::NotFound
        );
    }

    /// A failure to connect to the provider is passed back to the caller as an error.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_handles_connection_errors() {
        /// Provider connector whose `connect` always fails.
        struct ErrorConnector;

        impl ProviderConnector for ErrorConnector {
            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
                Err(io::Error::other("something bad happened"))
            }
        }

        assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await,
            Err(err) if err.kind() == io::ErrorKind::Other);
    }

    /// An interface index that does not fit in a `u32` is reported as an error.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_handles_large_interface_indices() {
        let (client_end, mut stream) = create_request_stream::<ProviderMarker>();

        // Fake provider: answers every name-to-index request with `u64::MAX`.
        let provider_fut = async move {
            while let Some(req) = stream.try_next().await.unwrap_or(None) {
                match req {
                    ProviderRequest::InterfaceNameToIndex { name: _, responder } => {
                        responder.send(Ok(u64::MAX)).unwrap()
                    }
                    _ => panic!("unexpected request"),
                }
            }
        };

        /// Hands out the single pre-made client end; a second `connect` call would panic on
        /// the `unwrap`.
        struct ErrorConnector {
            client_end: RefCell<Option<ClientEnd<ProviderMarker>>>,
        }

        impl ProviderConnector for ErrorConnector {
            fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
                let client_end = self.client_end.borrow_mut().take().unwrap();
                Ok(client_end)
            }
        }

        let connector = ErrorConnector { client_end: RefCell::new(Some(client_end)) };

        let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);

        // Run the fake provider and the code under test concurrently.
        let ((), res) = future::join(provider_fut, parse_ip_fut).await;

        assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other);
    }

    /// Connector that returns clones of a pre-built proxy.
    struct ProxyConnector<T> {
        proxy: T,
    }

    impl LookupConnector for ProxyConnector<LookupProxy> {
        fn connect(&self) -> Result<LookupProxy, io::Error> {
            Ok(self.proxy.clone())
        }
    }

    /// `resolve_ip_addr` sends the expected request and maps both successful and failed
    /// lookups to the expected results.
    #[fasync::run_singlethreaded(test)]
    async fn test_resolve_ip_addr() {
        // Canned responses travel over this channel to the fake lookup server below.
        let (sender, receiver) =
            futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
        let (proxy, stream) = create_proxy_and_stream::<LookupMarker>();
        const TEST_HOSTNAME: &'static str = "foobar.com";
        // Fake lookup server: pairs each incoming request with the next canned response,
        // checks the request's contents, and replies.
        let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req {
            Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
                assert_eq!(hostname.as_str(), TEST_HOSTNAME);
                assert_eq!(
                    options,
                    LookupIpOptions {
                        ipv4_lookup: Some(true),
                        ipv6_lookup: Some(true),
                        sort_addresses: Some(true),
                        ..Default::default()
                    }
                );
                let rsp = rsp.as_ref().map_err(|e| *e);
                futures::future::ready(responder.send(rsp).expect("failed to send FIDL response"))
            }
            req => panic!("unexpected item in request stream {:?}", req),
        });

        let connector = ProxyConnector { proxy };

        let ip_v4 = Ipv4Addr::LOCALHOST.into();
        let ip_v6 = Ipv6Addr::LOCALHOST.into();
        const PORT1: u16 = 1234;
        const PORT2: u16 = 4321;

        // `sender` and `connector` are moved into this block, so they are dropped when it
        // finishes.
        let test_fut = async move {
            // Either the addresses the server should return, or the FIDL error it should
            // return together with the `io::ErrorKind` the caller should observe.
            type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;
            let test_resolve = |port, expect: Expectation| {
                // What the fake server will send back.
                let fidl_response = expect
                    .clone()
                    .map(|addrs| LookupResult {
                        addresses: Some(
                            addrs
                                .into_iter()
                                .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
                                .collect(),
                        ),
                        ..Default::default()
                    })
                    .map_err(|(fidl_err, _io_err)| fidl_err);
                // What `resolve_ip_addr` should produce for that response.
                let expect = expect
                    .map(|addrs| {
                        addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect()
                    })
                    .map_err(|(_fidl_err, io_err)| io_err);
                let () = sender.unbounded_send(fidl_response).expect("failed to send expectation");
                resolve_ip_addr(&connector, TEST_HOSTNAME, port)
                    .map_ok(Iterator::collect::<Vec<_>>)
                    // Compare by error kind only.
                    .map_err(|err| err.kind())
                    .map(move |result| {
                        assert_eq!(result, expect);
                    })
            };
            let () = test_resolve(PORT1, Ok(vec![ip_v4])).await;
            let () = test_resolve(PORT2, Ok(vec![ip_v6])).await;
            let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await;
            let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await;
        };

        let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await;
    }
}