1use futures::future::{Future, FutureExt};
6use futures::io::{self, AsyncRead, AsyncWrite};
7use futures::task::{Context, Poll};
8use hyper::client::connect::{Connected, Connection};
9use hyper::client::Client;
10use hyper::Body;
11#[cfg(not(target_os = "fuchsia"))]
12use netext::MultithreadedTokioAsyncWrapper;
13use std::marker::PhantomData;
14use std::net::{AddrParseError, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
15use std::num::ParseIntError;
16use std::pin::Pin;
17use tokio::io::ReadBuf;
18
19#[cfg(not(target_os = "fuchsia"))]
20use tokio::net;
21
22#[cfg(target_os = "fuchsia")]
23use fuchsia_async::net;
24
25#[cfg(not(target_os = "fuchsia"))]
26mod not_fuchsia;
27#[cfg(not(target_os = "fuchsia"))]
28pub use not_fuchsia::*;
29
30#[cfg(target_os = "fuchsia")]
31mod fuchsia;
32#[cfg(target_os = "fuchsia")]
33pub use crate::fuchsia::*;
34
35#[cfg(target_os = "fuchsia")]
36mod happy_eyeballs;
37
38pub type HttpClient = Client<HyperConnector, Body>;
40
41pub type HttpsClient = Client<hyper_rustls::HttpsConnector<HyperConnector>, Body>;
43
44pub trait MakeClientBuilder: Sized {
47 fn builder() -> HttpClientBuilder<Self> {
48 HttpClientBuilder::default()
49 }
50}
51
52impl MakeClientBuilder for HttpClient {}
53impl MakeClientBuilder for HttpsClient {}
54
55#[must_use = "futures do nothing unless polled"]
57pub struct HyperConnectorFuture {
58 fut: Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>> + Send>>,
61}
62
63impl Future for HyperConnectorFuture {
64 type Output = Result<TcpStream, io::Error>;
65
66 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67 self.fut.as_mut().poll(cx)
68 }
69}
70
71pub struct TcpStream {
72 #[cfg(target_os = "fuchsia")]
73 pub stream: net::TcpStream,
74 #[cfg(not(target_os = "fuchsia"))]
75 pub stream: MultithreadedTokioAsyncWrapper<net::TcpStream>,
76}
77
78impl tokio::io::AsyncRead for TcpStream {
79 fn poll_read(
80 mut self: Pin<&mut Self>,
81 cx: &mut Context<'_>,
82 buf: &mut ReadBuf<'_>,
83 ) -> Poll<io::Result<()>> {
84 Pin::new(&mut self.stream).poll_read(cx, buf.initialize_unfilled()).map_ok(|sz| {
85 buf.advance(sz);
86 ()
87 })
88 }
89
90 }
92
93impl tokio::io::AsyncWrite for TcpStream {
94 fn poll_write(
95 mut self: Pin<&mut Self>,
96 cx: &mut Context<'_>,
97 buf: &[u8],
98 ) -> Poll<io::Result<usize>> {
99 Pin::new(&mut self.stream).poll_write(cx, buf)
100 }
101
102 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
103 Pin::new(&mut self.get_mut().stream).poll_flush(cx)
104 }
105
106 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
107 Pin::new(&mut self.get_mut().stream).poll_close(cx)
108 }
109
110 }
112
113impl Connection for TcpStream {
114 fn connected(&self) -> Connected {
115 Connected::new()
116 }
117}
118
119#[non_exhaustive]
120#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
121pub struct TcpOptions {
123 pub keepalive_idle: Option<std::time::Duration>,
125 pub keepalive_interval: Option<std::time::Duration>,
127 pub keepalive_count: Option<u32>,
129}
130
131impl TcpOptions {
132 pub fn keepalive_timeout(dur: std::time::Duration) -> Self {
140 if dur.as_secs() == 0 {
141 return TcpOptions::default();
142 }
143
144 TcpOptions {
145 keepalive_idle: dur.checked_div(2),
146 keepalive_interval: dur.checked_div(6),
147 keepalive_count: Some(3),
148 }
149 }
150
151 pub(crate) fn apply<T: std::os::fd::AsFd>(&self, stream: &T) -> io::Result<()> {
152 let stream = socket2::SockRef::from(stream);
153 let mut any = false;
154 let mut keepalive = socket2::TcpKeepalive::new();
155 if let Some(idle) = self.keepalive_idle {
156 any = true;
157 keepalive = keepalive.with_time(idle);
158 };
159 if let Some(interval) = self.keepalive_interval {
160 any = true;
161 keepalive = keepalive.with_interval(interval);
162 }
163 if let Some(count) = self.keepalive_count {
164 any = true;
165 keepalive = keepalive.with_retries(count);
166 }
167 if any {
168 stream.set_tcp_keepalive(&keepalive)
169 } else {
170 Ok(())
171 }
172 }
173}
174
175#[derive(Clone, Debug, Default)]
178pub struct SocketOptions {
179 pub bind_device: Option<String>,
181}
182
183#[derive(Clone)]
184pub struct Executor;
185
186impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
187 fn execute(&self, fut: F) {
188 fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
189 }
190}
191
192#[derive(Clone)]
193pub struct LocalExecutor;
194
195#[derive(Clone)]
197pub struct HttpClientBuilder<T> {
198 tcp_options: Option<TcpOptions>,
199 socket_options: Option<SocketOptions>,
200 tls: Option<rustls::ClientConfig>,
201 phantom: PhantomData<T>,
202}
203
204impl<T> Default for HttpClientBuilder<T> {
205 fn default() -> Self {
206 Self {
207 tcp_options: Default::default(),
208 socket_options: Default::default(),
209 tls: Default::default(),
210 phantom: Default::default(),
211 }
212 }
213}
214
215impl HttpClientBuilder<HttpClient> {
216 pub fn build(mut self) -> HttpClient {
218 Client::builder().executor(Executor).build(self.connector())
219 }
220}
221
222impl HttpClientBuilder<HttpsClient> {
223 pub fn build(mut self) -> HttpsClient {
225 let https = hyper_rustls::HttpsConnector::from((
226 self.connector(),
227 self.tls.unwrap_or_else(|| {
228 let root_store = new_root_cert_store();
229 rustls::ClientConfig::builder()
230 .with_safe_defaults()
231 .with_root_certificates(root_store)
232 .with_no_client_auth()
233 }),
234 ));
235 Client::builder().executor(Executor).build(https)
236 }
237
238 pub fn tls(self, tls: rustls::ClientConfig) -> Self {
240 Self { tls: Some(tls), ..self }
241 }
242}
243
244impl<T> HttpClientBuilder<T> {
245 fn connector(&mut self) -> HyperConnector {
246 HyperConnector::from((
247 self.tcp_options.take().unwrap_or_default(),
248 self.socket_options.take().unwrap_or_default(),
249 ))
250 }
251
252 pub fn tcp_options(self, tcp_options: TcpOptions) -> Self {
254 Self { tcp_options: Some(tcp_options), ..self }
255 }
256
257 pub fn socket_options(self, socket_options: SocketOptions) -> Self {
259 Self { socket_options: Some(socket_options), ..self }
260 }
261}
262
263impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
264 fn execute(&self, fut: F) {
265 fuchsia_async::Task::local(fut.map(drop)).detach()
266 }
267}
268
269pub fn new_client() -> HttpClient {
271 HttpClient::builder().build()
272}
273
274pub fn new_https_client_dangerous(
275 tls: rustls::ClientConfig,
276 tcp_options: TcpOptions,
277) -> HttpsClient {
278 HttpsClient::builder().tls(tls).tcp_options(tcp_options).build()
279}
280
281pub fn new_https_client_from_tcp_options(tcp_options: TcpOptions) -> HttpsClient {
283 HttpsClient::builder().tcp_options(tcp_options).build()
284}
285
286pub fn new_https_client() -> HttpsClient {
288 HttpsClient::builder().build()
289}
290
291pub(crate) async fn parse_ip_addr<'a, F, Fut>(
292 host: &'a str,
293 port: u16,
294 interface_name_to_index: F,
295) -> Result<Option<SocketAddr>, io::Error>
296where
297 F: Fn(&'a str) -> Fut + 'a,
298 Fut: Future<Output = Result<u32, io::Error>> + 'a,
299{
300 match host.parse::<Ipv4Addr>() {
301 Ok(addr) => {
302 return Ok(Some(SocketAddr::V4(SocketAddrV4::new(addr, port))));
303 }
304 Err(AddrParseError { .. }) => {}
305 }
306
307 if !host.starts_with("[") || !host.ends_with(']') {
309 return Ok(None);
310 }
311
312 let host = &host[1..host.len() - 1];
313
314 let (host, zone_id) = if let Some((host, zone_id)) = host.split_once("%25") {
316 (host, Some(zone_id))
317 } else {
318 (host, None)
319 };
320
321 let addr = match host.parse::<Ipv6Addr>() {
322 Ok(addr) => addr,
323 Err(AddrParseError { .. }) => {
324 return Ok(None);
325 }
326 };
327
328 let scope_id = if let Some(zone_id) = zone_id {
329 if addr.segments()[..4] != [0xfe80, 0, 0, 0] {
350 return Err(io::Error::other("zone_id is only usable with link local addresses"));
351 }
352
353 match zone_id.parse::<u32>() {
355 Ok(scope_id) => scope_id,
356 Err(ParseIntError { .. }) => interface_name_to_index(zone_id).await?,
357 }
358 } else {
359 0
360 };
361
362 Ok(Some(SocketAddr::V6(SocketAddrV6::new(addr, port, 0, scope_id))))
363}
364
365#[cfg(target_os = "fuchsia")]
366pub(crate) fn connect_and_bind_device<D: AsRef<[u8]>>(
367 addr: SocketAddr,
368 bind_device: Option<D>,
369) -> io::Result<net::TcpConnector> {
370 let socket = socket2::Socket::new(
371 match addr {
372 SocketAddr::V4(_) => socket2::Domain::IPV4,
373 SocketAddr::V6(_) => socket2::Domain::IPV6,
374 },
375 socket2::Type::STREAM,
376 Some(socket2::Protocol::TCP),
377 )?;
378 if let Some(bind_device) = bind_device {
379 socket.bind_device(Some(bind_device.as_ref()))?;
380 }
381 net::TcpStream::connect_from_raw(socket, addr)
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387 use assert_matches::assert_matches;
388 use fuchsia_async::{self as fasync};
389
390 async fn unsupported(_name: &str) -> Result<u32, io::Error> {
391 panic!("should not have happened")
392 }
393
394 #[fasync::run_singlethreaded(test)]
395 async fn test_parse_ipv4_addr() {
396 let expected = "1.2.3.4:8080".parse::<SocketAddr>().unwrap();
397 assert_matches!(
398 parse_ip_addr("1.2.3.4", 8080, unsupported).await,
399 Ok(Some(addr)) if addr == expected);
400 }
401
402 #[fasync::run_singlethreaded(test)]
403 async fn test_parse_invalid_addresses() {
404 assert_matches!(parse_ip_addr("1.2.3", 8080, unsupported).await, Ok(None));
405 assert_matches!(parse_ip_addr("1.2.3.4.5", 8080, unsupported).await, Ok(None));
406 assert_matches!(parse_ip_addr("localhost", 8080, unsupported).await, Ok(None));
407 assert_matches!(parse_ip_addr("[fe80::1:2:3:4", 8080, unsupported).await, Ok(None));
408 assert_matches!(parse_ip_addr("[[fe80::1:2:3:4]", 8080, unsupported).await, Ok(None));
409 assert_matches!(parse_ip_addr("[]", 8080, unsupported).await, Ok(None));
410 }
411
412 #[fasync::run_singlethreaded(test)]
413 async fn test_parse_ipv6_addr() {
414 let expected = "[fe80::1:2:3:4]:8080".parse::<SocketAddr>().unwrap();
415 assert_matches!(
416 parse_ip_addr("[fe80::1:2:3:4]", 8080, unsupported).await,
417 Ok(Some(addr)) if addr == expected
418 );
419 }
420
421 #[fasync::run_singlethreaded(test)]
422 async fn test_parse_ipv6_addr_with_zone_must_be_local() {
423 assert_matches!(
424 parse_ip_addr("[fe81::1:2:3:4%252]", 8080, unsupported).await,
425 Err(err) if err.kind() == io::ErrorKind::Other
426 );
427 }
428}