1#[cfg(all(
47    any(feature = "http1", feature = "http2"),
48    not(all(feature = "http1", feature = "http2"))
49))]
50use std::marker::PhantomData;
51#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
52use std::time::Duration;
53
54#[cfg(feature = "http2")]
55use crate::common::io::Rewind;
56#[cfg(all(feature = "http1", feature = "http2"))]
57use crate::error::{Kind, Parse};
58#[cfg(feature = "http1")]
59use crate::upgrade::Upgraded;
60
61cfg_feature! {
62    #![any(feature = "http1", feature = "http2")]
63
64    use std::error::Error as StdError;
65    use std::fmt;
66
67    use bytes::Bytes;
68    use pin_project_lite::pin_project;
69    use tokio::io::{AsyncRead, AsyncWrite};
70    use tracing::trace;
71
72    pub use super::server::Connecting;
73    use crate::body::{Body, HttpBody};
74    use crate::common::{task, Future, Pin, Poll, Unpin};
75    #[cfg(not(all(feature = "http1", feature = "http2")))]
76    use crate::common::Never;
77    use crate::common::exec::{ConnStreamExec, Exec};
78    use crate::proto;
79    use crate::service::HttpService;
80
81    pub(super) use self::upgrades::UpgradeableConnection;
82}
83
84#[cfg(feature = "tcp")]
85pub use super::tcp::{AddrIncoming, AddrStream};
86
/// A configuration builder for serving HTTP connections.
///
/// The options stored here are applied to every connection created through
/// `serve_connection`. The type parameter `E` is the executor that is cloned
/// into HTTP/2 servers created from this builder.
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone, Debug)]
pub struct Http<E = Exec> {
    // Executor handed (by clone) to HTTP/2 servers.
    pub(crate) exec: E,
    // When set, HTTP/1 connections are created with half-close allowed.
    h1_half_close: bool,
    // When cleared, HTTP/1 connections are created with keep-alive disabled.
    h1_keep_alive: bool,
    // When set, HTTP/1 connections are asked to title-case header names.
    h1_title_case_headers: bool,
    // When set, HTTP/1 connections are asked to preserve header case.
    h1_preserve_header_case: bool,
    // Optional header read timeout forwarded to HTTP/1 connections.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    // `Some(true)` selects the queue write strategy, `Some(false)` the
    // flatten strategy, `None` leaves the connection's own choice in place.
    h1_writev: Option<bool>,
    // Settings passed to every HTTP/2 server created from this builder.
    #[cfg(feature = "http2")]
    h2_builder: proto::h2::server::Config,
    // Which protocol(s) `serve_connection` starts with.
    mode: ConnectionMode,
    // Optional maximum buffer size forwarded to HTTP/1 connections.
    max_buf_size: Option<usize>,
    // Forwarded to HTTP/1 connections via `set_flush_pipeline`.
    pipeline_flush: bool,
}
111
/// The protocol selection strategy used when a connection is first served.
///
/// The enum is fieldless, so it is cheap to copy and supports total equality.
#[cfg(any(feature = "http1", feature = "http2"))]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConnectionMode {
    /// Serve HTTP/1 and never switch to HTTP/2.
    #[cfg(feature = "http1")]
    H1Only,
    /// Serve HTTP/2 from the start.
    #[cfg(feature = "http2")]
    H2Only,
    /// Start with HTTP/1 and switch to HTTP/2 when the HTTP/1 parser reports
    /// an HTTP/2 version error.
    #[cfg(all(feature = "http1", feature = "http2"))]
    Fallback,
}
126
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    /// A future binding an IO object to a service.
    ///
    /// Polling this future drives the underlying protocol server; it resolves
    /// once that server reports completion or an error.
    #[must_use = "futures do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
    pub struct Connection<T, S, E = Exec>
    where
        S: HttpService<Body>,
    {
        // The active protocol server. It is taken (left as `None`) while the
        // connection is being switched to HTTP/2 or handed off for an upgrade.
        pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
        // What to do when the HTTP/1 parser reports an HTTP/2 version error.
        fallback: Fallback<E>,
    }
}
142
/// The HTTP/1 server dispatcher used when the `http1` feature is enabled.
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
    proto::h1::dispatch::Server<S, Body>,
    B,
    T,
    proto::ServerTransaction,
>;

/// Placeholder used when only `http2` is enabled.
///
/// The first tuple element is `Never`, which the rest of this module treats
/// as uninhabited (`match x.0 {}`), so a value of this type cannot exist.
#[cfg(all(not(feature = "http1"), feature = "http2"))]
type Http1Dispatcher<T, B, S> = (
    Never,
    PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>,
);
149
/// The HTTP/2 server used when the `http2` feature is enabled.
///
/// The IO object is wrapped in `Rewind` so bytes already read can be replayed.
#[cfg(feature = "http2")]
type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;

/// Placeholder used when only `http1` is enabled.
///
/// The first tuple element is `Never`, which the rest of this module treats
/// as uninhabited (`match x.0 {}`), so a value of this type cannot exist.
#[cfg(all(not(feature = "http2"), feature = "http1"))]
type Http2Server<T, B, S, E> = (Never, PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>);
158
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    // The protocol server currently driving a connection: either the HTTP/1
    // dispatcher or the HTTP/2 server. When a protocol's feature is disabled
    // its variant holds an uninhabited placeholder type instead.
    #[project = ProtoServerProj]
    pub(super) enum ProtoServer<T, B, S, E = Exec>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // HTTP/1 dispatcher.
        H1 {
            #[pin]
            h1: Http1Dispatcher<T, B, S>,
        },
        // HTTP/2 server.
        H2 {
            #[pin]
            h2: Http2Server<T, B, S, E>,
        },
    }
}
177
/// What a connection does when the HTTP/1 parser reports an HTTP/2 version
/// error (both protocol features enabled).
#[cfg(all(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug)]
enum Fallback<E> {
    /// Switch to HTTP/2 using the stored configuration and executor.
    ToHttp2(proto::h2::server::Config, E),
    /// Stay on HTTP/1; the error is returned to the caller.
    Http1Only,
}
184
/// With exactly one protocol feature enabled there is nothing to fall back
/// to, so `Fallback` is a zero-sized marker.
#[cfg(all(
    any(feature = "http1", feature = "http2"),
    not(all(feature = "http1", feature = "http2")),
))]
type Fallback<E> = PhantomData<E>;
190
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Fallback<E> {
    /// Returns `true` when this fallback is configured to switch to HTTP/2.
    fn to_h2(&self) -> bool {
        matches!(self, Fallback::ToHttp2(..))
    }
}

// `Fallback` is declared `Unpin` for every `E`, including executors that are
// not themselves `Unpin`.
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Unpin for Fallback<E> {}
203
/// The pieces recovered by taking a [`Connection`] apart.
///
/// Values of this type are produced by `Connection::into_parts`,
/// `Connection::try_into_parts` and `Connection::without_shutdown`.
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Debug)]
pub struct Parts<T, S> {
    /// The IO object returned by the HTTP/1 dispatcher.
    pub io: T,
    /// The read buffer returned by the HTTP/1 dispatcher alongside the IO
    /// object.
    ///
    /// NOTE(review): presumably bytes that were read from `io` but not yet
    /// consumed as HTTP; confirm against the dispatcher before relying on it.
    pub read_buf: Bytes,
    /// The service that was serving the connection.
    pub service: S,
    // Private field: keeps the struct from being built with a literal outside
    // this module, so fields can be added later.
    _inner: (),
}
227
#[cfg(any(feature = "http1", feature = "http2"))]
impl Http {
    /// Creates a builder with the default executor and default settings.
    ///
    /// Defaults: keep-alive enabled; half-close, title-case headers, header
    /// case preservation and pipeline flushing disabled; no header read
    /// timeout, write strategy or maximum buffer size configured; HTTP/2
    /// settings and the connection mode take their own `Default` values.
    pub fn new() -> Http {
        Self {
            exec: Exec::Default,
            h1_half_close: false,
            h1_keep_alive: true,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            #[cfg(all(feature = "http1", feature = "runtime"))]
            h1_header_read_timeout: None,
            h1_writev: None,
            #[cfg(feature = "http2")]
            h2_builder: Default::default(),
            mode: ConnectionMode::default(),
            max_buf_size: None,
            pipeline_flush: false,
        }
    }
}
252
#[cfg(any(feature = "http1", feature = "http2"))]
impl<E> Http<E> {
    /// Sets whether HTTP/1 is required.
    ///
    /// Passing `true` switches the builder to HTTP/1-only mode. Passing
    /// `false` selects HTTP/1 with fallback to HTTP/2 when the `http2`
    /// feature is enabled; without that feature `false` leaves the mode
    /// unchanged.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_only(&mut self, val: bool) -> &mut Self {
        if val {
            self.mode = ConnectionMode::H1Only;
        } else {
            #[cfg(feature = "http2")]
            {
                self.mode = ConnectionMode::Fallback;
            }
        }
        self
    }

    /// Sets whether HTTP/1 connections are created with half-close allowed.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
        self.h1_half_close = val;
        self
    }

    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// When `false`, keep-alive is disabled on every HTTP/1 connection
    /// created afterwards. Default is `true`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }

    /// Sets whether HTTP/1 connections are asked to title-case header names.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.h1_title_case_headers = enabled;
        self
    }

    /// Sets whether HTTP/1 connections are asked to preserve header case.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.h1_preserve_header_case = enabled;
        self
    }

    /// Sets the header read timeout forwarded to HTTP/1 connections.
    ///
    /// No timeout is configured by default.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
    pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.h1_header_read_timeout = Some(read_timeout);
        self
    }

    /// Selects the write strategy of HTTP/1 connections.
    ///
    /// `true` selects the queue write strategy and `false` the flatten
    /// strategy. If this method is never called, neither is selected and the
    /// connection keeps its own default.
    #[inline]
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_writev(&mut self, val: bool) -> &mut Self {
        self.h1_writev = Some(val);
        self
    }

    /// Sets whether HTTP/2 is required.
    ///
    /// Passing `true` switches the builder to HTTP/2-only mode. Passing
    /// `false` selects HTTP/1 with fallback to HTTP/2 when the `http1`
    /// feature is enabled; without that feature `false` leaves the mode
    /// unchanged.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_only(&mut self, val: bool) -> &mut Self {
        if val {
            self.mode = ConnectionMode::H2Only;
        } else {
            #[cfg(feature = "http1")]
            {
                self.mode = ConnectionMode::Fallback;
            }
        }
        self
    }

    /// Sets the initial HTTP/2 stream window size.
    ///
    /// Passing `None` does nothing. Passing a value also turns the adaptive
    /// window setting off.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        if let Some(sz) = sz.into() {
            self.h2_builder.adaptive_window = false;
            self.h2_builder.initial_stream_window_size = sz;
        }
        self
    }

    /// Sets the initial HTTP/2 connection window size.
    ///
    /// Passing `None` does nothing. Passing a value also turns the adaptive
    /// window setting off.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_connection_window_size(
        &mut self,
        sz: impl Into<Option<u32>>,
    ) -> &mut Self {
        if let Some(sz) = sz.into() {
            self.h2_builder.adaptive_window = false;
            self.h2_builder.initial_conn_window_size = sz;
        }
        self
    }

    /// Sets whether the HTTP/2 adaptive window setting is used.
    ///
    /// Enabling it resets both the initial stream and the initial connection
    /// window size to `SPEC_WINDOW_SIZE`, overriding values set earlier.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
        use proto::h2::SPEC_WINDOW_SIZE;

        self.h2_builder.adaptive_window = enabled;
        if enabled {
            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
        }
        self
    }

    /// Sets the maximum HTTP/2 frame size.
    ///
    /// Passing `None` does nothing.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        if let Some(sz) = sz.into() {
            self.h2_builder.max_frame_size = sz;
        }
        self
    }

    /// Sets the HTTP/2 maximum concurrent streams setting.
    ///
    /// Unlike the window-size setters, the converted value is stored as-is,
    /// so passing `None` replaces any previously configured limit.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.h2_builder.max_concurrent_streams = max.into();
        self
    }

    /// Sets the HTTP/2 keep-alive interval.
    ///
    /// The converted value is stored as-is, so passing `None` replaces any
    /// previously configured interval.
    ///
    /// Requires the `runtime` cargo feature.
    #[cfg(all(feature = "runtime", feature = "http2"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_interval(
        &mut self,
        interval: impl Into<Option<Duration>>,
    ) -> &mut Self {
        self.h2_builder.keep_alive_interval = interval.into();
        self
    }

    /// Sets the HTTP/2 keep-alive timeout.
    ///
    /// Requires the `runtime` cargo feature.
    #[cfg(all(feature = "runtime", feature = "http2"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.h2_builder.keep_alive_timeout = timeout;
        self
    }

    /// Sets the maximum HTTP/2 send buffer size.
    ///
    /// # Panics
    ///
    /// Panics if `max` is larger than `u32::MAX`.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
        assert!(max <= u32::MAX as usize);
        self.h2_builder.max_send_buffer_size = max;
        self
    }

    /// Turns on the HTTP/2 "enable connect protocol" setting.
    ///
    /// There is no way to turn it back off through this builder.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
        self.h2_builder.enable_connect_protocol = true;
        self
    }

    /// Sets the HTTP/2 maximum header list size.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.h2_builder.max_header_list_size = max;
        self
    }

    /// Sets the maximum buffer size forwarded to HTTP/1 connections.
    ///
    /// No maximum is configured by default.
    ///
    /// # Panics
    ///
    /// Panics if `max` is smaller than the minimum that HTTP/1 specifies
    /// (`proto::h1::MINIMUM_MAX_BUFFER_SIZE`).
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        assert!(
            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }

    /// Sets the pipeline-flush flag forwarded to HTTP/1 connections.
    ///
    /// Default is `false`.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.pipeline_flush = enabled;
        self
    }

    /// Replaces the executor, keeping every other setting.
    ///
    /// Consumes the builder and returns one parameterised over the new
    /// executor type.
    pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
        Http {
            exec,
            h1_half_close: self.h1_half_close,
            h1_keep_alive: self.h1_keep_alive,
            h1_title_case_headers: self.h1_title_case_headers,
            h1_preserve_header_case: self.h1_preserve_header_case,
            #[cfg(all(feature = "http1", feature = "runtime"))]
            h1_header_read_timeout: self.h1_header_read_timeout,
            h1_writev: self.h1_writev,
            #[cfg(feature = "http2")]
            h2_builder: self.h2_builder,
            mode: self.mode,
            max_buf_size: self.max_buf_size,
            pipeline_flush: self.pipeline_flush,
        }
    }

    /// Binds an IO object to a service using the current settings.
    ///
    /// The returned [`Connection`] is a future and does nothing until it is
    /// polled. In HTTP/1-only and fallback mode it starts with an HTTP/1
    /// dispatcher; in HTTP/2-only mode it starts with an HTTP/2 server. In
    /// fallback mode the HTTP/2 settings and executor are also stored so the
    /// connection can switch protocols later.
    pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
    where
        S: HttpService<Body, ResBody = Bd>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        Bd: HttpBody + 'static,
        Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        E: ConnStreamExec<S::Future, Bd>,
    {
        // A macro rather than a closure: it moves `io` and `service`, and is
        // expanded only in the single match arm that needs HTTP/1.
        #[cfg(feature = "http1")]
        macro_rules! h1 {
            () => {{
                let mut conn = proto::Conn::new(io);
                if !self.h1_keep_alive {
                    conn.disable_keep_alive();
                }
                if self.h1_half_close {
                    conn.set_allow_half_close();
                }
                if self.h1_title_case_headers {
                    conn.set_title_case_headers();
                }
                if self.h1_preserve_header_case {
                    conn.set_preserve_header_case();
                }
                #[cfg(all(feature = "http1", feature = "runtime"))]
                if let Some(header_read_timeout) = self.h1_header_read_timeout {
                    conn.set_http1_header_read_timeout(header_read_timeout);
                }
                match self.h1_writev {
                    Some(true) => {
                        conn.set_write_strategy_queue();
                    }
                    Some(false) => {
                        conn.set_write_strategy_flatten();
                    }
                    // Not configured: keep the connection's own strategy.
                    None => {}
                }
                conn.set_flush_pipeline(self.pipeline_flush);
                if let Some(max) = self.max_buf_size {
                    conn.set_max_buf_size(max);
                }
                let sd = proto::h1::dispatch::Server::new(service);
                ProtoServer::H1 {
                    h1: proto::h1::Dispatcher::new(sd, conn),
                }
            }};
        }

        let proto = match self.mode {
            #[cfg(all(feature = "http1", not(feature = "http2")))]
            ConnectionMode::H1Only => h1!(),
            #[cfg(all(feature = "http1", feature = "http2"))]
            ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
            #[cfg(feature = "http2")]
            ConnectionMode::H2Only => {
                let rewind_io = Rewind::new(io);
                let h2 =
                    proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
                ProtoServer::H2 { h2 }
            }
        };

        Connection {
            conn: Some(proto),
            #[cfg(all(feature = "http1", feature = "http2"))]
            fallback: if self.mode == ConnectionMode::Fallback {
                Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
            } else {
                Fallback::Http1Only
            },
            #[cfg(not(all(feature = "http1", feature = "http2")))]
            fallback: PhantomData,
        }
    }
}
685
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Connection<I, S, E>
where
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    /// Starts a graceful shutdown of this connection.
    ///
    /// For HTTP/1 this disables keep-alive on the dispatcher; for HTTP/2 it
    /// calls the server's own graceful shutdown. Nothing happens if the
    /// protocol server has already been taken out of the connection.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        match self.conn.as_mut() {
            None => (),

            #[cfg(feature = "http1")]
            Some(ProtoServer::H1 { h1, .. }) => {
                h1.disable_keep_alive();
            }
            // Placeholder variant: uninhabited, so this arm can never run.
            #[cfg(not(feature = "http1"))]
            Some(ProtoServer::H1 { h1, .. }) => match h1.0 {},

            #[cfg(feature = "http2")]
            Some(ProtoServer::H2 { h2 }) => {
                h2.graceful_shutdown();
            }
            // Placeholder variant: uninhabited, so this arm can never run.
            #[cfg(not(feature = "http2"))]
            Some(ProtoServer::H2 { h2 }) => match h2.0 {},
        }
    }

    /// Returns the inner IO object, read buffer and service.
    ///
    /// # Panics
    ///
    /// Panics if the connection is HTTP/2, or if the protocol server has
    /// already been taken out of the connection.
    pub fn into_parts(self) -> Parts<I, S> {
        self.try_into_parts().expect("h2 cannot into_inner")
    }

    /// Returns the inner IO object, read buffer and service, if available.
    ///
    /// Returns `None` for an HTTP/2 connection.
    ///
    /// # Panics
    ///
    /// Panics if the protocol server has already been taken out of the
    /// connection.
    pub fn try_into_parts(self) -> Option<Parts<I, S>> {
        let server = self.conn.unwrap();
        match server {
            #[cfg(feature = "http1")]
            ProtoServer::H1 { h1, .. } => {
                let (io, read_buf, dispatch) = h1.into_inner();
                let service = dispatch.into_service();
                Some(Parts {
                    io,
                    read_buf,
                    service,
                    _inner: (),
                })
            }
            // Placeholder variant: uninhabited, so this arm can never run.
            #[cfg(not(feature = "http1"))]
            ProtoServer::H1 { h1, .. } => match h1.0 {},

            ProtoServer::H2 { .. } => None,
        }
    }

    /// Polls the connection to completion without shutting down the
    /// underlying IO (HTTP/1), or polls the HTTP/2 server.
    ///
    /// When both protocols are enabled and fallback is configured, an HTTP/2
    /// version parse error from the HTTP/1 dispatcher switches the connection
    /// to HTTP/2 and polling continues.
    ///
    /// # Panics
    ///
    /// Panics if the protocol server has already been taken out of the
    /// connection.
    pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        loop {
            match *self.conn.as_mut().unwrap() {
                #[cfg(feature = "http1")]
                ProtoServer::H1 { ref mut h1, .. } => {
                    let err = match ready!(h1.poll_without_shutdown(cx)) {
                        Ok(()) => return Poll::Ready(Ok(())),
                        Err(err) => err,
                    };

                    // The peer spoke HTTP/2: swap in an HTTP/2 server and
                    // poll again instead of failing.
                    #[cfg(feature = "http2")]
                    if matches!(*err.kind(), Kind::Parse(Parse::VersionH2))
                        && self.fallback.to_h2()
                    {
                        self.upgrade_h2();
                        continue;
                    }

                    return Poll::Ready(Err(err));
                }
                // Placeholder variant: uninhabited, so this arm can never run.
                #[cfg(not(feature = "http1"))]
                ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},

                #[cfg(feature = "http2")]
                ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
                // Placeholder variant: uninhabited, so this arm can never run.
                #[cfg(not(feature = "http2"))]
                ProtoServer::H2 { ref mut h2 } => match h2.0 {},
            };
        }
    }

    /// Returns a future that drives the connection with
    /// `poll_without_shutdown` and then yields its [`Parts`].
    ///
    /// The future resolves to an error if polling fails, or if the parts are
    /// not available because the connection is HTTP/2.
    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        // Held in an `Option` so the connection can be moved out of the
        // closure once polling has finished.
        let mut conn = Some(self);
        futures_util::future::poll_fn(move |cx| {
            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
            let parts = conn.take().unwrap().try_into_parts();
            Poll::Ready(parts.ok_or_else(crate::Error::new_without_shutdown_not_h1))
        })
    }

    /// Replaces the HTTP/1 dispatcher with an HTTP/2 server.
    ///
    /// The bytes HTTP/1 had already read are rewound onto the IO object so
    /// the HTTP/2 server sees them again.
    ///
    /// Panics if the connection is not currently HTTP/1 or if the fallback is
    /// `Http1Only`; callers check `Fallback::to_h2` first.
    #[cfg(all(feature = "http1", feature = "http2"))]
    fn upgrade_h2(&mut self) {
        trace!("Trying to upgrade connection to h2");
        let taken = self.conn.take();

        let (io, read_buf, dispatch) = match taken.unwrap() {
            ProtoServer::H1 { h1, .. } => h1.into_inner(),
            ProtoServer::H2 { .. } => {
                panic!("h2 cannot into_inner");
            }
        };
        let mut rewind_io = Rewind::new(io);
        rewind_io.rewind(read_buf);
        let (builder, exec) = match &self.fallback {
            Fallback::ToHttp2(builder, exec) => (builder, exec),
            Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
        };
        let service = dispatch.into_service();
        let h2 = proto::h2::Server::new(rewind_io, service, builder, exec.clone());

        debug_assert!(self.conn.is_none());
        self.conn = Some(ProtoServer::H2 { h2 });
    }

    /// Wraps this connection in a future that fulfills HTTP upgrades.
    ///
    /// This requires the IO object to be `Send`.
    pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
    where
        I: Send,
    {
        UpgradeableConnection { inner: self }
    }
}
857
/// Drives the protocol server until it shuts down or fails.
///
/// A pending upgrade is marked as manual rather than fulfilled here; use
/// `with_upgrades` for a future that fulfills upgrades.
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Future for Connection<I, S, E>
where
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin + 'static,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        loop {
            match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
                Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                #[cfg(feature = "http1")]
                Ok(proto::Dispatched::Upgrade(pending)) => {
                    pending.manual();
                    return Poll::Ready(Ok(()));
                }
                Err(e) => {
                    // The peer spoke HTTP/2: swap in an HTTP/2 server and
                    // poll again instead of failing.
                    #[cfg(all(feature = "http1", feature = "http2"))]
                    if matches!(*e.kind(), Kind::Parse(Parse::VersionH2))
                        && self.fallback.to_h2()
                    {
                        self.upgrade_h2();
                        continue;
                    }

                    return Poll::Ready(Err(e));
                }
            }
        }
    }
}
904
/// Opaque `Debug` output: prints only the type name, so no bounds are needed
/// on the IO object, service or executor.
///
/// Implemented for every executor type `E`, not just the default one, so
/// connections built after `Http::with_executor` can be debug-printed too.
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, S, E> fmt::Debug for Connection<I, S, E>
where
    S: HttpService<Body>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection").finish()
    }
}
914
/// The default mode depends on the enabled protocol features: fallback when
/// both are available, otherwise the only protocol that is compiled in.
#[cfg(any(feature = "http1", feature = "http2"))]
impl Default for ConnectionMode {
    #[cfg(all(feature = "http1", feature = "http2"))]
    fn default() -> Self {
        Self::Fallback
    }

    #[cfg(all(feature = "http1", not(feature = "http2")))]
    fn default() -> Self {
        Self::H1Only
    }

    #[cfg(all(not(feature = "http1"), feature = "http2"))]
    fn default() -> Self {
        Self::H2Only
    }
}
934
/// Polling a `ProtoServer` polls whichever protocol server it holds.
#[cfg(any(feature = "http1", feature = "http2"))]
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
where
    T: AsyncRead + AsyncWrite + Unpin,
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<proto::Dispatched>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        match projected {
            #[cfg(feature = "http1")]
            ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
            // Placeholder variant: uninhabited, so this arm can never run.
            #[cfg(not(feature = "http1"))]
            ProtoServerProj::H1 { h1, .. } => match h1.0 {},

            #[cfg(feature = "http2")]
            ProtoServerProj::H2 { h2 } => h2.poll(cx),
            // Placeholder variant: uninhabited, so this arm can never run.
            #[cfg(not(feature = "http2"))]
            ProtoServerProj::H2 { h2 } => match h2.0 {},
        }
    }
}
963
#[cfg(any(feature = "http1", feature = "http2"))]
mod upgrades {
    use super::*;

    /// A future binding a connection to a service, with upgrade support.
    ///
    /// It is created by `Connection::with_upgrades` and is only re-exported
    /// with `pub(super)` visibility from the parent module.
    #[must_use = "futures do nothing unless polled"]
    // No `Debug` impl is provided for this wrapper, so the lint is silenced.
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S, E>
    where
        S: HttpService<Body>,
    {
        pub(super) inner: Connection<T, S, E>,
    }

    impl<I, B, S, E> UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        /// Starts a graceful shutdown of the wrapped connection.
        ///
        /// This simply forwards to `Connection::graceful_shutdown`.
        pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
            let inner = Pin::new(&mut self.inner);
            inner.graceful_shutdown();
        }
    }

    /// Drives the wrapped connection; when the dispatcher reports an upgrade,
    /// the IO object and read buffer are handed to the pending upgrade.
    impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        type Output = crate::Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            loop {
                let outcome = ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx));
                match outcome {
                    Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                    #[cfg(feature = "http1")]
                    Ok(proto::Dispatched::Upgrade(pending)) => {
                        // Take the HTTP/1 dispatcher apart to recover the IO
                        // object and whatever was already read from it.
                        let (io, buf, _) = match self.inner.conn.take() {
                            Some(ProtoServer::H1 { h1, .. }) => h1.into_inner(),
                            _ => {
                                drop(pending);
                                unreachable!("Upgrade expects h1")
                            }
                        };
                        pending.fulfill(Upgraded::new(io, buf));
                        return Poll::Ready(Ok(()));
                    }
                    Err(e) => {
                        // The peer spoke HTTP/2: swap in an HTTP/2 server and
                        // poll again instead of failing.
                        #[cfg(all(feature = "http1", feature = "http2"))]
                        if matches!(*e.kind(), Kind::Parse(Parse::VersionH2))
                            && self.inner.fallback.to_h2()
                        {
                            self.inner.upgrade_h2();
                            continue;
                        }

                        return Poll::Ready(Err(e));
                    }
                }
            }
        }
    }
}