1use futures::future::{Future, FutureExt};
6use futures::io::{self, AsyncRead, AsyncWrite};
7use futures::task::{Context, Poll};
8use hyper::Body;
9use hyper::client::Client;
10use hyper::client::connect::{Connected, Connection};
11#[cfg(not(target_os = "fuchsia"))]
12use netext::MultithreadedTokioAsyncWrapper;
13use std::marker::PhantomData;
14use std::net::{AddrParseError, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
15use std::num::ParseIntError;
16use std::pin::Pin;
17use tokio::io::ReadBuf;
18
19#[cfg(not(target_os = "fuchsia"))]
20use tokio::net;
21
22#[cfg(target_os = "fuchsia")]
23use fidl_fuchsia_posix_socket as fposix_socket;
24
25#[cfg(target_os = "fuchsia")]
26use fuchsia_async::net;
27
28#[cfg(not(target_os = "fuchsia"))]
29mod not_fuchsia;
30#[cfg(not(target_os = "fuchsia"))]
31pub use not_fuchsia::*;
32
33#[cfg(target_os = "fuchsia")]
34mod fuchsia;
35#[cfg(target_os = "fuchsia")]
36pub use crate::fuchsia::*;
37
38#[cfg(target_os = "fuchsia")]
39mod happy_eyeballs;
40
/// A hyper [`Client`] that uses [`HyperConnector`] as its connector and [`Body`] as its
/// request body type.
pub type HttpClient = Client<HyperConnector, Body>;

/// A hyper [`Client`] whose connector is a [`HyperConnector`] wrapped in
/// `hyper_rustls::HttpsConnector`, with [`Body`] as its request body type.
pub type HttpsClient = Client<hyper_rustls::HttpsConnector<HyperConnector>, Body>;
46
47pub trait MakeClientBuilder: Sized {
50 fn builder() -> HttpClientBuilder<Self> {
51 HttpClientBuilder::default()
52 }
53}
54
55impl MakeClientBuilder for HttpClient {}
56impl MakeClientBuilder for HttpsClient {}
57
58#[must_use = "futures do nothing unless polled"]
60pub struct HyperConnectorFuture {
61 fut: Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>> + Send>>,
64}
65
66impl Future for HyperConnectorFuture {
67 type Output = Result<TcpStream, io::Error>;
68
69 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70 self.fut.as_mut().poll(cx)
71 }
72}
73
/// Wrapper around the platform-specific TCP stream type.
pub struct TcpStream {
    /// On Fuchsia: the `fuchsia_async::net` TCP stream.
    #[cfg(target_os = "fuchsia")]
    pub stream: net::TcpStream,
    /// On other targets: the `tokio::net` TCP stream inside a
    /// `MultithreadedTokioAsyncWrapper`.
    #[cfg(not(target_os = "fuchsia"))]
    pub stream: MultithreadedTokioAsyncWrapper<net::TcpStream>,
}
80
81impl tokio::io::AsyncRead for TcpStream {
82 fn poll_read(
83 mut self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 buf: &mut ReadBuf<'_>,
86 ) -> Poll<io::Result<()>> {
87 Pin::new(&mut self.stream).poll_read(cx, buf.initialize_unfilled()).map_ok(|sz| {
88 buf.advance(sz);
89 ()
90 })
91 }
92
93 }
95
96impl tokio::io::AsyncWrite for TcpStream {
97 fn poll_write(
98 mut self: Pin<&mut Self>,
99 cx: &mut Context<'_>,
100 buf: &[u8],
101 ) -> Poll<io::Result<usize>> {
102 Pin::new(&mut self.stream).poll_write(cx, buf)
103 }
104
105 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
106 Pin::new(&mut self.get_mut().stream).poll_flush(cx)
107 }
108
109 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
110 Pin::new(&mut self.get_mut().stream).poll_close(cx)
111 }
112
113 }
115
impl Connection for TcpStream {
    /// Returns a freshly constructed `Connected` value; no additional connection
    /// metadata is attached to it here.
    fn connected(&self) -> Connected {
        Connected::new()
    }
}
121
122#[non_exhaustive]
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
124pub struct TcpOptions {
126 pub keepalive_idle: Option<std::time::Duration>,
128 pub keepalive_interval: Option<std::time::Duration>,
130 pub keepalive_count: Option<u32>,
132}
133
134impl TcpOptions {
135 pub fn keepalive_timeout(dur: std::time::Duration) -> Self {
143 if dur.as_secs() == 0 {
144 return TcpOptions::default();
145 }
146
147 TcpOptions {
148 keepalive_idle: dur.checked_div(2),
149 keepalive_interval: dur.checked_div(6),
150 keepalive_count: Some(3),
151 }
152 }
153
154 pub(crate) fn apply<T: std::os::fd::AsFd>(&self, stream: &T) -> io::Result<()> {
155 let stream = socket2::SockRef::from(stream);
156 let mut any = false;
157 let mut keepalive = socket2::TcpKeepalive::new();
158 if let Some(idle) = self.keepalive_idle {
159 any = true;
160 keepalive = keepalive.with_time(idle);
161 };
162 if let Some(interval) = self.keepalive_interval {
163 any = true;
164 keepalive = keepalive.with_interval(interval);
165 }
166 if let Some(count) = self.keepalive_count {
167 any = true;
168 keepalive = keepalive.with_retries(count);
169 }
170 if any { stream.set_tcp_keepalive(&keepalive) } else { Ok(()) }
171 }
172}
173
/// Socket options that `HttpClientBuilder` hands to the connector alongside
/// [`TcpOptions`].
#[derive(Clone, Debug, Default)]
pub struct SocketOptions {
    /// Optional device name for the socket.
    /// NOTE(review): presumably the interface passed to a bind-to-device call by the
    /// connector — confirm against the platform-specific modules.
    pub bind_device: Option<String>,
}
181
182#[derive(Clone)]
183pub struct Executor;
184
185impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
186 fn execute(&self, fut: F) {
187 fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
188 }
189}
190
/// Executor marker type; its `hyper::rt::Executor` implementation does not require
/// the spawned future to be `Send`.
#[derive(Clone)]
pub struct LocalExecutor;
193
194#[derive(Clone)]
196pub struct HttpClientBuilder<T> {
197 tcp_options: Option<TcpOptions>,
198 socket_options: Option<SocketOptions>,
199 tls: Option<rustls::ClientConfig>,
200 phantom: PhantomData<T>,
201}
202
203impl<T> Default for HttpClientBuilder<T> {
204 fn default() -> Self {
205 Self {
206 tcp_options: Default::default(),
207 socket_options: Default::default(),
208 tls: Default::default(),
209 phantom: Default::default(),
210 }
211 }
212}
213
214impl HttpClientBuilder<HttpClient> {
215 pub fn build(mut self) -> HttpClient {
217 Client::builder().executor(Executor).build(self.connector())
218 }
219}
220
221impl HttpClientBuilder<HttpsClient> {
222 pub fn build(mut self) -> HttpsClient {
224 let https = hyper_rustls::HttpsConnector::from((
225 self.connector(),
226 self.tls.unwrap_or_else(|| {
227 let root_store = new_root_cert_store();
228 rustls::ClientConfig::builder()
229 .with_safe_defaults()
230 .with_root_certificates(root_store)
231 .with_no_client_auth()
232 }),
233 ));
234 Client::builder().executor(Executor).build(https)
235 }
236
237 pub fn tls(self, tls: rustls::ClientConfig) -> Self {
239 Self { tls: Some(tls), ..self }
240 }
241}
242
243impl<T> HttpClientBuilder<T> {
244 fn connector(&mut self) -> HyperConnector {
245 HyperConnector::from((
246 self.tcp_options.take().unwrap_or_default(),
247 self.socket_options.take().unwrap_or_default(),
248 ))
249 }
250
251 pub fn tcp_options(self, tcp_options: TcpOptions) -> Self {
253 Self { tcp_options: Some(tcp_options), ..self }
254 }
255
256 pub fn socket_options(self, socket_options: SocketOptions) -> Self {
258 Self { socket_options: Some(socket_options), ..self }
259 }
260}
261
262impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
263 fn execute(&self, fut: F) {
264 fuchsia_async::Task::local(fut.map(drop)).detach()
265 }
266}
267
268pub fn new_client() -> HttpClient {
270 HttpClient::builder().build()
271}
272
273pub fn new_https_client_dangerous(
274 tls: rustls::ClientConfig,
275 tcp_options: TcpOptions,
276) -> HttpsClient {
277 HttpsClient::builder().tls(tls).tcp_options(tcp_options).build()
278}
279
280pub fn new_https_client_from_tcp_options(tcp_options: TcpOptions) -> HttpsClient {
282 HttpsClient::builder().tcp_options(tcp_options).build()
283}
284
285pub fn new_https_client() -> HttpsClient {
287 HttpsClient::builder().build()
288}
289
/// Parses `host` as an IP literal and pairs it with `port`.
///
/// Accepted forms are a bare IPv4 address (`1.2.3.4`) and a bracketed IPv6 address
/// (`[fe80::1]`), the latter optionally followed by a zone introduced with `%25`
/// (`[fe80::1%25eth0]`).
///
/// Returns `Ok(None)` when `host` is not an IP literal in one of those forms.
///
/// A zone that parses as a `u32` is used as the scope id directly; any other zone is
/// resolved by awaiting `interface_name_to_index`.
///
/// # Errors
///
/// Fails if a zone is given for an address outside `fe80::/64`, or if
/// `interface_name_to_index` fails.
pub(crate) async fn parse_ip_addr<'a, F, Fut>(
    host: &'a str,
    port: u16,
    interface_name_to_index: F,
) -> Result<Option<SocketAddr>, io::Error>
where
    F: Fn(&'a str) -> Fut + 'a,
    Fut: Future<Output = Result<u32, io::Error>> + 'a,
{
    // IPv4 literals are written without any decoration.
    if let Ok(v4) = host.parse::<Ipv4Addr>() {
        return Ok(Some(SocketAddr::V4(SocketAddrV4::new(v4, port))));
    }

    // Everything else has to be an IPv6 literal enclosed in `[` and `]`.
    let Some(bracketed) = host.strip_prefix('[').and_then(|rest| rest.strip_suffix(']'))
    else {
        return Ok(None);
    };

    // "%25" is a URL-encoded "%", which separates the address from its zone.
    let (literal, zone_id) = match bracketed.split_once("%25") {
        Some((literal, zone)) => (literal, Some(zone)),
        None => (bracketed, None),
    };

    let addr = match literal.parse::<Ipv6Addr>() {
        Err(AddrParseError { .. }) => return Ok(None),
        Ok(addr) => addr,
    };

    let scope_id = match zone_id {
        None => 0,
        Some(zone) => {
            // Zones are only accepted when the first 64 bits are exactly fe80::.
            if !matches!(addr.segments(), [0xfe80, 0, 0, 0, ..]) {
                return Err(io::Error::other(
                    "zone_id is only usable with link local addresses",
                ));
            }

            // A numeric zone is the scope id itself; a name needs a lookup.
            match zone.parse::<u32>() {
                Ok(index) => index,
                Err(ParseIntError { .. }) => interface_name_to_index(zone).await?,
            }
        }
    };

    Ok(Some(SocketAddr::V6(SocketAddrV6::new(addr, port, 0, scope_id))))
}
363
/// Creates a TCP stream socket through `provider`, optionally binds it to a device,
/// and starts connecting it to `addr`.
///
/// The socket domain (IPv4 or IPv6) is chosen from the variant of `addr`. When
/// `bind_device` is `Some`, its bytes are passed to the socket's `bind_device` call
/// before the connection is started.
///
/// # Errors
///
/// Returns the error from socket creation, from binding the device, or from
/// `connect_from_raw`.
#[cfg(target_os = "fuchsia")]
pub(crate) fn connect_and_bind_device<D: AsRef<[u8]>, T: ProviderConnector>(
    provider: &T,
    addr: SocketAddr,
    bind_device: Option<D>,
) -> io::Result<net::TcpConnector> {
    let domain = match addr {
        SocketAddr::V4(_) => fposix_socket::Domain::Ipv4,
        SocketAddr::V6(_) => fposix_socket::Domain::Ipv6,
    };
    let socket = stream_socket(provider, domain, fposix_socket::StreamSocketProtocol::Tcp)?;

    if let Some(device) = bind_device {
        socket.bind_device(Some(device.as_ref()))?;
    }

    net::TcpStream::connect_from_raw(socket, addr)
}
385
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fuchsia_async as fasync;

    /// Interface lookup stub for cases that must never consult it.
    async fn unsupported(_name: &str) -> Result<u32, io::Error> {
        panic!("should not have happened")
    }

    /// A bare IPv4 literal becomes a V4 socket address with the given port.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv4_addr() {
        let expected: SocketAddr = "1.2.3.4:8080".parse().unwrap();
        let parsed = parse_ip_addr("1.2.3.4", 8080, unsupported).await;
        assert_matches!(parsed, Ok(Some(addr)) if addr == expected);
    }

    /// Malformed literals and host names are reported as "not an IP address".
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_invalid_addresses() {
        let too_short = parse_ip_addr("1.2.3", 8080, unsupported).await;
        assert_matches!(too_short, Ok(None));

        let too_long = parse_ip_addr("1.2.3.4.5", 8080, unsupported).await;
        assert_matches!(too_long, Ok(None));

        let host_name = parse_ip_addr("localhost", 8080, unsupported).await;
        assert_matches!(host_name, Ok(None));

        let unterminated = parse_ip_addr("[fe80::1:2:3:4", 8080, unsupported).await;
        assert_matches!(unterminated, Ok(None));

        let double_open = parse_ip_addr("[[fe80::1:2:3:4]", 8080, unsupported).await;
        assert_matches!(double_open, Ok(None));

        let empty_brackets = parse_ip_addr("[]", 8080, unsupported).await;
        assert_matches!(empty_brackets, Ok(None));
    }

    /// A bracketed IPv6 literal without a zone becomes a V6 socket address.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr() {
        let expected: SocketAddr = "[fe80::1:2:3:4]:8080".parse().unwrap();
        let parsed = parse_ip_addr("[fe80::1:2:3:4]", 8080, unsupported).await;
        assert_matches!(parsed, Ok(Some(addr)) if addr == expected);
    }

    /// A zone on an address outside fe80::/64 is rejected with an error.
    #[fasync::run_singlethreaded(test)]
    async fn test_parse_ipv6_addr_with_zone_must_be_local() {
        let parsed = parse_ip_addr("[fe81::1:2:3:4%252]", 8080, unsupported).await;
        assert_matches!(parsed, Err(err) if err.kind() == io::ErrorKind::Other);
    }
}