1use crate::happy_eyeballs::{self, RealSocketConnector};
6use crate::{
7 HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, connect_and_bind_device,
8 parse_ip_addr,
9};
10use fidl_connector::{Connect, ServiceReconnector};
11use fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult};
12use fidl_fuchsia_posix_socket::{ProviderMarker, ProviderProxy};
13use fuchsia_async::net;
14
15use futures::future::{Future, FutureExt};
16use futures::io;
17use futures::task::{Context, Poll};
18use http::uri::{Scheme, Uri};
19use hyper::service::Service;
20use rustls::RootCertStore;
21use std::convert::TryFrom as _;
22use std::net::SocketAddr;
23use std::num::TryFromIntError;
24use std::sync::{Arc, LazyLock};
25
26pub fn new_root_cert_store() -> Arc<RootCertStore> {
27 static ROOT_STORE: LazyLock<Arc<RootCertStore>> = LazyLock::new(|| {
29 let mut root_store = rustls::RootCertStore::empty();
30
31 root_store.add_trust_anchors(webpki_roots_fuchsia::TLS_SERVER_ROOTS.iter().map(|cert| {
32 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
33 cert.subject,
34 cert.spki,
35 cert.name_constraints,
36 )
37 }));
38
39 Arc::new(root_store)
40 });
41
42 Arc::clone(&ROOT_STORE)
43}
44
/// A connector that opens a `TcpStream` to the host and port named by a URI.
///
/// It is driven through its `hyper::service::Service<Uri>` implementation.
#[derive(Clone)]
pub struct HyperConnector {
    /// Options applied to each stream once it has been connected.
    tcp_options: TcpOptions,
    /// Socket-level options; `bind_device` is forwarded when connecting.
    socket_options: SocketOptions,
    /// Source of the FIDL proxies used while resolving and connecting.
    provider: RealServiceConnector,
}
53
54impl From<(TcpOptions, SocketOptions)> for HyperConnector {
55 fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self {
56 Self { tcp_options, socket_options, provider: RealServiceConnector::new() }
57 }
58}
59
60impl HyperConnector {
61 pub fn new() -> Self {
62 Self::from_tcp_options(TcpOptions::default())
63 }
64
65 pub fn from_tcp_options(tcp_options: TcpOptions) -> Self {
66 Self {
67 tcp_options,
68 socket_options: SocketOptions::default(),
69 provider: RealServiceConnector::new(),
70 }
71 }
72}
73
74impl Service<Uri> for HyperConnector {
75 type Response = TcpStream;
76 type Error = std::io::Error;
77 type Future = HyperConnectorFuture;
78
79 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 Poll::Ready(Ok(()))
82 }
83
84 fn call(&mut self, dst: Uri) -> Self::Future {
85 let self_ = self.clone();
86 HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) }
87 }
88}
89
90impl HyperConnector {
91 async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> {
92 let host = dst.host().ok_or_else(|| io::Error::other("destination host is unspecified"))?;
93 let port = match dst.port() {
94 Some(port) => port.as_u16(),
95 None => {
96 if dst.scheme() == Some(&Scheme::HTTPS) {
97 443
98 } else {
99 80
100 }
101 }
102 };
103
104 let stream =
105 connect_to_addr(&self.provider, host, port, self.socket_options.bind_device.as_deref())
106 .await?;
107 let () = self.tcp_options.apply(stream.std())?;
108
109 Ok(TcpStream { stream })
110 }
111}
112
/// An implementation of `hyper::rt::Executor` that runs each future as a
/// detached `fuchsia_async::Task::spawn` task.
// NOTE(review): the reason `dead_code` is allowed here is not visible in this
// file — confirm it is still needed and record why.
#[allow(dead_code)]
#[derive(Clone)]
pub struct Executor;
116
117impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
118 fn execute(&self, fut: F) {
119 fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
120 }
121}
122
/// An implementation of `hyper::rt::Executor` that runs each future as a
/// detached `fuchsia_async::Task::local` task; the future need not be `Send`.
// NOTE(review): the reason `dead_code` is allowed here is not visible in this
// file — confirm it is still needed and record why.
#[allow(dead_code)]
#[derive(Clone)]
pub struct LocalExecutor;
126
127impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
128 fn execute(&self, fut: F) {
129 fuchsia_async::Task::local(fut.map(drop)).detach()
130 }
131}
132
/// Something that can hand out a proxy to the POSIX socket provider service.
///
/// Kept as a trait so tests can substitute their own connector.
trait ProviderConnector {
    /// Returns a `ProviderProxy`, or an I/O error if one cannot be obtained.
    fn connect(&self) -> Result<ProviderProxy, io::Error>;
}
136
/// Something that can hand out a proxy to the name lookup service.
///
/// Kept as a trait so tests can substitute their own connector.
trait LookupConnector {
    /// Returns a `LookupProxy`, or an I/O error if one cannot be obtained.
    fn connect(&self) -> Result<LookupProxy, io::Error>;
}
140
/// The production connector: obtains both proxies through
/// `fidl_connector::ServiceReconnector`.
#[derive(Clone)]
struct RealServiceConnector {
    /// Connector for the socket provider protocol (`ProviderMarker`).
    socket_provider_connector: ServiceReconnector<ProviderMarker>,
    /// Connector for the name lookup protocol (`LookupMarker`).
    name_lookup_connector: ServiceReconnector<LookupMarker>,
}
146
147impl RealServiceConnector {
148 fn new() -> Self {
149 RealServiceConnector {
150 socket_provider_connector: ServiceReconnector::<ProviderMarker>::new(),
151 name_lookup_connector: ServiceReconnector::<LookupMarker>::new(),
152 }
153 }
154}
155
156impl ProviderConnector for RealServiceConnector {
157 fn connect(&self) -> Result<ProviderProxy, io::Error> {
158 self.socket_provider_connector.connect().map_err(|err| {
159 io::Error::other(format!("failed to connect to socket provider service: {}", err))
160 })
161 }
162}
163
164impl LookupConnector for RealServiceConnector {
165 fn connect(&self) -> Result<LookupProxy, io::Error> {
166 self.name_lookup_connector.connect().map_err(|err| {
167 io::Error::other(format!("failed to connect to name lookup service: {}", err))
168 })
169 }
170}
171
172async fn connect_to_addr<T: ProviderConnector + LookupConnector>(
173 provider: &T,
174 host: &str,
175 port: u16,
176 bind_device: Option<&str>,
177) -> Result<net::TcpStream, io::Error> {
178 if let Some(addr) = parse_ip_addr_with_provider(provider, host, port).await? {
179 return connect_and_bind_device(addr, bind_device)?.await;
180 }
181
182 happy_eyeballs::happy_eyeballs(
183 resolve_ip_addr(provider, host, port).await?,
184 RealSocketConnector,
185 happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY,
186 happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY,
187 bind_device,
188 )
189 .await
190}
191
192async fn resolve_ip_addr(
193 name_lookup: &impl LookupConnector,
194 host: &str,
195 port: u16,
196) -> Result<impl Iterator<Item = SocketAddr>, io::Error> {
197 let proxy = name_lookup.connect()?;
198 let LookupResult { addresses, .. } = proxy
199 .lookup_ip(
200 host,
201 &LookupIpOptions {
202 ipv4_lookup: Some(true),
203 ipv6_lookup: Some(true),
204 sort_addresses: Some(true),
205 ..Default::default()
206 },
207 )
208 .await
209 .map_err(|err| io::Error::other(format!("failed to call NameProvider.LookupIp: {}", err)))?
210 .map_err(|err| {
211 io::Error::other(format!("NameProvider.LookupIp failure: {:?}", err))
214 })?;
215
216 Ok(addresses
217 .ok_or_else(|| io::Error::other("addresses not provided in NameProvider response"))?
218 .into_iter()
219 .map(move |addr| {
220 let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into();
221 SocketAddr::new(addr, port)
222 }))
223}
224
/// Runs `parse_ip_addr` on `host`/`port`, supplying a callback that turns a
/// zone identifier into an interface index by asking the socket provider.
///
/// The callback connects to the provider only when `parse_ip_addr` invokes it.
/// The tests in this file show that numeric zone ids (for example `%250`) are
/// handled without contacting the provider, while names such as `lo` are not.
///
/// Returns whatever `parse_ip_addr` returns; `connect_to_addr` treats
/// `Ok(None)` as "resolve `host` by name instead".
///
/// # Errors
///
/// The callback fails when the provider cannot be reached, when the FIDL call
/// fails, when the provider reports a status error, or when the returned index
/// does not fit in a `u32`.
async fn parse_ip_addr_with_provider(
    provider: &impl ProviderConnector,
    host: &str,
    port: u16,
) -> Result<Option<SocketAddr>, io::Error> {
    parse_ip_addr(host, port, |zone_id| async {
        let proxy = provider.connect()?;
        let id = proxy
            .interface_name_to_index(zone_id)
            .await
            // Outer error: the FIDL call itself failed.
            .map_err(|err| {
                io::Error::other(format!(
                    "failed to get interface index from socket provider: {}",
                    err
                ))
            })?
            // Inner error: the provider answered with a raw status code.
            .map_err(|status| zx::Status::from_raw(status).into_io_error())?;

        // The provider's index is wider than the `u32` the callback returns.
        u32::try_from(id).map_err(|TryFromIntError { .. }| {
            io::Error::other("interface index too large to convert to scope_id")
        })
    })
    .await
}
250
251#[cfg(test)]
252mod test {
253 use super::*;
254 use crate::*;
255 use assert_matches::assert_matches;
256 use fidl::endpoints::create_proxy_and_stream;
257 use fidl_fuchsia_net_name::{LookupError, LookupRequest};
258 use fidl_fuchsia_posix_socket::ProviderRequest;
259 use fuchsia_async::net::TcpListener;
260 use fuchsia_async::{self as fasync, LocalExecutor};
261 use futures::prelude::*;
262 use std::cell::RefCell;
263
    /// A provider connector for tests that must never reach the socket
    /// provider: any attempt to connect panics.
    struct PanicConnector;

    impl ProviderConnector for PanicConnector {
        fn connect(&self) -> Result<ProviderProxy, io::Error> {
            panic!("should not be trying to talk to the Provider service")
        }
    }
271
    /// Smoke test: constructing a plain client does not panic.
    #[test]
    fn can_create_client() {
        // `LocalExecutor` here is `fuchsia_async::LocalExecutor`, imported
        // explicitly by this module. It is kept alive while the client is
        // built — presumably because construction needs an executor; confirm.
        let _exec = LocalExecutor::default();
        let _client = new_client();
    }
277
    /// Smoke test: constructing an HTTPS client does not panic.
    #[test]
    fn can_create_https_client() {
        // `LocalExecutor` here is `fuchsia_async::LocalExecutor`, imported
        // explicitly by this module. It is kept alive while the client is
        // built — presumably because construction needs an executor; confirm.
        let _exec = LocalExecutor::default();
        let _client = new_https_client();
    }
283
284 #[fasync::run_singlethreaded(test)]
285 async fn hyper_connector_sets_tcp_options() {
286 let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
287 let listener = TcpListener::bind(&addr).unwrap();
288 let addr = listener.local_addr().unwrap();
289
290 let idle = std::time::Duration::from_secs(36);
291 let interval = std::time::Duration::from_secs(47);
292 let count = 58;
293 let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap();
294 let (TcpStream { stream }, _server) = future::try_join(
295 HyperConnector::from_tcp_options(TcpOptions {
296 keepalive_idle: Some(idle),
297 keepalive_interval: Some(interval),
298 keepalive_count: Some(count),
299 ..Default::default()
300 })
301 .call(uri),
302 listener.accept_stream().try_next(),
303 )
304 .await
305 .unwrap();
306
307 let stream = socket2::SockRef::from(stream.std());
308
309 assert_matches!(stream.keepalive(), Ok(v) if v);
310 assert_matches!(stream.keepalive_time(), Ok(v) if v == idle);
311 assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval);
312 assert_matches!(stream.keepalive_retries(), Ok(v) if v == count);
313 }
314
315 #[fasync::run_singlethreaded(test)]
316 async fn test_parse_ipv6_addr_with_provider() {
317 let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
318
319 assert_matches!(
320 parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await,
321 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0))
322 );
323
324 assert_matches!(
325 parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await,
326 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2))
327 );
328 }
329
330 #[fasync::run_singlethreaded(test)]
331 async fn test_parse_ipv6_addr_with_provider_supports_interface_names() {
332 let connector = RealServiceConnector::new();
333 let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap();
334
335 assert_matches!(
336 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await,
337 Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1))
338 );
339
340 assert_matches!(
341 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await,
342 Err(err) if err.kind() == io::ErrorKind::NotFound
343 );
344
345 assert_matches!(
346 parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await,
347 Err(err) if err.kind() == io::ErrorKind::NotFound
348 );
349 }
350
351 #[fasync::run_singlethreaded(test)]
352 async fn test_parse_ipv6_addr_handles_connection_errors() {
353 struct ErrorConnector;
354
355 impl ProviderConnector for ErrorConnector {
356 fn connect(&self) -> Result<ProviderProxy, io::Error> {
357 Err(io::Error::other("something bad happened"))
358 }
359 }
360
361 assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await,
362 Err(err) if err.kind() == io::ErrorKind::Other);
363 }
364
365 #[fasync::run_singlethreaded(test)]
366 async fn test_parse_ipv6_addr_handles_large_interface_indices() {
367 let (proxy, mut stream) = create_proxy_and_stream::<ProviderMarker>();
368
369 let provider_fut = async move {
370 while let Some(req) = stream.try_next().await.unwrap_or(None) {
371 match req {
372 ProviderRequest::InterfaceNameToIndex { name: _, responder } => {
373 responder.send(Ok(u64::MAX)).unwrap()
374 }
375 _ => panic!("unexpected request"),
376 }
377 }
378 };
379
380 struct ErrorConnector {
381 proxy: RefCell<Option<ProviderProxy>>,
382 }
383
384 impl ProviderConnector for ErrorConnector {
385 fn connect(&self) -> Result<ProviderProxy, io::Error> {
386 let proxy = self.proxy.borrow_mut().take().unwrap();
387 Ok(proxy)
388 }
389 }
390
391 let connector = ErrorConnector { proxy: RefCell::new(Some(proxy)) };
392
393 let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080);
394
395 let ((), res) = future::join(provider_fut, parse_ip_fut).await;
397
398 assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other);
399 }
400
    /// Test connector that wraps an already-created proxy.
    struct ProxyConnector<T> {
        proxy: T,
    }

    impl LookupConnector for ProxyConnector<LookupProxy> {
        /// Hands out a clone of the wrapped proxy; never fails.
        fn connect(&self) -> Result<LookupProxy, io::Error> {
            Ok(self.proxy.clone())
        }
    }
410
411 #[fasync::run_singlethreaded(test)]
412 async fn test_resolve_ip_addr() {
413 let (sender, receiver) =
414 futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>();
415 let (proxy, stream) = create_proxy_and_stream::<LookupMarker>();
416 const TEST_HOSTNAME: &'static str = "foobar.com";
417 let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req {
418 Ok(LookupRequest::LookupIp { hostname, options, responder }) => {
419 assert_eq!(hostname.as_str(), TEST_HOSTNAME);
420 assert_eq!(
421 options,
422 LookupIpOptions {
423 ipv4_lookup: Some(true),
424 ipv6_lookup: Some(true),
425 sort_addresses: Some(true),
426 ..Default::default()
427 }
428 );
429 let rsp = rsp.as_ref().map_err(|e| *e);
430 futures::future::ready(responder.send(rsp).expect("failed to send FIDL response"))
431 }
432 req => panic!("unexpected item in request stream {:?}", req),
433 });
434
435 let connector = ProxyConnector { proxy };
436
437 let ip_v4 = Ipv4Addr::LOCALHOST.into();
438 let ip_v6 = Ipv6Addr::LOCALHOST.into();
439 const PORT1: u16 = 1234;
440 const PORT2: u16 = 4321;
441
442 let test_fut = async move {
443 type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>;
446 let test_resolve = |port, expect: Expectation| {
447 let fidl_response = expect
448 .clone()
449 .map(|addrs| LookupResult {
450 addresses: Some(
451 addrs
452 .into_iter()
453 .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into())
454 .collect(),
455 ),
456 ..Default::default()
457 })
458 .map_err(|(fidl_err, _io_err)| fidl_err);
459 let expect = expect
460 .map(|addrs| {
461 addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect()
462 })
463 .map_err(|(_fidl_err, io_err)| io_err);
464 let () = sender.unbounded_send(fidl_response).expect("failed to send expectation");
465 resolve_ip_addr(&connector, TEST_HOSTNAME, port)
466 .map_ok(Iterator::collect::<Vec<_>>)
467 .map_err(|err| err.kind())
469 .map(move |result| {
470 assert_eq!(result, expect);
471 })
472 };
473 let () = test_resolve(PORT1, Ok(vec![ip_v4])).await;
474 let () = test_resolve(PORT2, Ok(vec![ip_v6])).await;
475 let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await;
476 let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await;
477 };
478
479 let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await;
480 }
481}