hyper/client/
conn.rs

1//! Lower-level client connection API.
2//!
3//! The types in this module are to provide a lower-level API based around a
4//! single connection. Connecting to a host, pooling connections, and the like
5//! are not handled at this level. This module provides the building blocks to
6//! customize those things externally.
7//!
8//! If don't have need to manage connections yourself, consider using the
9//! higher-level [Client](super) API.
10//!
11//! ## Example
12//! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream
13//! ```no_run
14//! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))]
15//! # mod rt {
16//! use tower::ServiceExt;
17//! use http::{Request, StatusCode};
18//! use hyper::{client::conn, Body};
19//! use tokio::net::TcpStream;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let target_stream = TcpStream::connect("example.com:80").await?;
24//!
25//!     let (mut request_sender, connection) = conn::handshake(target_stream).await?;
26//!
27//!     // spawn a task to poll the connection and drive the HTTP state
28//!     tokio::spawn(async move {
29//!         if let Err(e) = connection.await {
30//!             eprintln!("Error in connection: {}", e);
31//!         }
32//!     });
33//!
34//!     let request = Request::builder()
35//!         // We need to manually add the host header because SendRequest does not
36//!         .header("Host", "example.com")
37//!         .method("GET")
38//!         .body(Body::from(""))?;
39//!     let response = request_sender.send_request(request).await?;
40//!     assert!(response.status() == StatusCode::OK);
41//!
42//!     // To send via the same connection again, it may not work as it may not be ready,
43//!     // so we have to wait until the request_sender becomes ready.
44//!     request_sender.ready().await?;
45//!     let request = Request::builder()
46//!         .header("Host", "example.com")
47//!         .method("GET")
48//!         .body(Body::from(""))?;
49//!     let response = request_sender.send_request(request).await?;
50//!     assert!(response.status() == StatusCode::OK);
51//!     Ok(())
52//! }
53//!
54//! # }
55//! ```
56
57use std::error::Error as StdError;
58use std::fmt;
59#[cfg(not(all(feature = "http1", feature = "http2")))]
60use std::marker::PhantomData;
61use std::sync::Arc;
62#[cfg(all(feature = "runtime", feature = "http2"))]
63use std::time::Duration;
64
65use bytes::Bytes;
66use futures_util::future::{self, Either, FutureExt as _};
67use httparse::ParserConfig;
68use pin_project_lite::pin_project;
69use tokio::io::{AsyncRead, AsyncWrite};
70use tower_service::Service;
71use tracing::{debug, trace};
72
73use super::dispatch;
74use crate::body::HttpBody;
75#[cfg(not(all(feature = "http1", feature = "http2")))]
76use crate::common::Never;
77use crate::common::{
78    exec::{BoxSendFuture, Exec},
79    task, Future, Pin, Poll,
80};
81use crate::proto;
82use crate::rt::Executor;
83#[cfg(feature = "http1")]
84use crate::upgrade::Upgraded;
85use crate::{Body, Request, Response};
86
87#[cfg(feature = "http1")]
88type Http1Dispatcher<T, B> =
89    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
90
91#[cfg(not(feature = "http1"))]
92type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>);
93
94#[cfg(feature = "http2")]
95type Http2ClientTask<B> = proto::h2::ClientTask<B>;
96
97#[cfg(not(feature = "http2"))]
98type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>);
99
100pin_project! {
101    #[project = ProtoClientProj]
102    enum ProtoClient<T, B>
103    where
104        B: HttpBody,
105    {
106        H1 {
107            #[pin]
108            h1: Http1Dispatcher<T, B>,
109        },
110        H2 {
111            #[pin]
112            h2: Http2ClientTask<B>,
113        },
114    }
115}
116
117/// Returns a handshake future over some IO.
118///
119/// This is a shortcut for `Builder::new().handshake(io)`.
120/// See [`client::conn`](crate::client::conn) for more.
121pub async fn handshake<T>(
122    io: T,
123) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
124where
125    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
126{
127    Builder::new().handshake(io).await
128}
129
130/// The sender side of an established connection.
131pub struct SendRequest<B> {
132    dispatch: dispatch::Sender<Request<B>, Response<Body>>,
133}
134
135/// A future that processes all HTTP state for the IO object.
136///
137/// In most cases, this should just be spawned into an executor, so that it
138/// can process incoming and outgoing messages, notice hangups, and the like.
139#[must_use = "futures do nothing unless polled"]
140pub struct Connection<T, B>
141where
142    T: AsyncRead + AsyncWrite + Send + 'static,
143    B: HttpBody + 'static,
144{
145    inner: Option<ProtoClient<T, B>>,
146}
147
148/// A builder to configure an HTTP connection.
149///
150/// After setting options, the builder is used to create a handshake future.
151#[derive(Clone, Debug)]
152pub struct Builder {
153    pub(super) exec: Exec,
154    h09_responses: bool,
155    h1_parser_config: ParserConfig,
156    h1_writev: Option<bool>,
157    h1_title_case_headers: bool,
158    h1_preserve_header_case: bool,
159    #[cfg(feature = "ffi")]
160    h1_preserve_header_order: bool,
161    h1_read_buf_exact_size: Option<usize>,
162    h1_max_buf_size: Option<usize>,
163    #[cfg(feature = "ffi")]
164    h1_headers_raw: bool,
165    #[cfg(feature = "http2")]
166    h2_builder: proto::h2::client::Config,
167    version: Proto,
168}
169
170#[derive(Clone, Debug)]
171enum Proto {
172    #[cfg(feature = "http1")]
173    Http1,
174    #[cfg(feature = "http2")]
175    Http2,
176}
177
178/// A future returned by `SendRequest::send_request`.
179///
180/// Yields a `Response` if successful.
181#[must_use = "futures do nothing unless polled"]
182pub struct ResponseFuture {
183    inner: ResponseFutureState,
184}
185
186enum ResponseFutureState {
187    Waiting(dispatch::Promise<Response<Body>>),
188    // Option is to be able to `take()` it in `poll`
189    Error(Option<crate::Error>),
190}
191
192/// Deconstructed parts of a `Connection`.
193///
194/// This allows taking apart a `Connection` at a later time, in order to
195/// reclaim the IO object, and additional related pieces.
196#[derive(Debug)]
197pub struct Parts<T> {
198    /// The original IO object used in the handshake.
199    pub io: T,
200    /// A buffer of bytes that have been read but not processed as HTTP.
201    ///
202    /// For instance, if the `Connection` is used for an HTTP upgrade request,
203    /// it is possible the server sent back the first bytes of the new protocol
204    /// along with the response upgrade.
205    ///
206    /// You will want to check for any existing bytes if you plan to continue
207    /// communicating on the IO object.
208    pub read_buf: Bytes,
209    _inner: (),
210}
211
212// ========== internal client api
213
214// A `SendRequest` that can be cloned to send HTTP2 requests.
215// private for now, probably not a great idea of a type...
216#[must_use = "futures do nothing unless polled"]
217#[cfg(feature = "http2")]
218pub(super) struct Http2SendRequest<B> {
219    dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
220}
221
222// ===== impl SendRequest
223
224impl<B> SendRequest<B> {
225    /// Polls to determine whether this sender can be used yet for a request.
226    ///
227    /// If the associated connection is closed, this returns an Error.
228    pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
229        self.dispatch.poll_ready(cx)
230    }
231
232    pub(super) async fn when_ready(self) -> crate::Result<Self> {
233        let mut me = Some(self);
234        future::poll_fn(move |cx| {
235            ready!(me.as_mut().unwrap().poll_ready(cx))?;
236            Poll::Ready(Ok(me.take().unwrap()))
237        })
238        .await
239    }
240
241    pub(super) fn is_ready(&self) -> bool {
242        self.dispatch.is_ready()
243    }
244
245    pub(super) fn is_closed(&self) -> bool {
246        self.dispatch.is_closed()
247    }
248
249    #[cfg(feature = "http2")]
250    pub(super) fn into_http2(self) -> Http2SendRequest<B> {
251        Http2SendRequest {
252            dispatch: self.dispatch.unbound(),
253        }
254    }
255}
256
257impl<B> SendRequest<B>
258where
259    B: HttpBody + 'static,
260{
261    /// Sends a `Request` on the associated connection.
262    ///
263    /// Returns a future that if successful, yields the `Response`.
264    ///
265    /// # Note
266    ///
267    /// There are some key differences in what automatic things the `Client`
268    /// does for you that will not be done here:
269    ///
270    /// - `Client` requires absolute-form `Uri`s, since the scheme and
271    ///   authority are needed to connect. They aren't required here.
272    /// - Since the `Client` requires absolute-form `Uri`s, it can add
273    ///   the `Host` header based on it. You must add a `Host` header yourself
274    ///   before calling this method.
275    /// - Since absolute-form `Uri`s are not required, if received, they will
276    ///   be serialized as-is.
277    ///
278    /// # Example
279    ///
280    /// ```
281    /// # use http::header::HOST;
282    /// # use hyper::client::conn::SendRequest;
283    /// # use hyper::Body;
284    /// use hyper::Request;
285    ///
286    /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
287    /// // build a Request
288    /// let req = Request::builder()
289    ///     .uri("/foo/bar")
290    ///     .header(HOST, "hyper.rs")
291    ///     .body(Body::empty())
292    ///     .unwrap();
293    ///
294    /// // send it and await a Response
295    /// let res = tx.send_request(req).await?;
296    /// // assert the Response
297    /// assert!(res.status().is_success());
298    /// # Ok(())
299    /// # }
300    /// # fn main() {}
301    /// ```
302    pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
303        let inner = match self.dispatch.send(req) {
304            Ok(rx) => ResponseFutureState::Waiting(rx),
305            Err(_req) => {
306                debug!("connection was not ready");
307                let err = crate::Error::new_canceled().with("connection was not ready");
308                ResponseFutureState::Error(Some(err))
309            }
310        };
311
312        ResponseFuture { inner }
313    }
314
315    pub(super) fn send_request_retryable(
316        &mut self,
317        req: Request<B>,
318    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
319    where
320        B: Send,
321    {
322        match self.dispatch.try_send(req) {
323            Ok(rx) => {
324                Either::Left(rx.then(move |res| {
325                    match res {
326                        Ok(Ok(res)) => future::ok(res),
327                        Ok(Err(err)) => future::err(err),
328                        // this is definite bug if it happens, but it shouldn't happen!
329                        Err(_) => panic!("dispatch dropped without returning error"),
330                    }
331                }))
332            }
333            Err(req) => {
334                debug!("connection was not ready");
335                let err = crate::Error::new_canceled().with("connection was not ready");
336                Either::Right(future::err((err, Some(req))))
337            }
338        }
339    }
340}
341
342impl<B> Service<Request<B>> for SendRequest<B>
343where
344    B: HttpBody + 'static,
345{
346    type Response = Response<Body>;
347    type Error = crate::Error;
348    type Future = ResponseFuture;
349
350    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
351        self.poll_ready(cx)
352    }
353
354    fn call(&mut self, req: Request<B>) -> Self::Future {
355        self.send_request(req)
356    }
357}
358
359impl<B> fmt::Debug for SendRequest<B> {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        f.debug_struct("SendRequest").finish()
362    }
363}
364
365// ===== impl Http2SendRequest
366
367#[cfg(feature = "http2")]
368impl<B> Http2SendRequest<B> {
369    pub(super) fn is_ready(&self) -> bool {
370        self.dispatch.is_ready()
371    }
372
373    pub(super) fn is_closed(&self) -> bool {
374        self.dispatch.is_closed()
375    }
376}
377
378#[cfg(feature = "http2")]
379impl<B> Http2SendRequest<B>
380where
381    B: HttpBody + 'static,
382{
383    pub(super) fn send_request_retryable(
384        &mut self,
385        req: Request<B>,
386    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
387    where
388        B: Send,
389    {
390        match self.dispatch.try_send(req) {
391            Ok(rx) => {
392                Either::Left(rx.then(move |res| {
393                    match res {
394                        Ok(Ok(res)) => future::ok(res),
395                        Ok(Err(err)) => future::err(err),
396                        // this is definite bug if it happens, but it shouldn't happen!
397                        Err(_) => panic!("dispatch dropped without returning error"),
398                    }
399                }))
400            }
401            Err(req) => {
402                debug!("connection was not ready");
403                let err = crate::Error::new_canceled().with("connection was not ready");
404                Either::Right(future::err((err, Some(req))))
405            }
406        }
407    }
408}
409
410#[cfg(feature = "http2")]
411impl<B> fmt::Debug for Http2SendRequest<B> {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        f.debug_struct("Http2SendRequest").finish()
414    }
415}
416
417#[cfg(feature = "http2")]
418impl<B> Clone for Http2SendRequest<B> {
419    fn clone(&self) -> Self {
420        Http2SendRequest {
421            dispatch: self.dispatch.clone(),
422        }
423    }
424}
425
426// ===== impl Connection
427
428impl<T, B> Connection<T, B>
429where
430    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
431    B: HttpBody + Unpin + Send + 'static,
432    B::Data: Send,
433    B::Error: Into<Box<dyn StdError + Send + Sync>>,
434{
435    /// Return the inner IO object, and additional information.
436    ///
437    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
438    pub fn into_parts(self) -> Parts<T> {
439        match self.inner.expect("already upgraded") {
440            #[cfg(feature = "http1")]
441            ProtoClient::H1 { h1 } => {
442                let (io, read_buf, _) = h1.into_inner();
443                Parts {
444                    io,
445                    read_buf,
446                    _inner: (),
447                }
448            }
449            ProtoClient::H2 { .. } => {
450                panic!("http2 cannot into_inner");
451            }
452
453            #[cfg(not(feature = "http1"))]
454            ProtoClient::H1 { h1 } => match h1.0 {},
455        }
456    }
457
458    /// Poll the connection for completion, but without calling `shutdown`
459    /// on the underlying IO.
460    ///
461    /// This is useful to allow running a connection while doing an HTTP
462    /// upgrade. Once the upgrade is completed, the connection would be "done",
463    /// but it is not desired to actually shutdown the IO object. Instead you
464    /// would take it back using `into_parts`.
465    ///
466    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
467    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
468    /// to work with this function; or use the `without_shutdown` wrapper.
469    pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
470        match *self.inner.as_mut().expect("already upgraded") {
471            #[cfg(feature = "http1")]
472            ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
473            #[cfg(feature = "http2")]
474            ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
475
476            #[cfg(not(feature = "http1"))]
477            ProtoClient::H1 { ref mut h1 } => match h1.0 {},
478            #[cfg(not(feature = "http2"))]
479            ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
480        }
481    }
482
483    /// Prevent shutdown of the underlying IO object at the end of service the request,
484    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
485    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
486        let mut conn = Some(self);
487        future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
488            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
489            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
490        })
491    }
492
493    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
494    ///
495    /// This setting is configured by the server peer by sending the
496    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
497    /// This method returns the currently acknowledged value received from the
498    /// remote.
499    ///
500    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
501    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
502    #[cfg(feature = "http2")]
503    pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
504        match self.inner.as_ref().unwrap() {
505            ProtoClient::H1 { .. } => false,
506            ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
507        }
508    }
509}
510
511impl<T, B> Future for Connection<T, B>
512where
513    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
514    B: HttpBody + Send + 'static,
515    B::Data: Send,
516    B::Error: Into<Box<dyn StdError + Send + Sync>>,
517{
518    type Output = crate::Result<()>;
519
520    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
521        match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
522            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
523            #[cfg(feature = "http1")]
524            proto::Dispatched::Upgrade(pending) => match self.inner.take() {
525                Some(ProtoClient::H1 { h1 }) => {
526                    let (io, buf, _) = h1.into_inner();
527                    pending.fulfill(Upgraded::new(io, buf));
528                    Poll::Ready(Ok(()))
529                }
530                _ => {
531                    drop(pending);
532                    unreachable!("Upgrade expects h1");
533                }
534            },
535        }
536    }
537}
538
539impl<T, B> fmt::Debug for Connection<T, B>
540where
541    T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
542    B: HttpBody + 'static,
543{
544    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545        f.debug_struct("Connection").finish()
546    }
547}
548
549// ===== impl Builder
550
551impl Builder {
552    /// Creates a new connection builder.
553    #[inline]
554    pub fn new() -> Builder {
555        Builder {
556            exec: Exec::Default,
557            h09_responses: false,
558            h1_writev: None,
559            h1_read_buf_exact_size: None,
560            h1_parser_config: Default::default(),
561            h1_title_case_headers: false,
562            h1_preserve_header_case: false,
563            #[cfg(feature = "ffi")]
564            h1_preserve_header_order: false,
565            h1_max_buf_size: None,
566            #[cfg(feature = "ffi")]
567            h1_headers_raw: false,
568            #[cfg(feature = "http2")]
569            h2_builder: Default::default(),
570            #[cfg(feature = "http1")]
571            version: Proto::Http1,
572            #[cfg(not(feature = "http1"))]
573            version: Proto::Http2,
574        }
575    }
576
577    /// Provide an executor to execute background HTTP2 tasks.
578    pub fn executor<E>(&mut self, exec: E) -> &mut Builder
579    where
580        E: Executor<BoxSendFuture> + Send + Sync + 'static,
581    {
582        self.exec = Exec::Executor(Arc::new(exec));
583        self
584    }
585
586    /// Set whether HTTP/0.9 responses should be tolerated.
587    ///
588    /// Default is false.
589    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
590        self.h09_responses = enabled;
591        self
592    }
593
594    /// Set whether HTTP/1 connections will accept spaces between header names
595    /// and the colon that follow them in responses.
596    ///
597    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
598    /// to say about it:
599    ///
600    /// > No whitespace is allowed between the header field-name and colon. In
601    /// > the past, differences in the handling of such whitespace have led to
602    /// > security vulnerabilities in request routing and response handling. A
603    /// > server MUST reject any received request message that contains
604    /// > whitespace between a header field-name and colon with a response code
605    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
606    /// > response message before forwarding the message downstream.
607    ///
608    /// Note that this setting does not affect HTTP/2.
609    ///
610    /// Default is false.
611    ///
612    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
613    pub fn http1_allow_spaces_after_header_name_in_responses(
614        &mut self,
615        enabled: bool,
616    ) -> &mut Builder {
617        self.h1_parser_config
618            .allow_spaces_after_header_name_in_responses(enabled);
619        self
620    }
621
622    /// Set whether HTTP/1 connections will accept obsolete line folding for
623    /// header values.
624    ///
625    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
626    /// parsing.
627    ///
628    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
629    /// to say about it:
630    ///
631    /// > A server that receives an obs-fold in a request message that is not
632    /// > within a message/http container MUST either reject the message by
633    /// > sending a 400 (Bad Request), preferably with a representation
634    /// > explaining that obsolete line folding is unacceptable, or replace
635    /// > each received obs-fold with one or more SP octets prior to
636    /// > interpreting the field value or forwarding the message downstream.
637    ///
638    /// > A proxy or gateway that receives an obs-fold in a response message
639    /// > that is not within a message/http container MUST either discard the
640    /// > message and replace it with a 502 (Bad Gateway) response, preferably
641    /// > with a representation explaining that unacceptable line folding was
642    /// > received, or replace each received obs-fold with one or more SP
643    /// > octets prior to interpreting the field value or forwarding the
644    /// > message downstream.
645    ///
646    /// > A user agent that receives an obs-fold in a response message that is
647    /// > not within a message/http container MUST replace each received
648    /// > obs-fold with one or more SP octets prior to interpreting the field
649    /// > value.
650    ///
651    /// Note that this setting does not affect HTTP/2.
652    ///
653    /// Default is false.
654    ///
655    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
656    pub fn http1_allow_obsolete_multiline_headers_in_responses(
657        &mut self,
658        enabled: bool,
659    ) -> &mut Builder {
660        self.h1_parser_config
661            .allow_obsolete_multiline_headers_in_responses(enabled);
662        self
663    }
664
665    /// Set whether HTTP/1 connections should try to use vectored writes,
666    /// or always flatten into a single buffer.
667    ///
668    /// Note that setting this to false may mean more copies of body data,
669    /// but may also improve performance when an IO transport doesn't
670    /// support vectored writes well, such as most TLS implementations.
671    ///
672    /// Setting this to true will force hyper to use queued strategy
673    /// which may eliminate unnecessary cloning on some TLS backends
674    ///
675    /// Default is `auto`. In this mode hyper will try to guess which
676    /// mode to use
677    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
678        self.h1_writev = Some(enabled);
679        self
680    }
681
682    /// Set whether HTTP/1 connections will write header names as title case at
683    /// the socket level.
684    ///
685    /// Note that this setting does not affect HTTP/2.
686    ///
687    /// Default is false.
688    pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
689        self.h1_title_case_headers = enabled;
690        self
691    }
692
693    /// Set whether to support preserving original header cases.
694    ///
695    /// Currently, this will record the original cases received, and store them
696    /// in a private extension on the `Response`. It will also look for and use
697    /// such an extension in any provided `Request`.
698    ///
699    /// Since the relevant extension is still private, there is no way to
700    /// interact with the original cases. The only effect this can have now is
701    /// to forward the cases in a proxy-like fashion.
702    ///
703    /// Note that this setting does not affect HTTP/2.
704    ///
705    /// Default is false.
706    pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
707        self.h1_preserve_header_case = enabled;
708        self
709    }
710
711    /// Set whether to support preserving original header order.
712    ///
713    /// Currently, this will record the order in which headers are received, and store this
714    /// ordering in a private extension on the `Response`. It will also look for and use
715    /// such an extension in any provided `Request`.
716    ///
717    /// Note that this setting does not affect HTTP/2.
718    ///
719    /// Default is false.
720    #[cfg(feature = "ffi")]
721    pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
722        self.h1_preserve_header_order = enabled;
723        self
724    }
725
726    /// Sets the exact size of the read buffer to *always* use.
727    ///
728    /// Note that setting this option unsets the `http1_max_buf_size` option.
729    ///
730    /// Default is an adaptive read buffer.
731    pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
732        self.h1_read_buf_exact_size = sz;
733        self.h1_max_buf_size = None;
734        self
735    }
736
737    /// Set the maximum buffer size for the connection.
738    ///
739    /// Default is ~400kb.
740    ///
741    /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
742    ///
743    /// # Panics
744    ///
745    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
746    #[cfg(feature = "http1")]
747    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
748    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
749        assert!(
750            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
751            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
752        );
753
754        self.h1_max_buf_size = Some(max);
755        self.h1_read_buf_exact_size = None;
756        self
757    }
758
759    #[cfg(feature = "ffi")]
760    pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
761        self.h1_headers_raw = enabled;
762        self
763    }
764
765    /// Sets whether HTTP2 is required.
766    ///
767    /// Default is false.
768    #[cfg(feature = "http2")]
769    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
770    pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
771        if enabled {
772            self.version = Proto::Http2
773        }
774        self
775    }
776
777    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
778    /// stream-level flow control.
779    ///
780    /// Passing `None` will do nothing.
781    ///
782    /// If not set, hyper will use a default.
783    ///
784    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
785    #[cfg(feature = "http2")]
786    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
787    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
788        if let Some(sz) = sz.into() {
789            self.h2_builder.adaptive_window = false;
790            self.h2_builder.initial_stream_window_size = sz;
791        }
792        self
793    }
794
795    /// Sets the max connection-level flow control for HTTP2
796    ///
797    /// Passing `None` will do nothing.
798    ///
799    /// If not set, hyper will use a default.
800    #[cfg(feature = "http2")]
801    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
802    pub fn http2_initial_connection_window_size(
803        &mut self,
804        sz: impl Into<Option<u32>>,
805    ) -> &mut Self {
806        if let Some(sz) = sz.into() {
807            self.h2_builder.adaptive_window = false;
808            self.h2_builder.initial_conn_window_size = sz;
809        }
810        self
811    }
812
813    /// Sets whether to use an adaptive flow control.
814    ///
815    /// Enabling this will override the limits set in
816    /// `http2_initial_stream_window_size` and
817    /// `http2_initial_connection_window_size`.
818    #[cfg(feature = "http2")]
819    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
820    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
821        use proto::h2::SPEC_WINDOW_SIZE;
822
823        self.h2_builder.adaptive_window = enabled;
824        if enabled {
825            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
826            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
827        }
828        self
829    }
830
831    /// Sets the maximum frame size to use for HTTP2.
832    ///
833    /// Passing `None` will do nothing.
834    ///
835    /// If not set, hyper will use a default.
836    #[cfg(feature = "http2")]
837    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
838    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
839        if let Some(sz) = sz.into() {
840            self.h2_builder.max_frame_size = sz;
841        }
842        self
843    }
844
845    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
846    /// connection alive.
847    ///
848    /// Pass `None` to disable HTTP2 keep-alive.
849    ///
850    /// Default is currently disabled.
851    ///
852    /// # Cargo Feature
853    ///
854    /// Requires the `runtime` cargo feature to be enabled.
855    #[cfg(feature = "runtime")]
856    #[cfg(feature = "http2")]
857    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
858    pub fn http2_keep_alive_interval(
859        &mut self,
860        interval: impl Into<Option<Duration>>,
861    ) -> &mut Self {
862        self.h2_builder.keep_alive_interval = interval.into();
863        self
864    }
865
866    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
867    ///
868    /// If the ping is not acknowledged within the timeout, the connection will
869    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
870    ///
871    /// Default is 20 seconds.
872    ///
873    /// # Cargo Feature
874    ///
875    /// Requires the `runtime` cargo feature to be enabled.
876    #[cfg(feature = "runtime")]
877    #[cfg(feature = "http2")]
878    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
879    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
880        self.h2_builder.keep_alive_timeout = timeout;
881        self
882    }
883
884    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
885    ///
886    /// If disabled, keep-alive pings are only sent while there are open
887    /// request/responses streams. If enabled, pings are also sent when no
888    /// streams are active. Does nothing if `http2_keep_alive_interval` is
889    /// disabled.
890    ///
891    /// Default is `false`.
892    ///
893    /// # Cargo Feature
894    ///
895    /// Requires the `runtime` cargo feature to be enabled.
896    #[cfg(feature = "runtime")]
897    #[cfg(feature = "http2")]
898    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
899    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
900        self.h2_builder.keep_alive_while_idle = enabled;
901        self
902    }
903
904    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
905    ///
906    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
907    /// details.
908    ///
909    /// The default value is determined by the `h2` crate.
910    ///
911    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
912    #[cfg(feature = "http2")]
913    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
914    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
915        self.h2_builder.max_concurrent_reset_streams = Some(max);
916        self
917    }
918
919    /// Set the maximum write buffer size for each HTTP/2 stream.
920    ///
921    /// Default is currently 1MB, but may change.
922    ///
923    /// # Panics
924    ///
925    /// The value must be no larger than `u32::MAX`.
926    #[cfg(feature = "http2")]
927    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
928    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
929        assert!(max <= std::u32::MAX as usize);
930        self.h2_builder.max_send_buffer_size = max;
931        self
932    }
933
934    /// Constructs a connection with the configured options and IO.
935    /// See [`client::conn`](crate::client::conn) for more.
936    ///
937    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
938    /// do nothing.
939    pub fn handshake<T, B>(
940        &self,
941        io: T,
942    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
943    where
944        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
945        B: HttpBody + 'static,
946        B::Data: Send,
947        B::Error: Into<Box<dyn StdError + Send + Sync>>,
948    {
949        let opts = self.clone();
950
951        async move {
952            trace!("client handshake {:?}", opts.version);
953
954            let (tx, rx) = dispatch::channel();
955            let proto = match opts.version {
956                #[cfg(feature = "http1")]
957                Proto::Http1 => {
958                    let mut conn = proto::Conn::new(io);
959                    conn.set_h1_parser_config(opts.h1_parser_config);
960                    if let Some(writev) = opts.h1_writev {
961                        if writev {
962                            conn.set_write_strategy_queue();
963                        } else {
964                            conn.set_write_strategy_flatten();
965                        }
966                    }
967                    if opts.h1_title_case_headers {
968                        conn.set_title_case_headers();
969                    }
970                    if opts.h1_preserve_header_case {
971                        conn.set_preserve_header_case();
972                    }
973                    #[cfg(feature = "ffi")]
974                    if opts.h1_preserve_header_order {
975                        conn.set_preserve_header_order();
976                    }
977                    if opts.h09_responses {
978                        conn.set_h09_responses();
979                    }
980
981                    #[cfg(feature = "ffi")]
982                    conn.set_raw_headers(opts.h1_headers_raw);
983
984                    if let Some(sz) = opts.h1_read_buf_exact_size {
985                        conn.set_read_buf_exact_size(sz);
986                    }
987                    if let Some(max) = opts.h1_max_buf_size {
988                        conn.set_max_buf_size(max);
989                    }
990                    let cd = proto::h1::dispatch::Client::new(rx);
991                    let dispatch = proto::h1::Dispatcher::new(cd, conn);
992                    ProtoClient::H1 { h1: dispatch }
993                }
994                #[cfg(feature = "http2")]
995                Proto::Http2 => {
996                    let h2 =
997                        proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
998                            .await?;
999                    ProtoClient::H2 { h2 }
1000                }
1001            };
1002
1003            Ok((
1004                SendRequest { dispatch: tx },
1005                Connection { inner: Some(proto) },
1006            ))
1007        }
1008    }
1009}
1010
1011// ===== impl ResponseFuture
1012
1013impl Future for ResponseFuture {
1014    type Output = crate::Result<Response<Body>>;
1015
1016    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1017        match self.inner {
1018            ResponseFutureState::Waiting(ref mut rx) => {
1019                Pin::new(rx).poll(cx).map(|res| match res {
1020                    Ok(Ok(resp)) => Ok(resp),
1021                    Ok(Err(err)) => Err(err),
1022                    // this is definite bug if it happens, but it shouldn't happen!
1023                    Err(_canceled) => panic!("dispatch dropped without returning error"),
1024                })
1025            }
1026            ResponseFutureState::Error(ref mut err) => {
1027                Poll::Ready(Err(err.take().expect("polled after ready")))
1028            }
1029        }
1030    }
1031}
1032
1033impl fmt::Debug for ResponseFuture {
1034    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1035        f.debug_struct("ResponseFuture").finish()
1036    }
1037}
1038
1039// ===== impl ProtoClient
1040
1041impl<T, B> Future for ProtoClient<T, B>
1042where
1043    T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
1044    B: HttpBody + Send + 'static,
1045    B::Data: Send,
1046    B::Error: Into<Box<dyn StdError + Send + Sync>>,
1047{
1048    type Output = crate::Result<proto::Dispatched>;
1049
1050    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1051        match self.project() {
1052            #[cfg(feature = "http1")]
1053            ProtoClientProj::H1 { h1 } => h1.poll(cx),
1054            #[cfg(feature = "http2")]
1055            ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
1056
1057            #[cfg(not(feature = "http1"))]
1058            ProtoClientProj::H1 { h1 } => match h1.0 {},
1059            #[cfg(not(feature = "http2"))]
1060            ProtoClientProj::H2 { h2, .. } => match h2.0 {},
1061        }
1062    }
1063}
1064
1065// assert trait markers
1066
1067trait AssertSend: Send {}
1068trait AssertSendSync: Send + Sync {}
1069
1070#[doc(hidden)]
1071impl<B: Send> AssertSendSync for SendRequest<B> {}
1072
1073#[doc(hidden)]
1074impl<T: Send, B: Send> AssertSend for Connection<T, B>
1075where
1076    T: AsyncRead + AsyncWrite + Send + 'static,
1077    B: HttpBody + 'static,
1078    B::Data: Send,
1079{
1080}
1081
1082#[doc(hidden)]
1083impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
1084where
1085    T: AsyncRead + AsyncWrite + Send + 'static,
1086    B: HttpBody + 'static,
1087    B::Data: Send + Sync + 'static,
1088{
1089}
1090
1091#[doc(hidden)]
1092impl AssertSendSync for Builder {}
1093
1094#[doc(hidden)]
1095impl AssertSend for ResponseFuture {}