hyper/server/
conn.rs

1//! Lower-level Server connection API.
2//!
3//! The types in this module are to provide a lower-level API based around a
4//! single connection. Accepting a connection and binding it with a service
5//! are not handled at this level. This module provides the building blocks to
6//! customize those things externally.
7//!
//! If you don't need to manage connections yourself, consider using the
//! higher-level [Server](super) API.
10//!
11//! ## Example
12//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream
13//! ```no_run
14//! # #[cfg(all(feature = "http1", feature = "runtime"))]
15//! # mod rt {
16//! use http::{Request, Response, StatusCode};
17//! use hyper::{server::conn::Http, service::service_fn, Body};
18//! use std::{net::SocketAddr, convert::Infallible};
19//! use tokio::net::TcpListener;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
23//!     let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();
24//!
25//!     let mut tcp_listener = TcpListener::bind(addr).await?;
26//!     loop {
27//!         let (tcp_stream, _) = tcp_listener.accept().await?;
28//!         tokio::task::spawn(async move {
29//!             if let Err(http_err) = Http::new()
30//!                     .http1_only(true)
31//!                     .http1_keep_alive(true)
32//!                     .serve_connection(tcp_stream, service_fn(hello))
33//!                     .await {
34//!                 eprintln!("Error while serving HTTP connection: {}", http_err);
35//!             }
36//!         });
37//!     }
38//! }
39//!
40//! async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
41//!    Ok(Response::new(Body::from("Hello World!")))
42//! }
43//! # }
44//! ```
45
46#[cfg(all(
47    any(feature = "http1", feature = "http2"),
48    not(all(feature = "http1", feature = "http2"))
49))]
50use std::marker::PhantomData;
51#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
52use std::time::Duration;
53
54#[cfg(feature = "http2")]
55use crate::common::io::Rewind;
56#[cfg(all(feature = "http1", feature = "http2"))]
57use crate::error::{Kind, Parse};
58#[cfg(feature = "http1")]
59use crate::upgrade::Upgraded;
60
61cfg_feature! {
62    #![any(feature = "http1", feature = "http2")]
63
64    use std::error::Error as StdError;
65    use std::fmt;
66
67    use bytes::Bytes;
68    use pin_project_lite::pin_project;
69    use tokio::io::{AsyncRead, AsyncWrite};
70    use tracing::trace;
71
72    pub use super::server::Connecting;
73    use crate::body::{Body, HttpBody};
74    use crate::common::{task, Future, Pin, Poll, Unpin};
75    #[cfg(not(all(feature = "http1", feature = "http2")))]
76    use crate::common::Never;
77    use crate::common::exec::{ConnStreamExec, Exec};
78    use crate::proto;
79    use crate::service::HttpService;
80
81    pub(super) use self::upgrades::UpgradeableConnection;
82}
83
84#[cfg(feature = "tcp")]
85pub use super::tcp::{AddrIncoming, AddrStream};
86
/// A lower-level configuration of the HTTP protocol.
///
/// This structure is used to configure options for an HTTP server connection.
/// Options are recorded by the builder-style methods on this type and are
/// applied when `serve_connection` creates a connection.
///
/// If you don't need to manage connections yourself, consider using the
/// higher-level [Server](super) API.
#[derive(Clone, Debug)]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Http<E = Exec> {
    // Executor; cloned into every HTTP/2 server built from this configuration.
    pub(crate) exec: E,
    // When true, `set_allow_half_close` is called on new HTTP/1 connections.
    h1_half_close: bool,
    // When false, `disable_keep_alive` is called on new HTTP/1 connections.
    h1_keep_alive: bool,
    // When true, `set_title_case_headers` is called on new HTTP/1 connections.
    h1_title_case_headers: bool,
    // When true, `set_preserve_header_case` is called on new HTTP/1 connections.
    h1_preserve_header_case: bool,
    // Optional header read timeout forwarded to new HTTP/1 connections.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    // `Some(true)` selects the queue write strategy, `Some(false)` the flatten
    // strategy; `None` leaves the connection's own choice untouched.
    h1_writev: Option<bool>,
    // HTTP/2 server settings, passed by reference to `proto::h2::Server::new`.
    #[cfg(feature = "http2")]
    h2_builder: proto::h2::server::Config,
    // Which protocol `serve_connection` starts with, and whether it may fall back.
    mode: ConnectionMode,
    // Optional buffer size limit forwarded to new HTTP/1 connections.
    max_buf_size: Option<usize>,
    // Forwarded to `set_flush_pipeline` on new HTTP/1 connections.
    pipeline_flush: bool,
}
111
/// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs.
///
/// Each variant only exists when the cargo feature(s) it needs are enabled,
/// so the set of selectable modes depends on the build configuration.
// NOTE(review): `Http::new` calls `ConnectionMode::default()`, but `Default` is
// not derived here; the impl is presumably provided elsewhere in this file.
#[cfg(any(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug, PartialEq)]
enum ConnectionMode {
    /// Always use HTTP/1 and do not upgrade when a parse error occurs.
    #[cfg(feature = "http1")]
    H1Only,
    /// Always use HTTP/2.
    #[cfg(feature = "http2")]
    H2Only,
    /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs.
    ///
    /// Only available when both protocols are compiled in.
    #[cfg(all(feature = "http1", feature = "http2"))]
    Fallback,
}
126
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    /// A future binding a connection with a Service.
    ///
    /// Polling this future will drive HTTP forward.
    #[must_use = "futures do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
    pub struct Connection<T, S, E = Exec>
    where
        S: HttpService<Body>,
    {
        // The protocol state machine currently driving the IO. Held in an
        // `Option` so `upgrade_h2` can `take()` the HTTP/1 state and put an
        // HTTP/2 server in its place.
        pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
        // Whether (and with which settings) an HTTP/1 connection may be
        // re-started as HTTP/2. A `PhantomData` when only one protocol is
        // compiled in.
        fallback: Fallback<E>,
    }
}
142
// The HTTP/1 server dispatcher stored in `ProtoServer::H1`.
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, S> =
    proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;

// Placeholder used when HTTP/1 is compiled out: the `Never` component is
// matched with an empty `match` elsewhere in this file, i.e. the variant can
// never hold a value. `PhantomData` only mentions the type parameters.
#[cfg(all(not(feature = "http1"), feature = "http2"))]
type Http1Dispatcher<T, B, S> = (Never, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
149
// The HTTP/2 server stored in `ProtoServer::H2`. The IO is wrapped in `Rewind`
// so bytes already read (e.g. during an HTTP/1 attempt) can be replayed.
#[cfg(feature = "http2")]
type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;

// Placeholder used when HTTP/2 is compiled out: the `Never` component is
// matched with an empty `match` elsewhere in this file, i.e. the variant can
// never hold a value. `PhantomData` only mentions the type parameters.
#[cfg(all(not(feature = "http2"), feature = "http1"))]
type Http2Server<T, B, S, E> = (
    Never,
    PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
);
158
// The protocol-specific server state held by a `Connection`.
//
// Both variants always exist; when a protocol's cargo feature is disabled,
// its payload type is an uninhabited placeholder (see the `Http1Dispatcher`
// and `Http2Server` aliases), so that variant cannot be constructed.
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    #[project = ProtoServerProj]
    pub(super) enum ProtoServer<T, B, S, E = Exec>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // An HTTP/1 dispatcher driving the connection.
        H1 {
            #[pin]
            h1: Http1Dispatcher<T, B, S>,
        },
        // An HTTP/2 server driving the connection.
        H2 {
            #[pin]
            h2: Http2Server<T, B, S, E>,
        },
    }
}
177
// Records whether an HTTP/1 connection may be re-started as HTTP/2.
#[cfg(all(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug)]
enum Fallback<E> {
    // Fallback allowed: carries the HTTP/2 settings and the executor that the
    // replacement HTTP/2 server will be built with.
    ToHttp2(proto::h2::server::Config, E),
    // No fallback: the connection stays on the protocol it started with.
    Http1Only,
}

// With only one protocol compiled in there is nothing to fall back to, so the
// field degenerates to a zero-sized marker.
#[cfg(all(
    any(feature = "http1", feature = "http2"),
    not(all(feature = "http1", feature = "http2"))
))]
type Fallback<E> = PhantomData<E>;
190
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Fallback<E> {
    /// Reports whether this connection is allowed to switch over to HTTP/2.
    ///
    /// Returns `true` for `ToHttp2` and `false` for `Http1Only`.
    fn to_h2(&self) -> bool {
        matches!(*self, Fallback::ToHttp2(..))
    }
}
200
// `Fallback` is declared `Unpin` for every `E`, with no `E: Unpin` bound.
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Unpin for Fallback<E> {}
203
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::try_into_parts`.
#[derive(Debug)]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
    // Private unit field: code outside this module cannot build a `Parts`
    // with a struct literal, so fields can be added later.
    _inner: (),
}
227
228// ===== impl Http =====
229
#[cfg(any(feature = "http1", feature = "http2"))]
impl Http {
    /// Builds an `Http` configuration populated with the default settings.
    ///
    /// The defaults are: the `Exec::Default` executor, the default
    /// `ConnectionMode`, HTTP/1 keep-alive enabled, and every other HTTP/1
    /// toggle (half-close, title-case headers, header-case preservation,
    /// pipeline flushing) disabled. The optional settings (write strategy,
    /// buffer size limit, header read timeout) start out unset.
    pub fn new() -> Http {
        Self {
            // Executor and protocol selection.
            exec: Exec::Default,
            #[cfg(feature = "http2")]
            h2_builder: Default::default(),
            mode: ConnectionMode::default(),
            // HTTP/1 behaviour switches.
            h1_keep_alive: true,
            h1_half_close: false,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            pipeline_flush: false,
            // Optional knobs that start out unset.
            h1_writev: None,
            max_buf_size: None,
            #[cfg(all(feature = "http1", feature = "runtime"))]
            h1_header_read_timeout: None,
        }
    }
}
252
253#[cfg(any(feature = "http1", feature = "http2"))]
254impl<E> Http<E> {
255    /// Sets whether HTTP1 is required.
256    ///
257    /// Default is false
258    #[cfg(feature = "http1")]
259    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
260    pub fn http1_only(&mut self, val: bool) -> &mut Self {
261        if val {
262            self.mode = ConnectionMode::H1Only;
263        } else {
264            #[cfg(feature = "http2")]
265            {
266                self.mode = ConnectionMode::Fallback;
267            }
268        }
269        self
270    }
271
    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// The value is only consulted when an HTTP/1 connection is created.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
        self.h1_half_close = val;
        self
    }
286
    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// When set to `false`, keep-alive is disabled on every HTTP/1
    /// connection at the moment it is created.
    ///
    /// Default is true.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }
296
    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.h1_title_case_headers = enabled;
        self
    }
309
    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.h1_preserve_header_case = enabled;
        self
    }
329
    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// This method always stores `Some(read_timeout)`; it cannot be used to
    /// go back to having no timeout.
    ///
    /// Default is None.
    ///
    /// # Cargo Feature
    ///
    /// Requires the `runtime` cargo feature to be enabled.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
    pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.h1_header_read_timeout = Some(read_timeout);
        self
    }
340
    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`: as long as this method is never called, no write
    /// strategy is forced on the connection and hyper will try to guess
    /// which mode to use.
    #[inline]
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_writev(&mut self, val: bool) -> &mut Self {
        self.h1_writev = Some(val);
        self
    }
360
361    /// Sets whether HTTP2 is required.
362    ///
363    /// Default is false
364    #[cfg(feature = "http2")]
365    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
366    pub fn http2_only(&mut self, val: bool) -> &mut Self {
367        if val {
368            self.mode = ConnectionMode::H2Only;
369        } else {
370            #[cfg(feature = "http1")]
371            {
372                self.mode = ConnectionMode::Fallback;
373            }
374        }
375        self
376    }
377
378    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
379    /// stream-level flow control.
380    ///
381    /// Passing `None` will do nothing.
382    ///
383    /// If not set, hyper will use a default.
384    ///
385    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
386    #[cfg(feature = "http2")]
387    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
388    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
389        if let Some(sz) = sz.into() {
390            self.h2_builder.adaptive_window = false;
391            self.h2_builder.initial_stream_window_size = sz;
392        }
393        self
394    }
395
396    /// Sets the max connection-level flow control for HTTP2.
397    ///
398    /// Passing `None` will do nothing.
399    ///
400    /// If not set, hyper will use a default.
401    #[cfg(feature = "http2")]
402    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
403    pub fn http2_initial_connection_window_size(
404        &mut self,
405        sz: impl Into<Option<u32>>,
406    ) -> &mut Self {
407        if let Some(sz) = sz.into() {
408            self.h2_builder.adaptive_window = false;
409            self.h2_builder.initial_conn_window_size = sz;
410        }
411        self
412    }
413
414    /// Sets whether to use an adaptive flow control.
415    ///
416    /// Enabling this will override the limits set in
417    /// `http2_initial_stream_window_size` and
418    /// `http2_initial_connection_window_size`.
419    #[cfg(feature = "http2")]
420    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
421    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
422        use proto::h2::SPEC_WINDOW_SIZE;
423
424        self.h2_builder.adaptive_window = enabled;
425        if enabled {
426            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
427            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
428        }
429        self
430    }
431
432    /// Sets the maximum frame size to use for HTTP2.
433    ///
434    /// Passing `None` will do nothing.
435    ///
436    /// If not set, hyper will use a default.
437    #[cfg(feature = "http2")]
438    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
439    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
440        if let Some(sz) = sz.into() {
441            self.h2_builder.max_frame_size = sz;
442        }
443        self
444    }
445
    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
    /// connections.
    ///
    /// Default is no limit (`std::u32::MAX`).
    ///
    /// The converted value is stored as-is, so passing `None` is not a no-op:
    /// it replaces any limit configured by an earlier call with `None`.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.h2_builder.max_concurrent_streams = max.into();
        self
    }
458
459    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
460    /// connection alive.
461    ///
462    /// Pass `None` to disable HTTP2 keep-alive.
463    ///
464    /// Default is currently disabled.
465    ///
466    /// # Cargo Feature
467    ///
468    /// Requires the `runtime` cargo feature to be enabled.
469    #[cfg(feature = "runtime")]
470    #[cfg(feature = "http2")]
471    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
472    pub fn http2_keep_alive_interval(
473        &mut self,
474        interval: impl Into<Option<Duration>>,
475    ) -> &mut Self {
476        self.h2_builder.keep_alive_interval = interval.into();
477        self
478    }
479
480    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
481    ///
482    /// If the ping is not acknowledged within the timeout, the connection will
483    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
484    ///
485    /// Default is 20 seconds.
486    ///
487    /// # Cargo Feature
488    ///
489    /// Requires the `runtime` cargo feature to be enabled.
490    #[cfg(feature = "runtime")]
491    #[cfg(feature = "http2")]
492    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
493    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
494        self.h2_builder.keep_alive_timeout = timeout;
495        self
496    }
497
498    /// Set the maximum write buffer size for each HTTP/2 stream.
499    ///
500    /// Default is currently ~400KB, but may change.
501    ///
502    /// # Panics
503    ///
504    /// The value must be no larger than `u32::MAX`.
505    #[cfg(feature = "http2")]
506    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
507    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
508        assert!(max <= std::u32::MAX as usize);
509        self.h2_builder.max_send_buffer_size = max;
510        self
511    }
512
513    /// Enables the [extended CONNECT protocol].
514    ///
515    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
516    #[cfg(feature = "http2")]
517    pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
518        self.h2_builder.enable_connect_protocol = true;
519        self
520    }
521
    /// Sets the max size of received header frames.
    ///
    /// The value is recorded in the HTTP/2 settings used for connections
    /// created afterwards.
    ///
    /// Default is currently ~16MB, but may change.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.h2_builder.max_header_list_size = max;
        self
    }
531
    /// Set the maximum buffer size for the connection.
    ///
    /// The limit is applied to HTTP/1 connections when they are created.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is `proto::h1::MINIMUM_MAX_BUFFER_SIZE`
    /// (documented as 8192). This method panics if the passed `max` is less
    /// than the minimum.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        // Reject limits that HTTP/1 cannot work with before storing anything.
        assert!(
            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }
549
    /// Aggregates flushes to better support pipelined responses.
    ///
    /// The flag is forwarded to HTTP/1 connections when they are created.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is `false`.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.pipeline_flush = enabled;
        self
    }
559
560    /// Set the executor used to spawn background tasks.
561    ///
562    /// Default uses implicit default (like `tokio::spawn`).
563    pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
564        Http {
565            exec,
566            h1_half_close: self.h1_half_close,
567            h1_keep_alive: self.h1_keep_alive,
568            h1_title_case_headers: self.h1_title_case_headers,
569            h1_preserve_header_case: self.h1_preserve_header_case,
570            #[cfg(all(feature = "http1", feature = "runtime"))]
571            h1_header_read_timeout: self.h1_header_read_timeout,
572            h1_writev: self.h1_writev,
573            #[cfg(feature = "http2")]
574            h2_builder: self.h2_builder,
575            mode: self.mode,
576            max_buf_size: self.max_buf_size,
577            pipeline_flush: self.pipeline_flush,
578        }
579    }
580
    /// Bind a connection together with a [`Service`](crate::service::Service).
    ///
    /// This returns a Future that must be polled in order for HTTP to be
    /// driven on the connection.
    ///
    /// The configured mode decides which protocol the connection starts
    /// with: HTTP/1 for the HTTP/1-only and fallback modes, HTTP/2 for the
    /// HTTP/2-only mode. In fallback mode the returned `Connection` also
    /// carries a copy of the HTTP/2 settings and executor so it can switch
    /// protocols later.
    ///
    /// # Example
    ///
    /// ```
    /// # use hyper::{Body, Request, Response};
    /// # use hyper::service::Service;
    /// # use hyper::server::conn::Http;
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # async fn run<I, S>(some_io: I, some_service: S)
    /// # where
    /// #     I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    /// #     S: Service<hyper::Request<Body>, Response=hyper::Response<Body>> + Send + 'static,
    /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    /// #     S::Future: Send,
    /// # {
    /// let http = Http::new();
    /// let conn = http.serve_connection(some_io, some_service);
    ///
    /// if let Err(e) = conn.await {
    ///     eprintln!("server connection error: {}", e);
    /// }
    /// # }
    /// # fn main() {}
    /// ```
    pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
    where
        S: HttpService<Body, ResBody = Bd>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        Bd: HttpBody + 'static,
        Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        E: ConnStreamExec<S::Future, Bd>,
    {
        // Builds the HTTP/1 variant. Written as a local macro so the same
        // body can be used from either cfg-selected match arm below.
        #[cfg(feature = "http1")]
        macro_rules! h1 {
            () => {{
                let mut conn = proto::Conn::new(io);
                // Apply each recorded HTTP/1 option to the fresh connection.
                if !self.h1_keep_alive {
                    conn.disable_keep_alive();
                }
                if self.h1_half_close {
                    conn.set_allow_half_close();
                }
                if self.h1_title_case_headers {
                    conn.set_title_case_headers();
                }
                if self.h1_preserve_header_case {
                    conn.set_preserve_header_case();
                }
                #[cfg(all(feature = "http1", feature = "runtime"))]
                if let Some(header_read_timeout) = self.h1_header_read_timeout {
                    conn.set_http1_header_read_timeout(header_read_timeout);
                }
                // `None` means no write strategy is forced.
                if let Some(writev) = self.h1_writev {
                    if writev {
                        conn.set_write_strategy_queue();
                    } else {
                        conn.set_write_strategy_flatten();
                    }
                }
                conn.set_flush_pipeline(self.pipeline_flush);
                if let Some(max) = self.max_buf_size {
                    conn.set_max_buf_size(max);
                }
                let sd = proto::h1::dispatch::Server::new(service);
                ProtoServer::H1 {
                    h1: proto::h1::Dispatcher::new(sd, conn),
                }
            }};
        }

        // Exactly one of the two HTTP/1 arms is compiled, depending on
        // whether the `Fallback` variant exists in this build.
        let proto = match self.mode {
            #[cfg(feature = "http1")]
            #[cfg(not(feature = "http2"))]
            ConnectionMode::H1Only => h1!(),
            #[cfg(feature = "http2")]
            #[cfg(feature = "http1")]
            ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
            #[cfg(feature = "http2")]
            ConnectionMode::H2Only => {
                let rewind_io = Rewind::new(io);
                let h2 =
                    proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
                ProtoServer::H2 { h2 }
            }
        };

        Connection {
            conn: Some(proto),
            // Only fallback mode keeps what is needed to restart as HTTP/2.
            #[cfg(all(feature = "http1", feature = "http2"))]
            fallback: if self.mode == ConnectionMode::Fallback {
                Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
            } else {
                Fallback::Http1Only
            },
            #[cfg(not(all(feature = "http1", feature = "http2")))]
            fallback: PhantomData,
        }
    }
684}
685
686// ===== impl Connection =====
687
688#[cfg(any(feature = "http1", feature = "http2"))]
689impl<I, B, S, E> Connection<I, S, E>
690where
691    S: HttpService<Body, ResBody = B>,
692    S::Error: Into<Box<dyn StdError + Send + Sync>>,
693    I: AsyncRead + AsyncWrite + Unpin,
694    B: HttpBody + 'static,
695    B::Error: Into<Box<dyn StdError + Send + Sync>>,
696    E: ConnStreamExec<S::Future, B>,
697{
    /// Start a graceful shutdown process for this connection.
    ///
    /// This `Connection` should continue to be polled until shutdown
    /// can finish.
    ///
    /// For an HTTP/1 connection this disables keep-alive; for an HTTP/2
    /// connection it forwards to the HTTP/2 server's `graceful_shutdown`.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still
    /// pending. If called after `Connection::poll` has resolved, this does
    /// nothing.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        match self.conn {
            #[cfg(feature = "http1")]
            Some(ProtoServer::H1 { ref mut h1, .. }) => {
                h1.disable_keep_alive();
            }
            #[cfg(feature = "http2")]
            Some(ProtoServer::H2 { ref mut h2 }) => {
                h2.graceful_shutdown();
            }
            // No protocol state is present, so there is nothing to shut down.
            None => (),

            // The arms below keep the match exhaustive when a protocol is
            // compiled out; the empty `match` on the placeholder's first
            // component shows those variants can never hold a value.
            #[cfg(not(feature = "http1"))]
            Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
            #[cfg(not(feature = "http2"))]
            Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
        }
    }
726
727    /// Return the inner IO object, and additional information.
728    ///
729    /// If the IO object has been "rewound" the io will not contain those bytes rewound.
730    /// This should only be called after `poll_without_shutdown` signals
731    /// that the connection is "done". Otherwise, it may not have finished
732    /// flushing all necessary HTTP bytes.
733    ///
734    /// # Panics
735    /// This method will panic if this connection is using an h2 protocol.
736    pub fn into_parts(self) -> Parts<I, S> {
737        self.try_into_parts()
738            .unwrap_or_else(|| panic!("h2 cannot into_inner"))
739    }
740
741    /// Return the inner IO object, and additional information, if available.
742    ///
743    /// This method will return a `None` if this connection is using an h2 protocol.
744    pub fn try_into_parts(self) -> Option<Parts<I, S>> {
745        match self.conn.unwrap() {
746            #[cfg(feature = "http1")]
747            ProtoServer::H1 { h1, .. } => {
748                let (io, read_buf, dispatch) = h1.into_inner();
749                Some(Parts {
750                    io,
751                    read_buf,
752                    service: dispatch.into_service(),
753                    _inner: (),
754                })
755            }
756            ProtoServer::H2 { .. } => None,
757
758            #[cfg(not(feature = "http1"))]
759            ProtoServer::H1 { h1, .. } => match h1.0 {},
760        }
761    }
762
    /// Poll the connection for completion, but without calling `shutdown`
    /// on the underlying IO.
    ///
    /// This is useful to allow running a connection while doing an HTTP
    /// upgrade. Once the upgrade is completed, the connection would be "done",
    /// but it is not desired to actually shutdown the IO object. Instead you
    /// would take it back using `into_parts`.
    ///
    /// If an HTTP/1 connection fails with an "HTTP/2 version" parse error and
    /// fallback is allowed, the connection is switched to HTTP/2 and polled
    /// again instead of returning the error.
    pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        // Loops only when the connection was just swapped from HTTP/1 to
        // HTTP/2; every other path returns.
        loop {
            match *self.conn.as_mut().unwrap() {
                #[cfg(feature = "http1")]
                ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
                    Ok(()) => return Poll::Ready(Ok(())),
                    Err(e) => {
                        // Only compiled when HTTP/2 is available: retry the
                        // connection as HTTP/2 if fallback is allowed.
                        #[cfg(feature = "http2")]
                        match *e.kind() {
                            Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
                                self.upgrade_h2();
                                continue;
                            }
                            _ => (),
                        }

                        return Poll::Ready(Err(e));
                    }
                },
                // The HTTP/2 server's output value is discarded.
                #[cfg(feature = "http2")]
                ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),

                // Keep the match exhaustive when a protocol is compiled out;
                // these variants can never hold a value.
                #[cfg(not(feature = "http1"))]
                ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
                #[cfg(not(feature = "http2"))]
                ProtoServer::H2 { ref mut h2 } => match h2.0 {},
            };
        }
    }
804
805    /// Prevent shutdown of the underlying IO object at the end of service the request,
806    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
807    ///
808    /// # Error
809    ///
810    /// This errors if the underlying connection protocol is not HTTP/1.
811    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
812    where
813        S: Unpin,
814        S::Future: Unpin,
815        B: Unpin,
816    {
817        let mut conn = Some(self);
818        futures_util::future::poll_fn(move |cx| {
819            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
820            Poll::Ready(conn.take().unwrap().try_into_parts().ok_or_else(crate::Error::new_without_shutdown_not_h1))
821        })
822    }
823
824    #[cfg(all(feature = "http1", feature = "http2"))]
825    fn upgrade_h2(&mut self) {
826        trace!("Trying to upgrade connection to h2");
827        let conn = self.conn.take();
828
829        let (io, read_buf, dispatch) = match conn.unwrap() {
830            ProtoServer::H1 { h1, .. } => h1.into_inner(),
831            ProtoServer::H2 { .. } => {
832                panic!("h2 cannot into_inner");
833            }
834        };
835        let mut rewind_io = Rewind::new(io);
836        rewind_io.rewind(read_buf);
837        let (builder, exec) = match self.fallback {
838            Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
839            Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
840        };
841        let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
842
843        debug_assert!(self.conn.is_none());
844        self.conn = Some(ProtoServer::H2 { h2 });
845    }
846
    /// Enable this connection to support higher-level HTTP upgrades.
    ///
    /// Consumes the connection and wraps it in an `UpgradeableConnection`.
    /// This additionally requires the IO type to be `Send`.
    ///
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
    where
        I: Send,
    {
        UpgradeableConnection { inner: self }
    }
856}
857
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Future for Connection<I, S, E>
where
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin + 'static,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<()>;

    // Polls the inner protocol server until it finishes. When both protocols
    // are enabled, an HTTP/1 parse error reporting an HTTP/2 preface triggers
    // a switch to HTTP/2 (if the fallback allows it) and another poll.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        loop {
            let outcome = ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx));

            let err = match outcome {
                Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                #[cfg(feature = "http1")]
                Ok(proto::Dispatched::Upgrade(pending)) => {
                    // `I` has no `Send` bound here, so the upgrade cannot be
                    // performed by this future. If the user was relying on
                    // `Body::on_upgrade` with this API, `manual()` sends them
                    // a special error explaining that.
                    pending.manual();
                    return Poll::Ready(Ok(()));
                }
                Err(err) => err,
            };

            #[cfg(all(feature = "http1", feature = "http2"))]
            {
                if let Kind::Parse(Parse::VersionH2) = *err.kind() {
                    if self.fallback.to_h2() {
                        self.upgrade_h2();
                        continue;
                    }
                }
            }

            return Poll::Ready(Err(err));
        }
    }
}
904
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, S> fmt::Debug for Connection<I, S>
where
    S: HttpService<Body>,
{
    // Prints an opaque `Connection` struct; no fields are included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("Connection");
        opaque.finish()
    }
}
914
915// ===== impl ConnectionMode =====
916
// The default mode depends on the enabled protocol features: `Fallback` when
// both are on, otherwise the single enabled protocol.
#[cfg(any(feature = "http1", feature = "http2"))]
impl Default for ConnectionMode {
    #[cfg(all(feature = "http1", feature = "http2"))]
    fn default() -> Self {
        Self::Fallback
    }

    #[cfg(all(feature = "http1", not(feature = "http2")))]
    fn default() -> Self {
        Self::H1Only
    }

    #[cfg(all(not(feature = "http1"), feature = "http2"))]
    fn default() -> Self {
        Self::H2Only
    }
}
934
935// ===== impl ProtoServer =====
936
#[cfg(any(feature = "http1", feature = "http2"))]
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
where
    T: AsyncRead + AsyncWrite + Unpin,
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<proto::Dispatched>;

    // Forwards the poll to whichever protocol variant is active. When a
    // protocol's feature is disabled, its arm matches on the variant's inner
    // `.0` field with an empty `match`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "http1")]
            ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
            #[cfg(not(feature = "http1"))]
            ProtoServerProj::H1 { h1, .. } => match h1.0 {},

            #[cfg(feature = "http2")]
            ProtoServerProj::H2 { h2 } => h2.poll(cx),
            #[cfg(not(feature = "http2"))]
            ProtoServerProj::H2 { h2 } => match h2.0 {},
        }
    }
}
963
#[cfg(any(feature = "http1", feature = "http2"))]
mod upgrades {
    use super::*;

    // A future that binds a connection to a Service, with support for
    // HTTP upgrades.
    //
    // This type is unnameable outside the crate, so to callers it is
    // effectively an `impl Future`, without requiring Rust 1.26.
    #[must_use = "futures do nothing unless polled"]
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S, E>
    where
        S: HttpService<Body>,
    {
        pub(super) inner: Connection<T, S, E>,
    }

    impl<I, B, S, E> UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        /// Begin gracefully shutting down this connection.
        ///
        /// Keep polling this `Connection` afterwards until the shutdown
        /// can finish.
        pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
            let inner = Pin::new(&mut self.inner);
            inner.graceful_shutdown();
        }
    }

    impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        type Output = crate::Result<()>;

        // Polls the wrapped connection's protocol server. On an upgrade
        // request the HTTP/1 state is dismantled and its IO object and read
        // buffer are handed to the pending upgrade. When both protocols are
        // enabled, an HTTP/2 preface parse error triggers a switch to HTTP/2
        // (if the fallback allows it) and another poll.
        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            loop {
                let outcome = ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx));

                let err = match outcome {
                    Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                    #[cfg(feature = "http1")]
                    Ok(proto::Dispatched::Upgrade(pending)) => match self.inner.conn.take() {
                        Some(ProtoServer::H1 { h1, .. }) => {
                            let (io, buf, _) = h1.into_inner();
                            pending.fulfill(Upgraded::new(io, buf));
                            return Poll::Ready(Ok(()));
                        }
                        _ => {
                            drop(pending);
                            unreachable!("Upgrade expects h1")
                        }
                    },
                    Err(err) => err,
                };

                #[cfg(all(feature = "http1", feature = "http2"))]
                {
                    if let Kind::Parse(Parse::VersionH2) = *err.kind() {
                        if self.inner.fallback.to_h2() {
                            self.inner.upgrade_h2();
                            continue;
                        }
                    }
                }

                return Poll::Ready(Err(err));
            }
        }
    }
}