1use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7 HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8 parse_ip_addr,
9};
10use fidl::endpoints::{ClientEnd, create_endpoints};
11use fidl_connector::{Connect, ServiceReconnector};
12use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
13use fidl_fuchsia_posix_socket::{Domain, ProviderMarker, StreamSocketProtocol};
14use fuchsia_async::net;
15
16use futures::future::{Future, FutureExt};
17use futures::io;
18use futures::task::{Context, Poll};
19use http::uri::{Scheme, Uri};
20use hyper::service::Service;
21use rustls::RootCertStore;
22use std::convert::TryFrom as _;
23use std::net::SocketAddr;
24use std::num::TryFromIntError;
25use std::sync::{Arc, LazyLock};
26
27pub fn new_root_cert_store() -> Arc<RootCertStore> {
28 static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
30 let mut root_store = rustls::RootCertStore::empty();
31
32 root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
33 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
34 cert.subject,
35 cert.spki,
36 cert.name_constraints,
37 )
38 }));
39
40 Arc::new(root_store)
41 });
42
43 Arc::clone(&ROOT_STORE)
44}
45
46#[derive(Clone)]
49pub struct HyperConnector {
50 tcp_options: TcpOptions,
51 socket_options: SocketOptions,
52 provider: RealServiceConnector,
53}
54
55impl From<(TcpOptions, SocketOptions)> for HyperConnector {
56 fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
57 Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
58 }
59}
60
61impl HyperConnector {
62 pub fn new() -> Self {
63 Self::from_tcp_options(TcpOptions::default())
64 }
65
66 pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
67 Self {
68 tcp_options,
69 socket_options: SocketOptions::default(),
70 provider: RealServiceConnector::new(),
71 }
72 }
73}
74
75impl Service<Uri> for HyperConnector {
76 type Response = TcpStream;
77 type Error = std::io::Error;
78 type Future = HyperConnectorFuture;
79
80 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 Poll::Ready(Ok(()))
83 }
84
85 fn call(&mut self, dst: Uri) -> Self::Future {
86 let self_ = self.clone();
87 HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
88 }
89}
90
91impl HyperConnector {
92 async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
93 let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
94 let port = match dst.port() {
95 Some(port) => port.as_u16(),
96 None => {
97 if dst.scheme() == Some(&Scheme::HTTPS) {
98 443
99 } else {
100 80
101 }
102 }
103 };
104
105 let stream = connect_to_addr(
106 self.provider.clone(),
107 host,
108 port,
109 self.socket_options.bind_device.as_deref(),
110 )
111 .await?;
112 let () = self.tcp_options.apply(stream.std())?;
113
114 Ok(TcpStream { stream })
115 }
116}
117
118#[allow(dead_code)] #[derive(Clone)]
120pub struct Executor;
121
122impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
123 fn execute(&self, fut: F) {
124 fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
125 }
126}
127
128#[allow(dead_code)] #[derive(Clone)]
130pub struct LocalExecutor;
131
132impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
133 fn execute(&self, fut: F) {
134 fuchsia_async::Task::local(fut.map(drop)).detach()
135 }
136}
137
138pub(crate) trait ProviderConnector {
139 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error>;
140}
141
142trait LookupConnector {
143 fn connect(&self) -> Result<LookupProxy, io::Error>;
144}
145
146#[derive(Clone)]
147struct RealServiceConnector {
148 name_lookup_connector: ServiceReconnector<LookupMarker>,
149}
150
151impl RealServiceConnector {
152 fn new() -> Self {
153 RealServiceConnector { name_lookup_connector: ServiceReconnector::<LookupMarker>::new() }
154 }
155}
156
157impl ProviderConnector for RealServiceConnector {
158 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
159 let (client_end, server_end) = create_endpoints::<ProviderMarker>();
160 fuchsia_component::client::connect_channel_to_protocol::<ProviderMarker>(server_end.into())
161 .map_err(|err| {
162 io::Error::other(format!("failed to connect to socket provider service: {}", err))
163 })?;
164 Ok(client_end)
165 }
166}
167
168impl LookupConnector for RealServiceConnector {
169 fn connect(&self) -> Result<LookupProxy, io::Error> {
170 self.name_lookup_connector.connect().map_err(|err| {
171 io::Error::other(format!("failed to connect to name lookup service: {}", err))
172 })
173 }
174}
175
176async fn connect_to_addr<T: ProviderConnector + LookupConnector + 'static>(
177 provider: T,
178 host: &str,
179 port: u16,
180 bind_device: Option<&str>,
181) -> Result<net::TcpStream, io::Error> {
182 if let Some(addr) = parse_ip_addr_with_provider(&provider, host, port).await? {
183 return connect_and_bind_device(&provider, addr, bind_device)?.await;
184 }
185
186 happy_eyeballs::happy_eyeballs(
187 resolve_ip_addr(&provider, host, port).await?,
188 RealSocketConnector::new(provider),
189 happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
190 happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
191 bind_device,
192 )
193 .await
194}
195
196async fn resolve_ip_addr<T: LookupConnector>(
197 name_lookup: &T,
198 host: &str,
199 port: u16,
200) -> Result<impl Iterator<Item = SocketAddr> + use<T>, io::Error> {
201 let proxy = name_lookup.connect()?;
202 let LookupResult { addresses, .. } = proxy
203 .lookup_ip(
204 host,
205 &LookupIpOptions {
206 ipv4_lookup: Some(true),
207 ipv6_lookup: Some(true),
208 sort_addresses: Some(true),
209 ..Default::default()
210 },
211 )
212 .await
213 .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
214 .map_err(|err| {
215 io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
218 })?;
219
220 Ok(addresses
221 .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
222 .into_iter()
223 .map(move |addr| {
224 let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
225 SocketAddr::new(addr, port)
226 }))
227}
228
229async fn parse_ip_addr_with_provider(
230 provider: &impl ProviderConnector,
231 host: &str,
232 port: u16,
233) -> Result<Option<SocketAddr>, io::Error> {
234 parse_ip_addr(host, port, |zone_id| async {
235 let proxy = provider.connect()?.into_proxy();
236 let id = proxy
237 .interface_name_to_index(zone_id)
238 .await
239 .map_err(|err| {
240 io::Error::other(format!(
241 "failed to get interface index from socket provider: {}",
242 err
243 ))
244 })?
245 .map_err(|status| zx::Status::from_raw(status).into_io_error())?;
246
247 u32::try_from(id).map_err(|TryFromIntError { .. }| {
249 io::Error::other("interface index too large to convert to scope_id")
250 })
251 })
252 .await
253}
254
255pub(crate) fn stream_socket<T: ProviderConnector>(
256 provider: &T,
257 domain: Domain,
258 proto: StreamSocketProtocol,
259) -> io::Result<socket2::Socket> {
260 let socket_provider = provider.connect()?.into_sync_proxy();
261 let sock = socket_provider
262 .stream_socket(domain, proto, zx::MonotonicInstant::INFINITE)
263 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
264 .map_err(|e| std::io::Error::from_raw_os_error(e.into_primitive()))?;
265
266 Ok(fdio::create_fd(sock.into())?.into())
267}
268
269#[cfg(test)]
270mod test {
271 use super::*;
272 use crate::*;
273 use assert_matches::assert_matches;
274 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
275 use fidl_fuchsia_net_name::{LookupError, LookupRequest};
276 use fidl_fuchsia_posix_socket::ProviderRequest;
277 use fuchsia_async::net::TcpListener;
278 use fuchsia_async::{self as fasync, LocalExecutor};
279 use futures::prelude::*;
280 use std::cell::RefCell;
281
282 struct PanicConnector;
283
284 impl ProviderConnector for PanicConnector {
285 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
286 panic!("should not be trying to talk to the Provider service")
287 }
288 }
289
290 #[test]
291 fn can_create_client() {
292 let _exec = LocalExecutor::default();
293 let _client = new_client();
294 }
295
296 #[test]
297 fn can_create_https_client() {
298 let _exec = LocalExecutor::default();
299 let _client = new_https_client();
300 }
301
302 #[fasync::run_singlethreaded(test)]
303 async fn hyper_connector_sets_tcp_options() {
304 let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
305 let listener = TcpListener::bind(&addr).unwrap();
306 let addr = listener.local_addr().unwrap();
307
308 let idle = std::time::Duration::from_secs(36);
309 let interval = std::time::Duration::from_secs(47);
310 let count = 58;
311 let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap();
312 let (TcpStream { stream }, _server) = future::try_join(
313 HyperConnector::from_tcp_options(TcpOptions {
314 keepalive_idle: Some(idle),
315 keepalive_interval: Some(interval),
316 keepalive_count: Some(count),
317 ..Default::default()
318 })
319 .call(uri),
320 listener.accept_stream().try_next(),
321 )
322 .await
323 .unwrap();
324
325 let stream = socket2::SockRef::from(stream.std());
326
327 assert_matches!(stream.keepalive(), Ok(v) if v);
328 assert_matches!(stream.keepalive_time(), Ok(v) if v == idle);
329 assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval);
330 assert_matches!(stream.keepalive_retries(), Ok(v) if v == count);
331 }
332
333 #[fasync::run_singlethreaded(test)]
334 async fn test_parse_ipv6_addr_with_provider() {
335 let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
336
337 assert_matches!(
338 parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await,
339 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0))
340 );
341
342 assert_matches!(
343 parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await,
344 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2))
345 );
346 }
347
348 #[fasync::run_singlethreaded(test)]
349 async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
350 let connector = RealServiceConnector::new();
351 let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
352
353 assert_matches!(
354 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
355 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1))
356 );
357
358 assert_matches!(
359 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await,
360 Err(err) if err.kind() == io::ErrorKind::NotFound
361 );
362
363 assert_matches!(
364 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await,
365 Err(err) if err.kind() == io::ErrorKind::NotFound
366 );
367 }
368
369 #[fasync::run_singlethreaded(test)]
370 async fn test_parse_ipv6_addr_handles_connection_errors() {
371 struct ErrorConnector;
372
373 impl ProviderConnector for ErrorConnector {
374 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
375 Err(io::Error::other("something bad happened"))
376 }
377 }
378
379 assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await,
380 Err(err) if err.kind() == io::ErrorKind::Other);
381 }
382
383 #[fasync::run_singlethreaded(test)]
384 async fn test_parse_ipv6_addr_handles_large_interface_indices() {
385 let (client_end, mut stream) = create_request_stream::<ProviderMarker>();
386
387 let provider_fut = async move {
388 while let Some(req) = stream.try_next().await.unwrap_or(None) {
389 match req {
390 ProviderRequest::InterfaceNameToIndex { name: _, responder } => {
391 responder.send(Ok(u64::MAX)).unwrap()
392 }
393 _ => panic!("unexpected request"),
394 }
395 }
396 };
397
398 struct ErrorConnector {
399 client_end: RefCell<Option<ClientEnd<ProviderMarker>>>,
400 }
401
402 impl ProviderConnector for ErrorConnector {
403 fn connect(&self) -> Result<ClientEnd<ProviderMarker>, io::Error> {
404 let client_end = self.client_end.borrow_mut().take().unwrap();
405 Ok(client_end)
406 }
407 }
408
409 let connector = ErrorConnector { client_end: RefCell::new(Some(client_end)) };
410
411 let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);
412
413 let ((), res) = future::join(provider_fut, parse_ip_fut).await;
415
416 assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other);
417 }
418
419 struct ProxyConnector<T> {
420 proxy: T,
421 }
422
423 impl LookupConnector for ProxyConnector<LookupProxy> {
424 fn connect(&self) -> Result<LookupProxy, io::Error> {
425 Ok(self.proxy.clone())
426 }
427 }
428
429 #[fasync::run_singlethreaded(test)]
430 async fn test_resolve_ip_addr() {
431 let (sender, receiver) =
432 futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
433 let (proxy, stream) = create_proxy_and_stream::<LookupMarker>();
434 const TEST_HOSTNAME: &'static str = "foobar.com";
435 let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req {
436 Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
437 assert_eq!(hostname.as_str(), TEST_HOSTNAME);
438 assert_eq!(
439 options,
440 LookupIpOptions {
441 ipv4_lookup: Some(true),
442 ipv6_lookup: Some(true),
443 sort_addresses: Some(true),
444 ..Default::default()
445 }
446 );
447 let rsp = rsp.as_ref().map_err(|e| *e);
448 futures::future::ready(responder.send(rsp).expect("failed to send FIDL response"))
449 }
450 req => panic!("unexpected item in request stream {:?}", req),
451 });
452
453 let connector = ProxyConnector { proxy };
454
455 let ip_v4 = Ipv4Addr::LOCALHOST.into();
456 let ip_v6 = Ipv6Addr::LOCALHOST.into();
457 const PORT1: u16 = 1234;
458 const PORT2: u16 = 4321;
459
460 let test_fut = async move {
461 type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;
464 let test_resolve = |port, expect: Expectation| {
465 let fidl_response = expect
466 .clone()
467 .map(|addrs| LookupResult {
468 addresses: Some(
469 addrs
470 .into_iter()
471 .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
472 .collect(),
473 ),
474 ..Default::default()
475 })
476 .map_err(|(fidl_err, _io_err)| fidl_err);
477 let expect = expect
478 .map(|addrs| {
479 addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect()
480 })
481 .map_err(|(_fidl_err, io_err)| io_err);
482 let () = sender.unbounded_send(fidl_response).expect("failed to send expectation");
483 resolve_ip_addr(&connector, TEST_HOSTNAME, port)
484 .map_ok(Iterator::collect::<Vec<_>>)
485 .map_err(|err| err.kind())
487 .map(move |result| {
488 assert_eq!(result, expect);
489 })
490 };
491 let () = test_resolve(PORT1, Ok(vec![ip_v4])).await;
492 let () = test_resolve(PORT2, Ok(vec![ip_v6])).await;
493 let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await;
494 let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await;
495 };
496
497 let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await;
498 }
499}