1use std::error::Error as StdError;
2use std::fmt;
3#[cfg(feature = "tcp")]
4use std::net::{SocketAddr, TcpListener as StdTcpListener};
5#[cfg(any(feature = "tcp", feature = "http1"))]
6use std::time::Duration;
7
8use pin_project_lite::pin_project;
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::trace;
11
12use super::accept::Accept;
13#[cfg(all(feature = "tcp"))]
14use super::tcp::AddrIncoming;
15use crate::body::{Body, HttpBody};
16use crate::common::exec::Exec;
17use crate::common::exec::{ConnStreamExec, NewSvcExec};
18use crate::common::{task, Future, Pin, Poll, Unpin};
19use super::conn::{Connection, Http as Http_, UpgradeableConnection};
22use super::shutdown::{Graceful, GracefulWatcher};
23use crate::service::{HttpService, MakeServiceRef};
24
25use self::new_svc::NewSvcTask;
26
27pin_project! {
28 pub struct Server<I, S, E = Exec> {
35 #[pin]
36 incoming: I,
37 make_service: S,
38 protocol: Http_<E>,
39 }
40}
41
42#[derive(Debug)]
44#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
45pub struct Builder<I, E = Exec> {
46 incoming: I,
47 protocol: Http_<E>,
48}
49
50#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
53impl<I> Server<I, ()> {
54 pub fn builder(incoming: I) -> Builder<I> {
56 Builder {
57 incoming,
58 protocol: Http_::new(),
59 }
60 }
61}
62
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl Server<AddrIncoming, ()> {
    /// Binds to the provided address and returns a [`Builder`].
    ///
    /// # Panics
    ///
    /// Panics with `"error binding to {addr}: {error}"` if creating the
    /// `AddrIncoming` for `addr` fails. Use [`Server::try_bind`] to handle
    /// that error instead.
    pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
        match Self::try_bind(addr) {
            Ok(builder) => builder,
            Err(e) => panic!("error binding to {}: {}", addr, e),
        }
    }

    /// Tries to bind to the provided address and returns a [`Builder`].
    ///
    /// # Errors
    ///
    /// Returns the error reported by `AddrIncoming::new`.
    pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
        let incoming = AddrIncoming::new(addr)?;
        Ok(Server::builder(incoming))
    }

    /// Creates a [`Builder`] from an existing `std::net::TcpListener`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `AddrIncoming::from_std`.
    pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
        let incoming = AddrIncoming::from_std(listener)?;
        Ok(Server::builder(incoming))
    }
}
92
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl<S, E> Server<AddrIncoming, S, E> {
    /// Returns the local socket address reported by the underlying
    /// `AddrIncoming`.
    pub fn local_addr(&self) -> SocketAddr {
        let incoming: &AddrIncoming = &self.incoming;
        incoming.local_addr()
    }
}
104
105#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
106impl<I, IO, IE, S, E, B> Server<I, S, E>
107where
108 I: Accept<Conn = IO, Error = IE>,
109 IE: Into<Box<dyn StdError + Send + Sync>>,
110 IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
111 S: MakeServiceRef<IO, Body, ResBody = B>,
112 S::Error: Into<Box<dyn StdError + Send + Sync>>,
113 B: HttpBody + 'static,
114 B::Error: Into<Box<dyn StdError + Send + Sync>>,
115 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
116{
117 pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
154 where
155 F: Future<Output = ()>,
156 E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
157 {
158 Graceful::new(self, signal)
159 }
160
161 fn poll_next_(
162 self: Pin<&mut Self>,
163 cx: &mut task::Context<'_>,
164 ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
165 let me = self.project();
166 match ready!(me.make_service.poll_ready_ref(cx)) {
167 Ok(()) => (),
168 Err(e) => {
169 trace!("make_service closed");
170 return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
171 }
172 }
173
174 if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
175 let io = item.map_err(crate::Error::new_accept)?;
176 let new_fut = me.make_service.make_service_ref(&io);
177 Poll::Ready(Some(Ok(Connecting {
178 future: new_fut,
179 io: Some(io),
180 protocol: me.protocol.clone(),
181 })))
182 } else {
183 Poll::Ready(None)
184 }
185 }
186
187 pub(super) fn poll_watch<W>(
188 mut self: Pin<&mut Self>,
189 cx: &mut task::Context<'_>,
190 watcher: &W,
191 ) -> Poll<crate::Result<()>>
192 where
193 E: NewSvcExec<IO, S::Future, S::Service, E, W>,
194 W: Watcher<IO, S::Service, E>,
195 {
196 loop {
197 if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) {
198 let fut = NewSvcTask::new(connecting, watcher.clone());
199 self.as_mut().project().protocol.exec.execute_new_svc(fut);
200 } else {
201 return Poll::Ready(Ok(()));
202 }
203 }
204 }
205}
206
207#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
208impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
209where
210 I: Accept<Conn = IO, Error = IE>,
211 IE: Into<Box<dyn StdError + Send + Sync>>,
212 IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
213 S: MakeServiceRef<IO, Body, ResBody = B>,
214 S::Error: Into<Box<dyn StdError + Send + Sync>>,
215 B: HttpBody + 'static,
216 B::Error: Into<Box<dyn StdError + Send + Sync>>,
217 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
218 E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
219{
220 type Output = crate::Result<()>;
221
222 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
223 self.poll_watch(cx, &NoopWatcher)
224 }
225}
226
227impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 let mut st = f.debug_struct("Server");
230 st.field("listener", &self.incoming);
231 st.finish()
232 }
233}
234
235#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
238impl<I, E> Builder<I, E> {
239 pub fn new(incoming: I, protocol: Http_<E>) -> Self {
243 Builder { incoming, protocol }
244 }
245
246 #[cfg(feature = "http1")]
250 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
251 pub fn http1_keepalive(mut self, val: bool) -> Self {
252 self.protocol.http1_keep_alive(val);
253 self
254 }
255
256 #[cfg(feature = "http1")]
265 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
266 pub fn http1_half_close(mut self, val: bool) -> Self {
267 self.protocol.http1_half_close(val);
268 self
269 }
270
271 #[cfg(feature = "http1")]
275 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
276 pub fn http1_max_buf_size(mut self, val: usize) -> Self {
277 self.protocol.max_buf_size(val);
278 self
279 }
280
281 #[doc(hidden)]
286 #[cfg(feature = "http1")]
287 pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
288 self.protocol.pipeline_flush(val);
289 self
290 }
291
292 #[cfg(feature = "http1")]
305 pub fn http1_writev(mut self, enabled: bool) -> Self {
306 self.protocol.http1_writev(enabled);
307 self
308 }
309
310 #[cfg(feature = "http1")]
317 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
318 pub fn http1_title_case_headers(mut self, val: bool) -> Self {
319 self.protocol.http1_title_case_headers(val);
320 self
321 }
322
323 #[cfg(feature = "http1")]
337 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
338 pub fn http1_preserve_header_case(mut self, val: bool) -> Self {
339 self.protocol.http1_preserve_header_case(val);
340 self
341 }
342
343 #[cfg(all(feature = "http1", feature = "runtime"))]
348 #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
349 pub fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self {
350 self.protocol.http1_header_read_timeout(read_timeout);
351 self
352 }
353
354 #[cfg(feature = "http1")]
358 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
359 pub fn http1_only(mut self, val: bool) -> Self {
360 self.protocol.http1_only(val);
361 self
362 }
363
364 #[cfg(feature = "http2")]
368 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
369 pub fn http2_only(mut self, val: bool) -> Self {
370 self.protocol.http2_only(val);
371 self
372 }
373
374 #[cfg(feature = "http2")]
383 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
384 pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
385 self.protocol.http2_initial_stream_window_size(sz.into());
386 self
387 }
388
389 #[cfg(feature = "http2")]
395 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
396 pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
397 self.protocol
398 .http2_initial_connection_window_size(sz.into());
399 self
400 }
401
402 #[cfg(feature = "http2")]
408 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
409 pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
410 self.protocol.http2_adaptive_window(enabled);
411 self
412 }
413
414 #[cfg(feature = "http2")]
420 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
421 pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
422 self.protocol.http2_max_frame_size(sz);
423 self
424 }
425
426 #[cfg(feature = "http2")]
433 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
434 pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
435 self.protocol.http2_max_concurrent_streams(max.into());
436 self
437 }
438
439 #[cfg(all(feature = "runtime", feature = "http2"))]
450 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
451 pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
452 self.protocol.http2_keep_alive_interval(interval);
453 self
454 }
455
456 #[cfg(all(feature = "runtime", feature = "http2"))]
467 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
468 pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
469 self.protocol.http2_keep_alive_timeout(timeout);
470 self
471 }
472
473 #[cfg(feature = "http2")]
481 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
482 pub fn http2_max_send_buf_size(mut self, max: usize) -> Self {
483 self.protocol.http2_max_send_buf_size(max);
484 self
485 }
486
487 #[cfg(feature = "http2")]
491 pub fn http2_enable_connect_protocol(mut self) -> Self {
492 self.protocol.http2_enable_connect_protocol();
493 self
494 }
495
496 pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
500 Builder {
501 incoming: self.incoming,
502 protocol: self.protocol.with_executor(executor),
503 }
504 }
505
506 pub fn serve<S, B>(self, make_service: S) -> Server<I, S, E>
537 where
538 I: Accept,
539 I::Error: Into<Box<dyn StdError + Send + Sync>>,
540 I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
541 S: MakeServiceRef<I::Conn, Body, ResBody = B>,
542 S::Error: Into<Box<dyn StdError + Send + Sync>>,
543 B: HttpBody + 'static,
544 B::Error: Into<Box<dyn StdError + Send + Sync>>,
545 E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
546 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
547 {
548 Server {
549 incoming: self.incoming,
550 make_service,
551 protocol: self.protocol.clone(),
552 }
553 }
554}
555
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl<E> Builder<AddrIncoming, E> {
    /// Configures TCP keepalive on the underlying `AddrIncoming`.
    ///
    /// Forwards `keepalive` to `AddrIncoming::set_keepalive`.
    pub fn tcp_keepalive(self, keepalive: Option<Duration>) -> Self {
        let Builder {
            mut incoming,
            protocol,
        } = self;
        incoming.set_keepalive(keepalive);
        Builder { incoming, protocol }
    }

    /// Configures the `TCP_NODELAY` option on the underlying `AddrIncoming`.
    ///
    /// Forwards `enabled` to `AddrIncoming::set_nodelay`.
    pub fn tcp_nodelay(self, enabled: bool) -> Self {
        let Builder {
            mut incoming,
            protocol,
        } = self;
        incoming.set_nodelay(enabled);
        Builder { incoming, protocol }
    }

    /// Configures whether the acceptor sleeps after accept errors.
    ///
    /// Forwards `val` to `AddrIncoming::set_sleep_on_errors`.
    pub fn tcp_sleep_on_accept_errors(self, val: bool) -> Self {
        let Builder {
            mut incoming,
            protocol,
        } = self;
        incoming.set_sleep_on_errors(val);
        Builder { incoming, protocol }
    }
}
598
599pub trait Watcher<I, S: HttpService<Body>, E>: Clone {
608 type Future: Future<Output = crate::Result<()>>;
609
610 fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
611}
612
/// A [`Watcher`] that does no watching: it returns each connection future
/// untouched.
// No `Debug` impl is provided for this zero-sized marker type.
#[allow(missing_debug_implementations)]
#[derive(Clone, Copy)]
pub struct NoopWatcher;
616
617impl<I, S, E> Watcher<I, S, E> for NoopWatcher
618where
619 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
620 S: HttpService<Body>,
621 E: ConnStreamExec<S::Future, S::ResBody>,
622 S::ResBody: 'static,
623 <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
624{
625 type Future = UpgradeableConnection<I, S, E>;
626
627 fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
628 conn
629 }
630}
631
632pub(crate) mod new_svc {
634 use std::error::Error as StdError;
635 use tokio::io::{AsyncRead, AsyncWrite};
636 use tracing::debug;
637
638 use super::{Connecting, Watcher};
639 use crate::body::{Body, HttpBody};
640 use crate::common::exec::ConnStreamExec;
641 use crate::common::{task, Future, Pin, Poll, Unpin};
642 use crate::service::HttpService;
643 use pin_project_lite::pin_project;
644
645 pin_project! {
656 #[allow(missing_debug_implementations)]
657 pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
658 #[pin]
659 state: State<I, N, S, E, W>,
660 }
661 }
662
663 pin_project! {
664 #[project = StateProj]
665 pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
666 Connecting {
667 #[pin]
668 connecting: Connecting<I, N, E>,
669 watcher: W,
670 },
671 Connected {
672 #[pin]
673 future: W::Future,
674 },
675 }
676 }
677
678 impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
679 pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
680 NewSvcTask {
681 state: State::Connecting {
682 connecting,
683 watcher,
684 },
685 }
686 }
687 }
688
689 impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W>
690 where
691 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
692 N: Future<Output = Result<S, NE>>,
693 NE: Into<Box<dyn StdError + Send + Sync>>,
694 S: HttpService<Body, ResBody = B>,
695 B: HttpBody + 'static,
696 B::Error: Into<Box<dyn StdError + Send + Sync>>,
697 E: ConnStreamExec<S::Future, B>,
698 W: Watcher<I, S, E>,
699 {
700 type Output = ();
701
702 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
703 let mut me = self.project();
708 loop {
709 let next = {
710 match me.state.as_mut().project() {
711 StateProj::Connecting {
712 connecting,
713 watcher,
714 } => {
715 let res = ready!(connecting.poll(cx));
716 let conn = match res {
717 Ok(conn) => conn,
718 Err(err) => {
719 let err = crate::Error::new_user_make_service(err);
720 debug!("connecting error: {}", err);
721 return Poll::Ready(());
722 }
723 };
724 let future = watcher.watch(conn.with_upgrades());
725 State::Connected { future }
726 }
727 StateProj::Connected { future } => {
728 return future.poll(cx).map(|res| {
729 if let Err(err) = res {
730 debug!("connection error: {}", err);
731 }
732 });
733 }
734 }
735 };
736
737 me.state.set(next);
738 }
739 }
740 }
741}
742
743pin_project! {
744 #[must_use = "futures do nothing unless polled"]
749 #[derive(Debug)]
750 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
751 pub struct Connecting<I, F, E = Exec> {
752 #[pin]
753 future: F,
754 io: Option<I>,
755 protocol: Http_<E>,
756 }
757}
758
759impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
760where
761 I: AsyncRead + AsyncWrite + Unpin,
762 F: Future<Output = Result<S, FE>>,
763 S: HttpService<Body, ResBody = B>,
764 B: HttpBody + 'static,
765 B::Error: Into<Box<dyn StdError + Send + Sync>>,
766 E: ConnStreamExec<S::Future, B>,
767{
768 type Output = Result<Connection<I, S, E>, FE>;
769
770 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
771 let mut me = self.project();
772 let service = ready!(me.future.poll(cx))?;
773 let io = Option::take(&mut me.io).expect("polled after complete");
774 Poll::Ready(Ok(me.protocol.serve_connection(io, service)))
775 }
776}