1use std::error::Error as StdError;
2use std::fmt;
3use std::mem;
4use std::time::Duration;
5
6use futures_channel::oneshot;
7use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
8use http::header::{HeaderValue, HOST};
9use http::uri::{Port, Scheme};
10use http::{Method, Request, Response, Uri, Version};
11use tracing::{debug, trace, warn};
12
13use super::conn;
14use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
15use super::pool::{
16 self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
17};
18#[cfg(feature = "tcp")]
19use super::HttpConnector;
20use crate::body::{Body, HttpBody};
21use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
22use crate::rt::Executor;
23
24#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
29pub struct Client<C, B = Body> {
30 config: Config,
31 conn_builder: conn::Builder,
32 connector: C,
33 pool: Pool<PoolClient<B>>,
34}
35
36#[derive(Clone, Copy, Debug)]
37struct Config {
38 retry_canceled_requests: bool,
39 set_host: bool,
40 ver: Ver,
41}
42
43#[must_use = "futures do nothing unless polled"]
47pub struct ResponseFuture {
48 inner: SyncWrapper<Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>>,
49}
50
51#[cfg(feature = "tcp")]
54impl Client<HttpConnector, Body> {
55 #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))]
63 #[inline]
64 pub fn new() -> Client<HttpConnector, Body> {
65 Builder::default().build_http()
66 }
67}
68
69#[cfg(feature = "tcp")]
70impl Default for Client<HttpConnector, Body> {
71 fn default() -> Client<HttpConnector, Body> {
72 Client::new()
73 }
74}
75
76impl Client<(), Body> {
77 #[inline]
97 pub fn builder() -> Builder {
98 Builder::default()
99 }
100}
101
102impl<C, B> Client<C, B>
103where
104 C: Connect + Clone + Send + Sync + 'static,
105 B: HttpBody + Send + 'static,
106 B::Data: Send,
107 B::Error: Into<Box<dyn StdError + Send + Sync>>,
108{
109 pub fn get(&self, uri: Uri) -> ResponseFuture
131 where
132 B: Default,
133 {
134 let body = B::default();
135 if !body.is_end_stream() {
136 warn!("default HttpBody used for get() does not return true for is_end_stream");
137 }
138
139 let mut req = Request::new(body);
140 *req.uri_mut() = uri;
141 self.request(req)
142 }
143
144 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
166 let is_http_connect = req.method() == Method::CONNECT;
167 match req.version() {
168 Version::HTTP_11 => (),
169 Version::HTTP_10 => {
170 if is_http_connect {
171 warn!("CONNECT is not allowed for HTTP/1.0");
172 return ResponseFuture::new(future::err(
173 crate::Error::new_user_unsupported_request_method(),
174 ));
175 }
176 }
177 Version::HTTP_2 => (),
178 other => return ResponseFuture::error_version(other),
180 };
181
182 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
183 Ok(s) => s,
184 Err(err) => {
185 return ResponseFuture::new(future::err(err));
186 }
187 };
188
189 ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
190 }
191
192 async fn retryably_send_request(
193 self,
194 mut req: Request<B>,
195 pool_key: PoolKey,
196 ) -> crate::Result<Response<Body>> {
197 let uri = req.uri().clone();
198
199 loop {
200 req = match self.send_request(req, pool_key.clone()).await {
201 Ok(resp) => return Ok(resp),
202 Err(ClientError::Normal(err)) => return Err(err),
203 Err(ClientError::Canceled {
204 connection_reused,
205 mut req,
206 reason,
207 }) => {
208 if !self.config.retry_canceled_requests || !connection_reused {
209 return Err(reason);
212 }
213
214 trace!(
215 "unstarted request canceled, trying again (reason={:?})",
216 reason
217 );
218 *req.uri_mut() = uri.clone();
219 req
220 }
221 }
222 }
223 }
224
225 async fn send_request(
226 &self,
227 mut req: Request<B>,
228 pool_key: PoolKey,
229 ) -> Result<Response<Body>, ClientError<B>> {
230 let mut pooled = match self.connection_for(pool_key).await {
231 Ok(pooled) => pooled,
232 Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
233 Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
234 return Err(ClientError::Canceled {
235 connection_reused: true,
236 req,
237 reason,
238 })
239 }
240 };
241
242 if pooled.is_http1() {
243 if req.version() == Version::HTTP_2 {
244 warn!("Connection is HTTP/1, but request requires HTTP/2");
245 return Err(ClientError::Normal(
246 crate::Error::new_user_unsupported_version(),
247 ));
248 }
249
250 if self.config.set_host {
251 let uri = req.uri().clone();
252 req.headers_mut().entry(HOST).or_insert_with(|| {
253 let hostname = uri.host().expect("authority implies host");
254 if let Some(port) = get_non_default_port(&uri) {
255 let s = format!("{}:{}", hostname, port);
256 HeaderValue::from_str(&s)
257 } else {
258 HeaderValue::from_str(hostname)
259 }
260 .expect("uri host is valid header value")
261 });
262 }
263
264 if req.method() == Method::CONNECT {
266 authority_form(req.uri_mut());
267 } else if pooled.conn_info.is_proxied {
268 absolute_form(req.uri_mut());
269 } else {
270 origin_form(req.uri_mut());
271 }
272 } else if req.method() == Method::CONNECT {
273 authority_form(req.uri_mut());
274 }
275
276 let fut = pooled
277 .send_request_retryable(req)
278 .map_err(ClientError::map_with_reused(pooled.is_reused()));
279
280 let extra_info = pooled.conn_info.extra.clone();
282 let fut = fut.map_ok(move |mut res| {
283 if let Some(extra) = extra_info {
284 extra.set(res.extensions_mut());
285 }
286 res
287 });
288
289 if pooled.is_closed() {
297 return fut.await;
298 }
299
300 let mut res = fut.await?;
301
302 if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
313 drop(pooled);
314 } else if !res.body().is_end_stream() {
315 let (delayed_tx, delayed_rx) = oneshot::channel();
316 res.body_mut().delayed_eof(delayed_rx);
317 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
318 drop(delayed_tx);
321 });
322
323 self.conn_builder.exec.execute(on_idle);
324 } else {
325 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
328
329 self.conn_builder.exec.execute(on_idle);
330 }
331
332 Ok(res)
333 }
334
335 async fn connection_for(
336 &self,
337 pool_key: PoolKey,
338 ) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
339 let checkout = self.pool.checkout(pool_key.clone());
353 let connect = self.connect_to(pool_key);
354 let is_ver_h2 = self.config.ver == Ver::Http2;
355
356 match future::select(checkout, connect).await {
359 Either::Left((Ok(checked_out), connecting)) => {
364 if connecting.started() {
372 let bg = connecting
373 .map_err(|err| {
374 trace!("background connect error: {}", err);
375 })
376 .map(|_pooled| {
377 });
380 self.conn_builder.exec.execute(bg);
383 }
384 Ok(checked_out)
385 }
386 Either::Right((Ok(connected), _checkout)) => Ok(connected),
388 Either::Left((Err(err), connecting)) => {
397 if err.is_canceled() {
398 connecting.await.map_err(ClientConnectError::Normal)
399 } else {
400 Err(ClientConnectError::Normal(err))
401 }
402 }
403 Either::Right((Err(err), checkout)) => {
404 if err.is_canceled() {
405 checkout.await.map_err(move |err| {
406 if is_ver_h2
407 && err.is_canceled()
408 && err.find_source::<CheckoutIsClosedError>().is_some()
409 {
410 ClientConnectError::H2CheckoutIsClosed(err)
411 } else {
412 ClientConnectError::Normal(err)
413 }
414 })
415 } else {
416 Err(ClientConnectError::Normal(err))
417 }
418 }
419 }
420 }
421
422 fn connect_to(
423 &self,
424 pool_key: PoolKey,
425 ) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
426 let executor = self.conn_builder.exec.clone();
427 let pool = self.pool.clone();
428 #[cfg(not(feature = "http2"))]
429 let conn_builder = self.conn_builder.clone();
430 #[cfg(feature = "http2")]
431 let mut conn_builder = self.conn_builder.clone();
432 let ver = self.config.ver;
433 let is_ver_h2 = ver == Ver::Http2;
434 let connector = self.connector.clone();
435 let dst = domain_as_uri(pool_key.clone());
436 hyper_lazy(move || {
437 let connecting = match pool.connecting(&pool_key, ver) {
443 Some(lock) => lock,
444 None => {
445 let canceled =
446 crate::Error::new_canceled().with("HTTP/2 connection in progress");
447 return Either::Right(future::err(canceled));
448 }
449 };
450 Either::Left(
451 connector
452 .connect(connect::sealed::Internal, dst)
453 .map_err(crate::Error::new_connect)
454 .and_then(move |io| {
455 let connected = io.connected();
456 let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
460 match connecting.alpn_h2(&pool) {
461 Some(lock) => {
462 trace!("ALPN negotiated h2, updating pool");
463 lock
464 }
465 None => {
466 let canceled = crate::Error::new_canceled()
469 .with("ALPN upgraded to HTTP/2");
470 return Either::Right(future::err(canceled));
471 }
472 }
473 } else {
474 connecting
475 };
476
477 #[cfg_attr(not(feature = "http2"), allow(unused))]
478 let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
479 #[cfg(feature = "http2")]
480 {
481 conn_builder.http2_only(is_h2);
482 }
483
484 Either::Left(Box::pin(async move {
485 let (tx, conn) = conn_builder.handshake(io).await?;
486
487 trace!("handshake complete, spawning background dispatcher task");
488 executor.execute(
489 conn.map_err(|e| debug!("client connection error: {}", e))
490 .map(|_| ()),
491 );
492
493 let tx = tx.when_ready().await?;
496
497 let tx = {
498 #[cfg(feature = "http2")]
499 {
500 if is_h2 {
501 PoolTx::Http2(tx.into_http2())
502 } else {
503 PoolTx::Http1(tx)
504 }
505 }
506 #[cfg(not(feature = "http2"))]
507 PoolTx::Http1(tx)
508 };
509
510 Ok(pool.pooled(
511 connecting,
512 PoolClient {
513 conn_info: connected,
514 tx,
515 },
516 ))
517 }))
518 }),
519 )
520 })
521 }
522}
523
524impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
525where
526 C: Connect + Clone + Send + Sync + 'static,
527 B: HttpBody + Send + 'static,
528 B::Data: Send,
529 B::Error: Into<Box<dyn StdError + Send + Sync>>,
530{
531 type Response = Response<Body>;
532 type Error = crate::Error;
533 type Future = ResponseFuture;
534
535 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
536 Poll::Ready(Ok(()))
537 }
538
539 fn call(&mut self, req: Request<B>) -> Self::Future {
540 self.request(req)
541 }
542}
543
544impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
545where
546 C: Connect + Clone + Send + Sync + 'static,
547 B: HttpBody + Send + 'static,
548 B::Data: Send,
549 B::Error: Into<Box<dyn StdError + Send + Sync>>,
550{
551 type Response = Response<Body>;
552 type Error = crate::Error;
553 type Future = ResponseFuture;
554
555 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
556 Poll::Ready(Ok(()))
557 }
558
559 fn call(&mut self, req: Request<B>) -> Self::Future {
560 self.request(req)
561 }
562}
563
564impl<C: Clone, B> Clone for Client<C, B> {
565 fn clone(&self) -> Client<C, B> {
566 Client {
567 config: self.config.clone(),
568 conn_builder: self.conn_builder.clone(),
569 connector: self.connector.clone(),
570 pool: self.pool.clone(),
571 }
572 }
573}
574
575impl<C, B> fmt::Debug for Client<C, B> {
576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577 f.debug_struct("Client").finish()
578 }
579}
580
581impl ResponseFuture {
584 fn new<F>(value: F) -> Self
585 where
586 F: Future<Output = crate::Result<Response<Body>>> + Send + 'static,
587 {
588 Self {
589 inner: SyncWrapper::new(Box::pin(value))
590 }
591 }
592
593 fn error_version(ver: Version) -> Self {
594 warn!("Request has unsupported version \"{:?}\"", ver);
595 ResponseFuture::new(Box::pin(future::err(
596 crate::Error::new_user_unsupported_version(),
597 )))
598 }
599}
600
601impl fmt::Debug for ResponseFuture {
602 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603 f.pad("Future<Response>")
604 }
605}
606
607impl Future for ResponseFuture {
608 type Output = crate::Result<Response<Body>>;
609
610 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
611 self.inner.get_mut().as_mut().poll(cx)
612 }
613}
614
615#[allow(missing_debug_implementations)]
619struct PoolClient<B> {
620 conn_info: Connected,
621 tx: PoolTx<B>,
622}
623
624enum PoolTx<B> {
625 Http1(conn::SendRequest<B>),
626 #[cfg(feature = "http2")]
627 Http2(conn::Http2SendRequest<B>),
628}
629
630impl<B> PoolClient<B> {
631 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
632 match self.tx {
633 PoolTx::Http1(ref mut tx) => tx.poll_ready(cx),
634 #[cfg(feature = "http2")]
635 PoolTx::Http2(_) => Poll::Ready(Ok(())),
636 }
637 }
638
639 fn is_http1(&self) -> bool {
640 !self.is_http2()
641 }
642
643 fn is_http2(&self) -> bool {
644 match self.tx {
645 PoolTx::Http1(_) => false,
646 #[cfg(feature = "http2")]
647 PoolTx::Http2(_) => true,
648 }
649 }
650
651 fn is_ready(&self) -> bool {
652 match self.tx {
653 PoolTx::Http1(ref tx) => tx.is_ready(),
654 #[cfg(feature = "http2")]
655 PoolTx::Http2(ref tx) => tx.is_ready(),
656 }
657 }
658
659 fn is_closed(&self) -> bool {
660 match self.tx {
661 PoolTx::Http1(ref tx) => tx.is_closed(),
662 #[cfg(feature = "http2")]
663 PoolTx::Http2(ref tx) => tx.is_closed(),
664 }
665 }
666}
667
668impl<B: HttpBody + 'static> PoolClient<B> {
669 fn send_request_retryable(
670 &mut self,
671 req: Request<B>,
672 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
673 where
674 B: Send,
675 {
676 match self.tx {
677 #[cfg(not(feature = "http2"))]
678 PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
679 #[cfg(feature = "http2")]
680 PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
681 #[cfg(feature = "http2")]
682 PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
683 }
684 }
685}
686
687impl<B> Poolable for PoolClient<B>
688where
689 B: Send + 'static,
690{
691 fn is_open(&self) -> bool {
692 match self.tx {
693 PoolTx::Http1(ref tx) => tx.is_ready(),
694 #[cfg(feature = "http2")]
695 PoolTx::Http2(ref tx) => tx.is_ready(),
696 }
697 }
698
699 fn reserve(self) -> Reservation<Self> {
700 match self.tx {
701 PoolTx::Http1(tx) => Reservation::Unique(PoolClient {
702 conn_info: self.conn_info,
703 tx: PoolTx::Http1(tx),
704 }),
705 #[cfg(feature = "http2")]
706 PoolTx::Http2(tx) => {
707 let b = PoolClient {
708 conn_info: self.conn_info.clone(),
709 tx: PoolTx::Http2(tx.clone()),
710 };
711 let a = PoolClient {
712 conn_info: self.conn_info,
713 tx: PoolTx::Http2(tx),
714 };
715 Reservation::Shared(a, b)
716 }
717 }
718 }
719
720 fn can_share(&self) -> bool {
721 self.is_http2()
722 }
723}
724
725#[allow(missing_debug_implementations)]
729enum ClientError<B> {
730 Normal(crate::Error),
731 Canceled {
732 connection_reused: bool,
733 req: Request<B>,
734 reason: crate::Error,
735 },
736}
737
738impl<B> ClientError<B> {
739 fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
740 move |(err, orig_req)| {
741 if let Some(req) = orig_req {
742 ClientError::Canceled {
743 connection_reused: conn_reused,
744 reason: err,
745 req,
746 }
747 } else {
748 ClientError::Normal(err)
749 }
750 }
751 }
752}
753
754enum ClientConnectError {
755 Normal(crate::Error),
756 H2CheckoutIsClosed(crate::Error),
757}
758
759#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
761pub(super) enum Ver {
762 Auto,
763 Http2,
764}
765
766fn origin_form(uri: &mut Uri) {
767 let path = match uri.path_and_query() {
768 Some(path) if path.as_str() != "/" => {
769 let mut parts = ::http::uri::Parts::default();
770 parts.path_and_query = Some(path.clone());
771 Uri::from_parts(parts).expect("path is valid uri")
772 }
773 _none_or_just_slash => {
774 debug_assert!(Uri::default() == "/");
775 Uri::default()
776 }
777 };
778 *uri = path
779}
780
781fn absolute_form(uri: &mut Uri) {
782 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
783 debug_assert!(
784 uri.authority().is_some(),
785 "absolute_form needs an authority"
786 );
787 if uri.scheme() == Some(&Scheme::HTTPS) {
791 origin_form(uri);
792 }
793}
794
795fn authority_form(uri: &mut Uri) {
796 if let Some(path) = uri.path_and_query() {
797 if path != "/" {
800 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
801 }
802 }
803 *uri = match uri.authority() {
804 Some(auth) => {
805 let mut parts = ::http::uri::Parts::default();
806 parts.authority = Some(auth.clone());
807 Uri::from_parts(parts).expect("authority is valid")
808 }
809 None => {
810 unreachable!("authority_form with relative uri");
811 }
812 };
813}
814
815fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<PoolKey> {
816 let uri_clone = uri.clone();
817 match (uri_clone.scheme(), uri_clone.authority()) {
818 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
819 (None, Some(auth)) if is_http_connect => {
820 let scheme = match auth.port_u16() {
821 Some(443) => {
822 set_scheme(uri, Scheme::HTTPS);
823 Scheme::HTTPS
824 }
825 _ => {
826 set_scheme(uri, Scheme::HTTP);
827 Scheme::HTTP
828 }
829 };
830 Ok((scheme, auth.clone()))
831 }
832 _ => {
833 debug!("Client requires absolute-form URIs, received: {:?}", uri);
834 Err(crate::Error::new_user_absolute_uri_required())
835 }
836 }
837}
838
839fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
840 http::uri::Builder::new()
841 .scheme(scheme)
842 .authority(auth)
843 .path_and_query("/")
844 .build()
845 .expect("domain is valid Uri")
846}
847
848fn set_scheme(uri: &mut Uri, scheme: Scheme) {
849 debug_assert!(
850 uri.scheme().is_none(),
851 "set_scheme expects no existing scheme"
852 );
853 let old = mem::replace(uri, Uri::default());
854 let mut parts: ::http::uri::Parts = old.into();
855 parts.scheme = Some(scheme);
856 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
857 *uri = Uri::from_parts(parts).expect("scheme is valid");
858}
859
860fn get_non_default_port(uri: &Uri) -> Option<Port<&str>> {
861 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
862 (Some(443), true) => None,
863 (Some(80), false) => None,
864 _ => uri.port(),
865 }
866}
867
868fn is_schema_secure(uri: &Uri) -> bool {
869 uri.scheme_str()
870 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
871 .unwrap_or_default()
872}
873
874#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
894#[derive(Clone)]
895pub struct Builder {
896 client_config: Config,
897 conn_builder: conn::Builder,
898 pool_config: pool::Config,
899}
900
901impl Default for Builder {
902 fn default() -> Self {
903 Self {
904 client_config: Config {
905 retry_canceled_requests: true,
906 set_host: true,
907 ver: Ver::Auto,
908 },
909 conn_builder: conn::Builder::new(),
910 pool_config: pool::Config {
911 idle_timeout: Some(Duration::from_secs(90)),
912 max_idle_per_host: std::usize::MAX,
913 },
914 }
915 }
916}
917
918impl Builder {
919 #[doc(hidden)]
920 #[deprecated(
921 note = "name is confusing, to disable the connection pool, call pool_max_idle_per_host(0)"
922 )]
923 pub fn keep_alive(&mut self, val: bool) -> &mut Self {
924 if !val {
925 self.pool_max_idle_per_host(0)
927 } else if self.pool_config.max_idle_per_host == 0 {
928 self.pool_max_idle_per_host(std::usize::MAX)
930 } else {
931 self
933 }
934 }
935
936 #[doc(hidden)]
937 #[deprecated(note = "renamed to `pool_idle_timeout`")]
938 pub fn keep_alive_timeout<D>(&mut self, val: D) -> &mut Self
939 where
940 D: Into<Option<Duration>>,
941 {
942 self.pool_idle_timeout(val)
943 }
944
945 pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
951 where
952 D: Into<Option<Duration>>,
953 {
954 self.pool_config.idle_timeout = val.into();
955 self
956 }
957
958 #[doc(hidden)]
959 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
960 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
961 self.pool_config.max_idle_per_host = max_idle;
962 self
963 }
964
965 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
969 self.pool_config.max_idle_per_host = max_idle;
970 self
971 }
972
973 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
981 self.conn_builder.http1_read_buf_exact_size(Some(sz));
982 self
983 }
984
985 #[cfg(feature = "http1")]
995 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
996 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
997 self.conn_builder.http1_max_buf_size(max);
998 self
999 }
1000
1001 pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1024 self.conn_builder
1025 .http1_allow_spaces_after_header_name_in_responses(val);
1026 self
1027 }
1028
1029 pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1061 self.conn_builder
1062 .http1_allow_obsolete_multiline_headers_in_responses(val);
1063 self
1064 }
1065
1066 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1079 self.conn_builder.http1_writev(enabled);
1080 self
1081 }
1082
1083 pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1090 self.conn_builder.http1_title_case_headers(val);
1091 self
1092 }
1093
1094 pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1108 self.conn_builder.http1_preserve_header_case(val);
1109 self
1110 }
1111
1112 pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1116 self.conn_builder.http09_responses(val);
1117 self
1118 }
1119
1120 #[cfg(feature = "http2")]
1131 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1132 pub fn http2_only(&mut self, val: bool) -> &mut Self {
1133 self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1134 self
1135 }
1136
1137 #[cfg(feature = "http2")]
1146 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1147 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1148 self.conn_builder
1149 .http2_initial_stream_window_size(sz.into());
1150 self
1151 }
1152
1153 #[cfg(feature = "http2")]
1159 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1160 pub fn http2_initial_connection_window_size(
1161 &mut self,
1162 sz: impl Into<Option<u32>>,
1163 ) -> &mut Self {
1164 self.conn_builder
1165 .http2_initial_connection_window_size(sz.into());
1166 self
1167 }
1168
1169 #[cfg(feature = "http2")]
1175 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1176 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1177 self.conn_builder.http2_adaptive_window(enabled);
1178 self
1179 }
1180
1181 #[cfg(feature = "http2")]
1187 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1188 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1189 self.conn_builder.http2_max_frame_size(sz);
1190 self
1191 }
1192
1193 #[cfg(feature = "runtime")]
1204 #[cfg(feature = "http2")]
1205 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1206 pub fn http2_keep_alive_interval(
1207 &mut self,
1208 interval: impl Into<Option<Duration>>,
1209 ) -> &mut Self {
1210 self.conn_builder.http2_keep_alive_interval(interval);
1211 self
1212 }
1213
1214 #[cfg(feature = "runtime")]
1225 #[cfg(feature = "http2")]
1226 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1227 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1228 self.conn_builder.http2_keep_alive_timeout(timeout);
1229 self
1230 }
1231
1232 #[cfg(feature = "runtime")]
1245 #[cfg(feature = "http2")]
1246 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1247 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1248 self.conn_builder.http2_keep_alive_while_idle(enabled);
1249 self
1250 }
1251
1252 #[cfg(feature = "http2")]
1261 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1262 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1263 self.conn_builder.http2_max_concurrent_reset_streams(max);
1264 self
1265 }
1266
1267 #[cfg(feature = "http2")]
1275 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1276 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1277 self.conn_builder.http2_max_send_buf_size(max);
1278 self
1279 }
1280
1281 #[inline]
1293 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1294 self.client_config.retry_canceled_requests = val;
1295 self
1296 }
1297
1298 #[inline]
1305 pub fn set_host(&mut self, val: bool) -> &mut Self {
1306 self.client_config.set_host = val;
1307 self
1308 }
1309
1310 pub fn executor<E>(&mut self, exec: E) -> &mut Self
1312 where
1313 E: Executor<BoxSendFuture> + Send + Sync + 'static,
1314 {
1315 self.conn_builder.executor(exec);
1316 self
1317 }
1318
1319 #[cfg(feature = "tcp")]
1321 pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1322 where
1323 B: HttpBody + Send,
1324 B::Data: Send,
1325 {
1326 let mut connector = HttpConnector::new();
1327 if self.pool_config.is_enabled() {
1328 connector.set_keepalive(self.pool_config.idle_timeout);
1329 }
1330 self.build(connector)
1331 }
1332
1333 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1335 where
1336 C: Connect + Clone,
1337 B: HttpBody + Send,
1338 B::Data: Send,
1339 {
1340 Client {
1341 config: self.client_config,
1342 conn_builder: self.conn_builder.clone(),
1343 connector,
1344 pool: Pool::new(self.pool_config, &self.conn_builder.exec),
1345 }
1346 }
1347}
1348
1349impl fmt::Debug for Builder {
1350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1351 f.debug_struct("Builder")
1352 .field("client_config", &self.client_config)
1353 .field("conn_builder", &self.conn_builder)
1354 .field("pool_config", &self.pool_config)
1355 .finish()
1356 }
1357}
1358
1359#[cfg(test)]
1360mod unit_tests {
1361 use super::*;
1362
1363 #[test]
1364 fn response_future_is_sync() {
1365 fn assert_sync<T: Sync>() {}
1366 assert_sync::<ResponseFuture>();
1367 }
1368
1369 #[test]
1370 fn set_relative_uri_with_implicit_path() {
1371 let mut uri = "http://hyper.rs".parse().unwrap();
1372 origin_form(&mut uri);
1373 assert_eq!(uri.to_string(), "/");
1374 }
1375
1376 #[test]
1377 fn test_origin_form() {
1378 let mut uri = "http://hyper.rs/guides".parse().unwrap();
1379 origin_form(&mut uri);
1380 assert_eq!(uri.to_string(), "/guides");
1381
1382 let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap();
1383 origin_form(&mut uri);
1384 assert_eq!(uri.to_string(), "/guides?foo=bar");
1385 }
1386
1387 #[test]
1388 fn test_absolute_form() {
1389 let mut uri = "http://hyper.rs/guides".parse().unwrap();
1390 absolute_form(&mut uri);
1391 assert_eq!(uri.to_string(), "http://hyper.rs/guides");
1392
1393 let mut uri = "https://hyper.rs/guides".parse().unwrap();
1394 absolute_form(&mut uri);
1395 assert_eq!(uri.to_string(), "/guides");
1396 }
1397
1398 #[test]
1399 fn test_authority_form() {
1400 let _ = pretty_env_logger::try_init();
1401
1402 let mut uri = "http://hyper.rs".parse().unwrap();
1403 authority_form(&mut uri);
1404 assert_eq!(uri.to_string(), "hyper.rs");
1405
1406 let mut uri = "hyper.rs".parse().unwrap();
1407 authority_form(&mut uri);
1408 assert_eq!(uri.to_string(), "hyper.rs");
1409 }
1410
1411 #[test]
1412 fn test_extract_domain_connect_no_port() {
1413 let mut uri = "hyper.rs".parse().unwrap();
1414 let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain");
1415 assert_eq!(scheme, *"http");
1416 assert_eq!(host, "hyper.rs");
1417 }
1418
1419 #[test]
1420 fn test_is_secure() {
1421 assert_eq!(
1422 is_schema_secure(&"http://hyper.rs".parse::<Uri>().unwrap()),
1423 false
1424 );
1425 assert_eq!(is_schema_secure(&"hyper.rs".parse::<Uri>().unwrap()), false);
1426 assert_eq!(
1427 is_schema_secure(&"wss://hyper.rs".parse::<Uri>().unwrap()),
1428 true
1429 );
1430 assert_eq!(
1431 is_schema_secure(&"ws://hyper.rs".parse::<Uri>().unwrap()),
1432 false
1433 );
1434 }
1435
1436 #[test]
1437 fn test_get_non_default_port() {
1438 assert!(get_non_default_port(&"http://hyper.rs".parse::<Uri>().unwrap()).is_none());
1439 assert!(get_non_default_port(&"http://hyper.rs:80".parse::<Uri>().unwrap()).is_none());
1440 assert!(get_non_default_port(&"https://hyper.rs:443".parse::<Uri>().unwrap()).is_none());
1441 assert!(get_non_default_port(&"hyper.rs:80".parse::<Uri>().unwrap()).is_none());
1442
1443 assert_eq!(
1444 get_non_default_port(&"http://hyper.rs:123".parse::<Uri>().unwrap())
1445 .unwrap()
1446 .as_u16(),
1447 123
1448 );
1449 assert_eq!(
1450 get_non_default_port(&"https://hyper.rs:80".parse::<Uri>().unwrap())
1451 .unwrap()
1452 .as_u16(),
1453 80
1454 );
1455 assert_eq!(
1456 get_non_default_port(&"hyper.rs:123".parse::<Uri>().unwrap())
1457 .unwrap()
1458 .as_u16(),
1459 123
1460 );
1461 }
1462}