hyper/server/
shutdown.rs

1use std::error::Error as StdError;
2
3use pin_project_lite::pin_project;
4use tokio::io::{AsyncRead, AsyncWrite};
5use tracing::debug;
6
7use super::accept::Accept;
8use super::conn::UpgradeableConnection;
9use super::server::{Server, Watcher};
10use crate::body::{Body, HttpBody};
11use crate::common::drain::{self, Draining, Signal, Watch, Watching};
12use crate::common::exec::{ConnStreamExec, NewSvcExec};
13use crate::common::{task, Future, Pin, Poll, Unpin};
14use crate::service::{HttpService, MakeServiceRef};
15
16pin_project! {
17    #[allow(missing_debug_implementations)]
18    pub struct Graceful<I, S, F, E> {
19        #[pin]
20        state: State<I, S, F, E>,
21    }
22}
23
24pin_project! {
25    #[project = StateProj]
26    pub(super) enum State<I, S, F, E> {
27        Running {
28            drain: Option<(Signal, Watch)>,
29            #[pin]
30            server: Server<I, S, E>,
31            #[pin]
32            signal: F,
33        },
34        Draining { draining: Draining },
35    }
36}
37
38impl<I, S, F, E> Graceful<I, S, F, E> {
39    pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self {
40        let drain = Some(drain::channel());
41        Graceful {
42            state: State::Running {
43                drain,
44                server,
45                signal,
46            },
47        }
48    }
49}
50
51impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
52where
53    I: Accept<Conn = IO, Error = IE>,
54    IE: Into<Box<dyn StdError + Send + Sync>>,
55    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
56    S: MakeServiceRef<IO, Body, ResBody = B>,
57    S::Error: Into<Box<dyn StdError + Send + Sync>>,
58    B: HttpBody + 'static,
59    B::Error: Into<Box<dyn StdError + Send + Sync>>,
60    F: Future<Output = ()>,
61    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
62    E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
63{
64    type Output = crate::Result<()>;
65
66    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
67        let mut me = self.project();
68        loop {
69            let next = {
70                match me.state.as_mut().project() {
71                    StateProj::Running {
72                        drain,
73                        server,
74                        signal,
75                    } => match signal.poll(cx) {
76                        Poll::Ready(()) => {
77                            debug!("signal received, starting graceful shutdown");
78                            let sig = drain.take().expect("drain channel").0;
79                            State::Draining {
80                                draining: sig.drain(),
81                            }
82                        }
83                        Poll::Pending => {
84                            let watch = drain.as_ref().expect("drain channel").1.clone();
85                            return server.poll_watch(cx, &GracefulWatcher(watch));
86                        }
87                    },
88                    StateProj::Draining { ref mut draining } => {
89                        return Pin::new(draining).poll(cx).map(Ok);
90                    }
91                }
92            };
93            me.state.set(next);
94        }
95    }
96}
97
98#[allow(missing_debug_implementations)]
99#[derive(Clone)]
100pub struct GracefulWatcher(Watch);
101
102impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
103where
104    I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
105    S: HttpService<Body>,
106    E: ConnStreamExec<S::Future, S::ResBody>,
107    S::ResBody: 'static,
108    <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
109{
110    type Future =
111        Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
112
113    fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
114        self.0.clone().watch(conn, on_drain)
115    }
116}
117
118fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
119where
120    S: HttpService<Body>,
121    S::Error: Into<Box<dyn StdError + Send + Sync>>,
122    I: AsyncRead + AsyncWrite + Unpin,
123    S::ResBody: HttpBody + 'static,
124    <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
125    E: ConnStreamExec<S::Future, S::ResBody>,
126{
127    conn.graceful_shutdown()
128}