1#[cfg(all(
47 any(feature = "http1", feature = "http2"),
48 not(all(feature = "http1", feature = "http2"))
49))]
50use std::marker::PhantomData;
51#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
52use std::time::Duration;
53
54#[cfg(feature = "http2")]
55use crate::common::io::Rewind;
56#[cfg(all(feature = "http1", feature = "http2"))]
57use crate::error::{Kind, Parse};
58#[cfg(feature = "http1")]
59use crate::upgrade::Upgraded;
60
61cfg_feature! {
62 #![any(feature = "http1", feature = "http2")]
63
64 use std::error::Error as StdError;
65 use std::fmt;
66
67 use bytes::Bytes;
68 use pin_project_lite::pin_project;
69 use tokio::io::{AsyncRead, AsyncWrite};
70 use tracing::trace;
71
72 pub use super::server::Connecting;
73 use crate::body::{Body, HttpBody};
74 use crate::common::{task, Future, Pin, Poll, Unpin};
75 #[cfg(not(all(feature = "http1", feature = "http2")))]
76 use crate::common::Never;
77 use crate::common::exec::{ConnStreamExec, Exec};
78 use crate::proto;
79 use crate::service::HttpService;
80
81 pub(super) use self::upgrades::UpgradeableConnection;
82}
83
84#[cfg(feature = "tcp")]
85pub use super::tcp::{AddrIncoming, AddrStream};
86
87#[derive(Clone, Debug)]
94#[cfg(any(feature = "http1", feature = "http2"))]
95#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
96pub struct Http<E = Exec> {
97 pub(crate) exec: E,
98 h1_half_close: bool,
99 h1_keep_alive: bool,
100 h1_title_case_headers: bool,
101 h1_preserve_header_case: bool,
102 #[cfg(all(feature = "http1", feature = "runtime"))]
103 h1_header_read_timeout: Option<Duration>,
104 h1_writev: Option<bool>,
105 #[cfg(feature = "http2")]
106 h2_builder: proto::h2::server::Config,
107 mode: ConnectionMode,
108 max_buf_size: Option<usize>,
109 pipeline_flush: bool,
110}
111
112#[cfg(any(feature = "http1", feature = "http2"))]
114#[derive(Clone, Debug, PartialEq)]
115enum ConnectionMode {
116 #[cfg(feature = "http1")]
118 H1Only,
119 #[cfg(feature = "http2")]
121 H2Only,
122 #[cfg(all(feature = "http1", feature = "http2"))]
124 Fallback,
125}
126
127#[cfg(any(feature = "http1", feature = "http2"))]
128pin_project! {
129 #[must_use = "futures do nothing unless polled"]
133 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
134 pub struct Connection<T, S, E = Exec>
135 where
136 S: HttpService<Body>,
137 {
138 pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
139 fallback: Fallback<E>,
140 }
141}
142
143#[cfg(feature = "http1")]
144type Http1Dispatcher<T, B, S> =
145 proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;
146
147#[cfg(all(not(feature = "http1"), feature = "http2"))]
148type Http1Dispatcher<T, B, S> = (Never, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
149
150#[cfg(feature = "http2")]
151type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;
152
153#[cfg(all(not(feature = "http2"), feature = "http1"))]
154type Http2Server<T, B, S, E> = (
155 Never,
156 PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
157);
158
159#[cfg(any(feature = "http1", feature = "http2"))]
160pin_project! {
161 #[project = ProtoServerProj]
162 pub(super) enum ProtoServer<T, B, S, E = Exec>
163 where
164 S: HttpService<Body>,
165 B: HttpBody,
166 {
167 H1 {
168 #[pin]
169 h1: Http1Dispatcher<T, B, S>,
170 },
171 H2 {
172 #[pin]
173 h2: Http2Server<T, B, S, E>,
174 },
175 }
176}
177
178#[cfg(all(feature = "http1", feature = "http2"))]
179#[derive(Clone, Debug)]
180enum Fallback<E> {
181 ToHttp2(proto::h2::server::Config, E),
182 Http1Only,
183}
184
185#[cfg(all(
186 any(feature = "http1", feature = "http2"),
187 not(all(feature = "http1", feature = "http2"))
188))]
189type Fallback<E> = PhantomData<E>;
190
191#[cfg(all(feature = "http1", feature = "http2"))]
192impl<E> Fallback<E> {
193 fn to_h2(&self) -> bool {
194 match *self {
195 Fallback::ToHttp2(..) => true,
196 Fallback::Http1Only => false,
197 }
198 }
199}
200
201#[cfg(all(feature = "http1", feature = "http2"))]
202impl<E> Unpin for Fallback<E> {}
203
204#[derive(Debug)]
209#[cfg(any(feature = "http1", feature = "http2"))]
210#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
211pub struct Parts<T, S> {
212 pub io: T,
214 pub read_buf: Bytes,
223 pub service: S,
225 _inner: (),
226}
227
228#[cfg(any(feature = "http1", feature = "http2"))]
231impl Http {
232 pub fn new() -> Http {
235 Http {
236 exec: Exec::Default,
237 h1_half_close: false,
238 h1_keep_alive: true,
239 h1_title_case_headers: false,
240 h1_preserve_header_case: false,
241 #[cfg(all(feature = "http1", feature = "runtime"))]
242 h1_header_read_timeout: None,
243 h1_writev: None,
244 #[cfg(feature = "http2")]
245 h2_builder: Default::default(),
246 mode: ConnectionMode::default(),
247 max_buf_size: None,
248 pipeline_flush: false,
249 }
250 }
251}
252
253#[cfg(any(feature = "http1", feature = "http2"))]
254impl<E> Http<E> {
255 #[cfg(feature = "http1")]
259 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
260 pub fn http1_only(&mut self, val: bool) -> &mut Self {
261 if val {
262 self.mode = ConnectionMode::H1Only;
263 } else {
264 #[cfg(feature = "http2")]
265 {
266 self.mode = ConnectionMode::Fallback;
267 }
268 }
269 self
270 }
271
272 #[cfg(feature = "http1")]
281 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
282 pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
283 self.h1_half_close = val;
284 self
285 }
286
287 #[cfg(feature = "http1")]
291 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
292 pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
293 self.h1_keep_alive = val;
294 self
295 }
296
297 #[cfg(feature = "http1")]
304 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
305 pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
306 self.h1_title_case_headers = enabled;
307 self
308 }
309
310 #[cfg(feature = "http1")]
324 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
325 pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
326 self.h1_preserve_header_case = enabled;
327 self
328 }
329
330 #[cfg(all(feature = "http1", feature = "runtime"))]
335 #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
336 pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
337 self.h1_header_read_timeout = Some(read_timeout);
338 self
339 }
340
341 #[inline]
354 #[cfg(feature = "http1")]
355 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
356 pub fn http1_writev(&mut self, val: bool) -> &mut Self {
357 self.h1_writev = Some(val);
358 self
359 }
360
361 #[cfg(feature = "http2")]
365 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
366 pub fn http2_only(&mut self, val: bool) -> &mut Self {
367 if val {
368 self.mode = ConnectionMode::H2Only;
369 } else {
370 #[cfg(feature = "http1")]
371 {
372 self.mode = ConnectionMode::Fallback;
373 }
374 }
375 self
376 }
377
378 #[cfg(feature = "http2")]
387 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
388 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
389 if let Some(sz) = sz.into() {
390 self.h2_builder.adaptive_window = false;
391 self.h2_builder.initial_stream_window_size = sz;
392 }
393 self
394 }
395
396 #[cfg(feature = "http2")]
402 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
403 pub fn http2_initial_connection_window_size(
404 &mut self,
405 sz: impl Into<Option<u32>>,
406 ) -> &mut Self {
407 if let Some(sz) = sz.into() {
408 self.h2_builder.adaptive_window = false;
409 self.h2_builder.initial_conn_window_size = sz;
410 }
411 self
412 }
413
414 #[cfg(feature = "http2")]
420 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
421 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
422 use proto::h2::SPEC_WINDOW_SIZE;
423
424 self.h2_builder.adaptive_window = enabled;
425 if enabled {
426 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
427 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
428 }
429 self
430 }
431
432 #[cfg(feature = "http2")]
438 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
439 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
440 if let Some(sz) = sz.into() {
441 self.h2_builder.max_frame_size = sz;
442 }
443 self
444 }
445
446 #[cfg(feature = "http2")]
453 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
454 pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
455 self.h2_builder.max_concurrent_streams = max.into();
456 self
457 }
458
459 #[cfg(feature = "runtime")]
470 #[cfg(feature = "http2")]
471 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
472 pub fn http2_keep_alive_interval(
473 &mut self,
474 interval: impl Into<Option<Duration>>,
475 ) -> &mut Self {
476 self.h2_builder.keep_alive_interval = interval.into();
477 self
478 }
479
480 #[cfg(feature = "runtime")]
491 #[cfg(feature = "http2")]
492 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
493 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
494 self.h2_builder.keep_alive_timeout = timeout;
495 self
496 }
497
498 #[cfg(feature = "http2")]
506 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
507 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
508 assert!(max <= std::u32::MAX as usize);
509 self.h2_builder.max_send_buffer_size = max;
510 self
511 }
512
513 #[cfg(feature = "http2")]
517 pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
518 self.h2_builder.enable_connect_protocol = true;
519 self
520 }
521
522 #[cfg(feature = "http2")]
526 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
527 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
528 self.h2_builder.max_header_list_size = max;
529 self
530 }
531
532 #[cfg(feature = "http1")]
540 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
541 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
542 assert!(
543 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
544 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
545 );
546 self.max_buf_size = Some(max);
547 self
548 }
549
550 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
556 self.pipeline_flush = enabled;
557 self
558 }
559
560 pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
564 Http {
565 exec,
566 h1_half_close: self.h1_half_close,
567 h1_keep_alive: self.h1_keep_alive,
568 h1_title_case_headers: self.h1_title_case_headers,
569 h1_preserve_header_case: self.h1_preserve_header_case,
570 #[cfg(all(feature = "http1", feature = "runtime"))]
571 h1_header_read_timeout: self.h1_header_read_timeout,
572 h1_writev: self.h1_writev,
573 #[cfg(feature = "http2")]
574 h2_builder: self.h2_builder,
575 mode: self.mode,
576 max_buf_size: self.max_buf_size,
577 pipeline_flush: self.pipeline_flush,
578 }
579 }
580
581 pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
610 where
611 S: HttpService<Body, ResBody = Bd>,
612 S::Error: Into<Box<dyn StdError + Send + Sync>>,
613 Bd: HttpBody + 'static,
614 Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
615 I: AsyncRead + AsyncWrite + Unpin,
616 E: ConnStreamExec<S::Future, Bd>,
617 {
618 #[cfg(feature = "http1")]
619 macro_rules! h1 {
620 () => {{
621 let mut conn = proto::Conn::new(io);
622 if !self.h1_keep_alive {
623 conn.disable_keep_alive();
624 }
625 if self.h1_half_close {
626 conn.set_allow_half_close();
627 }
628 if self.h1_title_case_headers {
629 conn.set_title_case_headers();
630 }
631 if self.h1_preserve_header_case {
632 conn.set_preserve_header_case();
633 }
634 #[cfg(all(feature = "http1", feature = "runtime"))]
635 if let Some(header_read_timeout) = self.h1_header_read_timeout {
636 conn.set_http1_header_read_timeout(header_read_timeout);
637 }
638 if let Some(writev) = self.h1_writev {
639 if writev {
640 conn.set_write_strategy_queue();
641 } else {
642 conn.set_write_strategy_flatten();
643 }
644 }
645 conn.set_flush_pipeline(self.pipeline_flush);
646 if let Some(max) = self.max_buf_size {
647 conn.set_max_buf_size(max);
648 }
649 let sd = proto::h1::dispatch::Server::new(service);
650 ProtoServer::H1 {
651 h1: proto::h1::Dispatcher::new(sd, conn),
652 }
653 }};
654 }
655
656 let proto = match self.mode {
657 #[cfg(feature = "http1")]
658 #[cfg(not(feature = "http2"))]
659 ConnectionMode::H1Only => h1!(),
660 #[cfg(feature = "http2")]
661 #[cfg(feature = "http1")]
662 ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
663 #[cfg(feature = "http2")]
664 ConnectionMode::H2Only => {
665 let rewind_io = Rewind::new(io);
666 let h2 =
667 proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
668 ProtoServer::H2 { h2 }
669 }
670 };
671
672 Connection {
673 conn: Some(proto),
674 #[cfg(all(feature = "http1", feature = "http2"))]
675 fallback: if self.mode == ConnectionMode::Fallback {
676 Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
677 } else {
678 Fallback::Http1Only
679 },
680 #[cfg(not(all(feature = "http1", feature = "http2")))]
681 fallback: PhantomData,
682 }
683 }
684}
685
686#[cfg(any(feature = "http1", feature = "http2"))]
689impl<I, B, S, E> Connection<I, S, E>
690where
691 S: HttpService<Body, ResBody = B>,
692 S::Error: Into<Box<dyn StdError + Send + Sync>>,
693 I: AsyncRead + AsyncWrite + Unpin,
694 B: HttpBody + 'static,
695 B::Error: Into<Box<dyn StdError + Send + Sync>>,
696 E: ConnStreamExec<S::Future, B>,
697{
698 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
709 match self.conn {
710 #[cfg(feature = "http1")]
711 Some(ProtoServer::H1 { ref mut h1, .. }) => {
712 h1.disable_keep_alive();
713 }
714 #[cfg(feature = "http2")]
715 Some(ProtoServer::H2 { ref mut h2 }) => {
716 h2.graceful_shutdown();
717 }
718 None => (),
719
720 #[cfg(not(feature = "http1"))]
721 Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
722 #[cfg(not(feature = "http2"))]
723 Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
724 }
725 }
726
727 pub fn into_parts(self) -> Parts<I, S> {
737 self.try_into_parts()
738 .unwrap_or_else(|| panic!("h2 cannot into_inner"))
739 }
740
741 pub fn try_into_parts(self) -> Option<Parts<I, S>> {
745 match self.conn.unwrap() {
746 #[cfg(feature = "http1")]
747 ProtoServer::H1 { h1, .. } => {
748 let (io, read_buf, dispatch) = h1.into_inner();
749 Some(Parts {
750 io,
751 read_buf,
752 service: dispatch.into_service(),
753 _inner: (),
754 })
755 }
756 ProtoServer::H2 { .. } => None,
757
758 #[cfg(not(feature = "http1"))]
759 ProtoServer::H1 { h1, .. } => match h1.0 {},
760 }
761 }
762
763 pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
771 where
772 S: Unpin,
773 S::Future: Unpin,
774 B: Unpin,
775 {
776 loop {
777 match *self.conn.as_mut().unwrap() {
778 #[cfg(feature = "http1")]
779 ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
780 Ok(()) => return Poll::Ready(Ok(())),
781 Err(e) => {
782 #[cfg(feature = "http2")]
783 match *e.kind() {
784 Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
785 self.upgrade_h2();
786 continue;
787 }
788 _ => (),
789 }
790
791 return Poll::Ready(Err(e));
792 }
793 },
794 #[cfg(feature = "http2")]
795 ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
796
797 #[cfg(not(feature = "http1"))]
798 ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
799 #[cfg(not(feature = "http2"))]
800 ProtoServer::H2 { ref mut h2 } => match h2.0 {},
801 };
802 }
803 }
804
805 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
812 where
813 S: Unpin,
814 S::Future: Unpin,
815 B: Unpin,
816 {
817 let mut conn = Some(self);
818 futures_util::future::poll_fn(move |cx| {
819 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
820 Poll::Ready(conn.take().unwrap().try_into_parts().ok_or_else(crate::Error::new_without_shutdown_not_h1))
821 })
822 }
823
824 #[cfg(all(feature = "http1", feature = "http2"))]
825 fn upgrade_h2(&mut self) {
826 trace!("Trying to upgrade connection to h2");
827 let conn = self.conn.take();
828
829 let (io, read_buf, dispatch) = match conn.unwrap() {
830 ProtoServer::H1 { h1, .. } => h1.into_inner(),
831 ProtoServer::H2 { .. } => {
832 panic!("h2 cannot into_inner");
833 }
834 };
835 let mut rewind_io = Rewind::new(io);
836 rewind_io.rewind(read_buf);
837 let (builder, exec) = match self.fallback {
838 Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
839 Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
840 };
841 let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
842
843 debug_assert!(self.conn.is_none());
844 self.conn = Some(ProtoServer::H2 { h2 });
845 }
846
847 pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
851 where
852 I: Send,
853 {
854 UpgradeableConnection { inner: self }
855 }
856}
857
858#[cfg(any(feature = "http1", feature = "http2"))]
859impl<I, B, S, E> Future for Connection<I, S, E>
860where
861 S: HttpService<Body, ResBody = B>,
862 S::Error: Into<Box<dyn StdError + Send + Sync>>,
863 I: AsyncRead + AsyncWrite + Unpin + 'static,
864 B: HttpBody + 'static,
865 B::Error: Into<Box<dyn StdError + Send + Sync>>,
866 E: ConnStreamExec<S::Future, B>,
867{
868 type Output = crate::Result<()>;
869
870 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
871 loop {
872 match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
873 Ok(done) => {
874 match done {
875 proto::Dispatched::Shutdown => {}
876 #[cfg(feature = "http1")]
877 proto::Dispatched::Upgrade(pending) => {
878 pending.manual();
883 }
884 };
885 return Poll::Ready(Ok(()));
886 }
887 Err(e) => {
888 #[cfg(feature = "http1")]
889 #[cfg(feature = "http2")]
890 match *e.kind() {
891 Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
892 self.upgrade_h2();
893 continue;
894 }
895 _ => (),
896 }
897
898 return Poll::Ready(Err(e));
899 }
900 }
901 }
902 }
903}
904
905#[cfg(any(feature = "http1", feature = "http2"))]
906impl<I, S> fmt::Debug for Connection<I, S>
907where
908 S: HttpService<Body>,
909{
910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911 f.debug_struct("Connection").finish()
912 }
913}
914
915#[cfg(any(feature = "http1", feature = "http2"))]
918impl Default for ConnectionMode {
919 #[cfg(all(feature = "http1", feature = "http2"))]
920 fn default() -> ConnectionMode {
921 ConnectionMode::Fallback
922 }
923
924 #[cfg(all(feature = "http1", not(feature = "http2")))]
925 fn default() -> ConnectionMode {
926 ConnectionMode::H1Only
927 }
928
929 #[cfg(all(not(feature = "http1"), feature = "http2"))]
930 fn default() -> ConnectionMode {
931 ConnectionMode::H2Only
932 }
933}
934
935#[cfg(any(feature = "http1", feature = "http2"))]
938impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
939where
940 T: AsyncRead + AsyncWrite + Unpin,
941 S: HttpService<Body, ResBody = B>,
942 S::Error: Into<Box<dyn StdError + Send + Sync>>,
943 B: HttpBody + 'static,
944 B::Error: Into<Box<dyn StdError + Send + Sync>>,
945 E: ConnStreamExec<S::Future, B>,
946{
947 type Output = crate::Result<proto::Dispatched>;
948
949 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
950 match self.project() {
951 #[cfg(feature = "http1")]
952 ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
953 #[cfg(feature = "http2")]
954 ProtoServerProj::H2 { h2 } => h2.poll(cx),
955
956 #[cfg(not(feature = "http1"))]
957 ProtoServerProj::H1 { h1, .. } => match h1.0 {},
958 #[cfg(not(feature = "http2"))]
959 ProtoServerProj::H2 { h2 } => match h2.0 {},
960 }
961 }
962}
963
964#[cfg(any(feature = "http1", feature = "http2"))]
965mod upgrades {
966 use super::*;
967
968 #[must_use = "futures do nothing unless polled"]
973 #[allow(missing_debug_implementations)]
974 pub struct UpgradeableConnection<T, S, E>
975 where
976 S: HttpService<Body>,
977 {
978 pub(super) inner: Connection<T, S, E>,
979 }
980
981 impl<I, B, S, E> UpgradeableConnection<I, S, E>
982 where
983 S: HttpService<Body, ResBody = B>,
984 S::Error: Into<Box<dyn StdError + Send + Sync>>,
985 I: AsyncRead + AsyncWrite + Unpin,
986 B: HttpBody + 'static,
987 B::Error: Into<Box<dyn StdError + Send + Sync>>,
988 E: ConnStreamExec<S::Future, B>,
989 {
990 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
995 Pin::new(&mut self.inner).graceful_shutdown()
996 }
997 }
998
999 impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
1000 where
1001 S: HttpService<Body, ResBody = B>,
1002 S::Error: Into<Box<dyn StdError + Send + Sync>>,
1003 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1004 B: HttpBody + 'static,
1005 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1006 E: ConnStreamExec<S::Future, B>,
1007 {
1008 type Output = crate::Result<()>;
1009
1010 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1011 loop {
1012 match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
1013 Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
1014 #[cfg(feature = "http1")]
1015 Ok(proto::Dispatched::Upgrade(pending)) => {
1016 match self.inner.conn.take() {
1017 Some(ProtoServer::H1 { h1, .. }) => {
1018 let (io, buf, _) = h1.into_inner();
1019 pending.fulfill(Upgraded::new(io, buf));
1020 return Poll::Ready(Ok(()));
1021 }
1022 _ => {
1023 drop(pending);
1024 unreachable!("Upgrade expects h1")
1025 }
1026 };
1027 }
1028 Err(e) => {
1029 #[cfg(feature = "http1")]
1030 #[cfg(feature = "http2")]
1031 match *e.kind() {
1032 Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
1033 self.inner.upgrade_h2();
1034 continue;
1035 }
1036 _ => (),
1037 }
1038
1039 return Poll::Ready(Err(e));
1040 }
1041 }
1042 }
1043 }
1044 }
1045}