hyper/proto/h1/
conn.rs

1use std::fmt;
2use std::io;
3use std::marker::PhantomData;
4#[cfg(all(feature = "server", feature = "runtime"))]
5use std::time::Duration;
6
7use bytes::{Buf, Bytes};
8use http::header::{HeaderValue, CONNECTION};
9use http::{HeaderMap, Method, Version};
10use httparse::ParserConfig;
11use tokio::io::{AsyncRead, AsyncWrite};
12#[cfg(all(feature = "server", feature = "runtime"))]
13use tokio::time::Sleep;
14use tracing::{debug, error, trace};
15
16use super::io::Buffered;
17use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
18use crate::body::DecodedLength;
19use crate::common::{task, Pin, Poll, Unpin};
20use crate::headers::connection_keep_alive;
21use crate::proto::{BodyLength, MessageHead};
22
/// Byte sequence that opens an HTTP/2 connection. `has_h2_prefix` looks for it
/// at the front of the read buffer so a parse failure caused by an HTTP/2 peer
/// can be reported as a version error.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
24
25/// This handles a connection, which will have been established over an
26/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
27/// `Transaction`s over HTTP.
28///
29/// The connection will determine when a message begins and ends as well as
30/// determine if this connection can be kept alive after the message,
31/// or if it is complete.
32pub(crate) struct Conn<I, B, T> {
33    io: Buffered<I, EncodedBuf<B>>,
34    state: State,
35    _marker: PhantomData<fn(T)>,
36}
37
38impl<I, B, T> Conn<I, B, T>
39where
40    I: AsyncRead + AsyncWrite + Unpin,
41    B: Buf,
42    T: Http1Transaction,
43{
44    pub(crate) fn new(io: I) -> Conn<I, B, T> {
45        Conn {
46            io: Buffered::new(io),
47            state: State {
48                allow_half_close: false,
49                cached_headers: None,
50                error: None,
51                keep_alive: KA::Busy,
52                method: None,
53                h1_parser_config: ParserConfig::default(),
54                #[cfg(all(feature = "server", feature = "runtime"))]
55                h1_header_read_timeout: None,
56                #[cfg(all(feature = "server", feature = "runtime"))]
57                h1_header_read_timeout_fut: None,
58                #[cfg(all(feature = "server", feature = "runtime"))]
59                h1_header_read_timeout_running: false,
60                preserve_header_case: false,
61                #[cfg(feature = "ffi")]
62                preserve_header_order: false,
63                title_case_headers: false,
64                h09_responses: false,
65                #[cfg(feature = "ffi")]
66                on_informational: None,
67                #[cfg(feature = "ffi")]
68                raw_headers: false,
69                notify_read: false,
70                reading: Reading::Init,
71                writing: Writing::Init,
72                upgrade: None,
73                // We assume a modern world where the remote speaks HTTP/1.1.
74                // If they tell us otherwise, we'll downgrade in `read_head`.
75                version: Version::HTTP_11,
76            },
77            _marker: PhantomData,
78        }
79    }
80
81    #[cfg(feature = "server")]
82    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
83        self.io.set_flush_pipeline(enabled);
84    }
85
86    pub(crate) fn set_write_strategy_queue(&mut self) {
87        self.io.set_write_strategy_queue();
88    }
89
90    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
91        self.io.set_max_buf_size(max);
92    }
93
94    #[cfg(feature = "client")]
95    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
96        self.io.set_read_buf_exact_size(sz);
97    }
98
99    pub(crate) fn set_write_strategy_flatten(&mut self) {
100        self.io.set_write_strategy_flatten();
101    }
102
103    #[cfg(feature = "client")]
104    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
105        self.state.h1_parser_config = parser_config;
106    }
107
108    pub(crate) fn set_title_case_headers(&mut self) {
109        self.state.title_case_headers = true;
110    }
111
112    pub(crate) fn set_preserve_header_case(&mut self) {
113        self.state.preserve_header_case = true;
114    }
115
116    #[cfg(feature = "ffi")]
117    pub(crate) fn set_preserve_header_order(&mut self) {
118        self.state.preserve_header_order = true;
119    }
120
121    #[cfg(feature = "client")]
122    pub(crate) fn set_h09_responses(&mut self) {
123        self.state.h09_responses = true;
124    }
125
126    #[cfg(all(feature = "server", feature = "runtime"))]
127    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
128        self.state.h1_header_read_timeout = Some(val);
129    }
130
131    #[cfg(feature = "server")]
132    pub(crate) fn set_allow_half_close(&mut self) {
133        self.state.allow_half_close = true;
134    }
135
136    #[cfg(feature = "ffi")]
137    pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
138        self.state.raw_headers = enabled;
139    }
140
141    pub(crate) fn into_inner(self) -> (I, Bytes) {
142        self.io.into_inner()
143    }
144
145    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
146        self.state.upgrade.take()
147    }
148
149    pub(crate) fn is_read_closed(&self) -> bool {
150        self.state.is_read_closed()
151    }
152
153    pub(crate) fn is_write_closed(&self) -> bool {
154        self.state.is_write_closed()
155    }
156
157    pub(crate) fn can_read_head(&self) -> bool {
158        if !matches!(self.state.reading, Reading::Init) {
159            return false;
160        }
161
162        if T::should_read_first() {
163            return true;
164        }
165
166        !matches!(self.state.writing, Writing::Init)
167    }
168
169    pub(crate) fn can_read_body(&self) -> bool {
170        match self.state.reading {
171            Reading::Body(..) | Reading::Continue(..) => true,
172            _ => false,
173        }
174    }
175
176    fn should_error_on_eof(&self) -> bool {
177        // If we're idle, it's probably just the connection closing gracefully.
178        T::should_error_on_parse_eof() && !self.state.is_idle()
179    }
180
181    fn has_h2_prefix(&self) -> bool {
182        let read_buf = self.io.read_buf();
183        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
184    }
185
186    pub(super) fn poll_read_head(
187        &mut self,
188        cx: &mut task::Context<'_>,
189    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
190        debug_assert!(self.can_read_head());
191        trace!("Conn::read_head");
192
193        let msg = match ready!(self.io.parse::<T>(
194            cx,
195            ParseContext {
196                cached_headers: &mut self.state.cached_headers,
197                req_method: &mut self.state.method,
198                h1_parser_config: self.state.h1_parser_config.clone(),
199                #[cfg(all(feature = "server", feature = "runtime"))]
200                h1_header_read_timeout: self.state.h1_header_read_timeout,
201                #[cfg(all(feature = "server", feature = "runtime"))]
202                h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
203                #[cfg(all(feature = "server", feature = "runtime"))]
204                h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
205                preserve_header_case: self.state.preserve_header_case,
206                #[cfg(feature = "ffi")]
207                preserve_header_order: self.state.preserve_header_order,
208                h09_responses: self.state.h09_responses,
209                #[cfg(feature = "ffi")]
210                on_informational: &mut self.state.on_informational,
211                #[cfg(feature = "ffi")]
212                raw_headers: self.state.raw_headers,
213            }
214        )) {
215            Ok(msg) => msg,
216            Err(e) => return self.on_read_head_error(e),
217        };
218
219        // Note: don't deconstruct `msg` into local variables, it appears
220        // the optimizer doesn't remove the extra copies.
221
222        debug!("incoming body is {}", msg.decode);
223
224        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
225        self.state.h09_responses = false;
226
227        // Drop any OnInformational callbacks, we're done there!
228        #[cfg(feature = "ffi")]
229        {
230            self.state.on_informational = None;
231        }
232
233        self.state.busy();
234        self.state.keep_alive &= msg.keep_alive;
235        self.state.version = msg.head.version;
236
237        let mut wants = if msg.wants_upgrade {
238            Wants::UPGRADE
239        } else {
240            Wants::EMPTY
241        };
242
243        if msg.decode == DecodedLength::ZERO {
244            if msg.expect_continue {
245                debug!("ignoring expect-continue since body is empty");
246            }
247            self.state.reading = Reading::KeepAlive;
248            if !T::should_read_first() {
249                self.try_keep_alive(cx);
250            }
251        } else if msg.expect_continue {
252            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
253            wants = wants.add(Wants::EXPECT);
254        } else {
255            self.state.reading = Reading::Body(Decoder::new(msg.decode));
256        }
257
258        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
259    }
260
261    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
262        // If we are currently waiting on a message, then an empty
263        // message should be reported as an error. If not, it is just
264        // the connection closing gracefully.
265        let must_error = self.should_error_on_eof();
266        self.close_read();
267        self.io.consume_leading_lines();
268        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
269        if was_mid_parse || must_error {
270            // We check if the buf contains the h2 Preface
271            debug!(
272                "parse error ({}) with {} bytes",
273                e,
274                self.io.read_buf().len()
275            );
276            match self.on_parse_error(e) {
277                Ok(()) => Poll::Pending, // XXX: wat?
278                Err(e) => Poll::Ready(Some(Err(e))),
279            }
280        } else {
281            debug!("read eof");
282            self.close_write();
283            Poll::Ready(None)
284        }
285    }
286
287    pub(crate) fn poll_read_body(
288        &mut self,
289        cx: &mut task::Context<'_>,
290    ) -> Poll<Option<io::Result<Bytes>>> {
291        debug_assert!(self.can_read_body());
292
293        let (reading, ret) = match self.state.reading {
294            Reading::Body(ref mut decoder) => {
295                match ready!(decoder.decode(cx, &mut self.io)) {
296                    Ok(slice) => {
297                        let (reading, chunk) = if decoder.is_eof() {
298                            debug!("incoming body completed");
299                            (
300                                Reading::KeepAlive,
301                                if !slice.is_empty() {
302                                    Some(Ok(slice))
303                                } else {
304                                    None
305                                },
306                            )
307                        } else if slice.is_empty() {
308                            error!("incoming body unexpectedly ended");
309                            // This should be unreachable, since all 3 decoders
310                            // either set eof=true or return an Err when reading
311                            // an empty slice...
312                            (Reading::Closed, None)
313                        } else {
314                            return Poll::Ready(Some(Ok(slice)));
315                        };
316                        (reading, Poll::Ready(chunk))
317                    }
318                    Err(e) => {
319                        debug!("incoming body decode error: {}", e);
320                        (Reading::Closed, Poll::Ready(Some(Err(e))))
321                    }
322                }
323            }
324            Reading::Continue(ref decoder) => {
325                // Write the 100 Continue if not already responded...
326                if let Writing::Init = self.state.writing {
327                    trace!("automatically sending 100 Continue");
328                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
329                    self.io.headers_buf().extend_from_slice(cont);
330                }
331
332                // And now recurse once in the Reading::Body state...
333                self.state.reading = Reading::Body(decoder.clone());
334                return self.poll_read_body(cx);
335            }
336            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
337        };
338
339        self.state.reading = reading;
340        self.try_keep_alive(cx);
341        ret
342    }
343
344    pub(crate) fn wants_read_again(&mut self) -> bool {
345        let ret = self.state.notify_read;
346        self.state.notify_read = false;
347        ret
348    }
349
350    pub(crate) fn poll_read_keep_alive(
351        &mut self,
352        cx: &mut task::Context<'_>,
353    ) -> Poll<crate::Result<()>> {
354        debug_assert!(!self.can_read_head() && !self.can_read_body());
355
356        if self.is_read_closed() {
357            Poll::Pending
358        } else if self.is_mid_message() {
359            self.mid_message_detect_eof(cx)
360        } else {
361            self.require_empty_read(cx)
362        }
363    }
364
365    fn is_mid_message(&self) -> bool {
366        !matches!(
367            (&self.state.reading, &self.state.writing),
368            (&Reading::Init, &Writing::Init)
369        )
370    }
371
372    // This will check to make sure the io object read is empty.
373    //
374    // This should only be called for Clients wanting to enter the idle
375    // state.
376    fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
377        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
378        debug_assert!(!self.is_mid_message());
379        debug_assert!(T::is_client());
380
381        if !self.io.read_buf().is_empty() {
382            debug!("received an unexpected {} bytes", self.io.read_buf().len());
383            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
384        }
385
386        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
387
388        if num_read == 0 {
389            let ret = if self.should_error_on_eof() {
390                trace!("found unexpected EOF on busy connection: {:?}", self.state);
391                Poll::Ready(Err(crate::Error::new_incomplete()))
392            } else {
393                trace!("found EOF on idle connection, closing");
394                Poll::Ready(Ok(()))
395            };
396
397            // order is important: should_error needs state BEFORE close_read
398            self.state.close_read();
399            return ret;
400        }
401
402        debug!(
403            "received unexpected {} bytes on an idle connection",
404            num_read
405        );
406        Poll::Ready(Err(crate::Error::new_unexpected_message()))
407    }
408
409    fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
410        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
411        debug_assert!(self.is_mid_message());
412
413        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
414            return Poll::Pending;
415        }
416
417        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
418
419        if num_read == 0 {
420            trace!("found unexpected EOF on busy connection: {:?}", self.state);
421            self.state.close_read();
422            Poll::Ready(Err(crate::Error::new_incomplete()))
423        } else {
424            Poll::Ready(Ok(()))
425        }
426    }
427
428    fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
429        debug_assert!(!self.state.is_read_closed());
430
431        let result = ready!(self.io.poll_read_from_io(cx));
432        Poll::Ready(result.map_err(|e| {
433            trace!("force_io_read; io error = {:?}", e);
434            self.state.close();
435            e
436        }))
437    }
438
439    fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
440        // its possible that we returned NotReady from poll() without having
441        // exhausted the underlying Io. We would have done this when we
442        // determined we couldn't keep reading until we knew how writing
443        // would finish.
444
445        match self.state.reading {
446            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
447                return
448            }
449            Reading::Init => (),
450        };
451
452        match self.state.writing {
453            Writing::Body(..) => return,
454            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
455        }
456
457        if !self.io.is_read_blocked() {
458            if self.io.read_buf().is_empty() {
459                match self.io.poll_read_from_io(cx) {
460                    Poll::Ready(Ok(n)) => {
461                        if n == 0 {
462                            trace!("maybe_notify; read eof");
463                            if self.state.is_idle() {
464                                self.state.close();
465                            } else {
466                                self.close_read()
467                            }
468                            return;
469                        }
470                    }
471                    Poll::Pending => {
472                        trace!("maybe_notify; read_from_io blocked");
473                        return;
474                    }
475                    Poll::Ready(Err(e)) => {
476                        trace!("maybe_notify; read_from_io error: {}", e);
477                        self.state.close();
478                        self.state.error = Some(crate::Error::new_io(e));
479                    }
480                }
481            }
482            self.state.notify_read = true;
483        }
484    }
485
486    fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
487        self.state.try_keep_alive::<T>();
488        self.maybe_notify(cx);
489    }
490
491    pub(crate) fn can_write_head(&self) -> bool {
492        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
493            return false;
494        }
495
496        match self.state.writing {
497            Writing::Init => self.io.can_headers_buf(),
498            _ => false,
499        }
500    }
501
502    pub(crate) fn can_write_body(&self) -> bool {
503        match self.state.writing {
504            Writing::Body(..) => true,
505            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
506        }
507    }
508
509    pub(crate) fn can_buffer_body(&self) -> bool {
510        self.io.can_buffer()
511    }
512
513    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
514        if let Some(encoder) = self.encode_head(head, body) {
515            self.state.writing = if !encoder.is_eof() {
516                Writing::Body(encoder)
517            } else if encoder.is_last() {
518                Writing::Closed
519            } else {
520                Writing::KeepAlive
521            };
522        }
523    }
524
525    pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
526        if let Some(encoder) =
527            self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
528        {
529            let is_last = encoder.is_last();
530            // Make sure we don't write a body if we weren't actually allowed
531            // to do so, like because its a HEAD request.
532            if !encoder.is_eof() {
533                encoder.danger_full_buf(body, self.io.write_buf());
534            }
535            self.state.writing = if is_last {
536                Writing::Closed
537            } else {
538                Writing::KeepAlive
539            }
540        }
541    }
542
543    fn encode_head(
544        &mut self,
545        mut head: MessageHead<T::Outgoing>,
546        body: Option<BodyLength>,
547    ) -> Option<Encoder> {
548        debug_assert!(self.can_write_head());
549
550        if !T::should_read_first() {
551            self.state.busy();
552        }
553
554        self.enforce_version(&mut head);
555
556        let buf = self.io.headers_buf();
557        match super::role::encode_headers::<T>(
558            Encode {
559                head: &mut head,
560                body,
561                #[cfg(feature = "server")]
562                keep_alive: self.state.wants_keep_alive(),
563                req_method: &mut self.state.method,
564                title_case_headers: self.state.title_case_headers,
565            },
566            buf,
567        ) {
568            Ok(encoder) => {
569                debug_assert!(self.state.cached_headers.is_none());
570                debug_assert!(head.headers.is_empty());
571                self.state.cached_headers = Some(head.headers);
572
573                #[cfg(feature = "ffi")]
574                {
575                    self.state.on_informational =
576                        head.extensions.remove::<crate::ffi::OnInformational>();
577                }
578
579                Some(encoder)
580            }
581            Err(err) => {
582                self.state.error = Some(err);
583                self.state.writing = Writing::Closed;
584                None
585            }
586        }
587    }
588
589    // Fix keep-alives when Connection: keep-alive header is not present
590    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
591        let outgoing_is_keep_alive = head
592            .headers
593            .get(CONNECTION)
594            .map(connection_keep_alive)
595            .unwrap_or(false);
596
597        if !outgoing_is_keep_alive {
598            match head.version {
599                // If response is version 1.0 and keep-alive is not present in the response,
600                // disable keep-alive so the server closes the connection
601                Version::HTTP_10 => self.state.disable_keep_alive(),
602                // If response is version 1.1 and keep-alive is wanted, add
603                // Connection: keep-alive header when not present
604                Version::HTTP_11 => {
605                    if self.state.wants_keep_alive() {
606                        head.headers
607                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
608                    }
609                }
610                _ => (),
611            }
612        }
613    }
614
615    // If we know the remote speaks an older version, we try to fix up any messages
616    // to work with our older peer.
617    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
618        if let Version::HTTP_10 = self.state.version {
619            // Fixes response or connection when keep-alive header is not present
620            self.fix_keep_alive(head);
621            // If the remote only knows HTTP/1.0, we should force ourselves
622            // to do only speak HTTP/1.0 as well.
623            head.version = Version::HTTP_10;
624        }
625        // If the remote speaks HTTP/1.1, then it *should* be fine with
626        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
627        // the user's headers be.
628    }
629
630    pub(crate) fn write_body(&mut self, chunk: B) {
631        debug_assert!(self.can_write_body() && self.can_buffer_body());
632        // empty chunks should be discarded at Dispatcher level
633        debug_assert!(chunk.remaining() != 0);
634
635        let state = match self.state.writing {
636            Writing::Body(ref mut encoder) => {
637                self.io.buffer(encoder.encode(chunk));
638
639                if !encoder.is_eof() {
640                    return;
641                }
642
643                if encoder.is_last() {
644                    Writing::Closed
645                } else {
646                    Writing::KeepAlive
647                }
648            }
649            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
650        };
651
652        self.state.writing = state;
653    }
654
655    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
656        debug_assert!(self.can_write_body() && self.can_buffer_body());
657        // empty chunks should be discarded at Dispatcher level
658        debug_assert!(chunk.remaining() != 0);
659
660        let state = match self.state.writing {
661            Writing::Body(ref encoder) => {
662                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
663                if can_keep_alive {
664                    Writing::KeepAlive
665                } else {
666                    Writing::Closed
667                }
668            }
669            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
670        };
671
672        self.state.writing = state;
673    }
674
675    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
676        debug_assert!(self.can_write_body());
677
678        let encoder = match self.state.writing {
679            Writing::Body(ref mut enc) => enc,
680            _ => return Ok(()),
681        };
682
683        // end of stream, that means we should try to eof
684        match encoder.end() {
685            Ok(end) => {
686                if let Some(end) = end {
687                    self.io.buffer(end);
688                }
689
690                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
691                    Writing::Closed
692                } else {
693                    Writing::KeepAlive
694                };
695
696                Ok(())
697            }
698            Err(not_eof) => {
699                self.state.writing = Writing::Closed;
700                Err(crate::Error::new_body_write_aborted().with(not_eof))
701            }
702        }
703    }
704
705    // When we get a parse error, depending on what side we are, we might be able
706    // to write a response before closing the connection.
707    //
708    // - Client: there is nothing we can do
709    // - Server: if Response hasn't been written yet, we can send a 4xx response
710    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
711        if let Writing::Init = self.state.writing {
712            if self.has_h2_prefix() {
713                return Err(crate::Error::new_version_h2());
714            }
715            if let Some(msg) = T::on_error(&err) {
716                // Drop the cached headers so as to not trigger a debug
717                // assert in `write_head`...
718                self.state.cached_headers.take();
719                self.write_head(msg, None);
720                self.state.error = Some(err);
721                return Ok(());
722            }
723        }
724
725        // fallback is pass the error back up
726        Err(err)
727    }
728
729    pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
730        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
731        self.try_keep_alive(cx);
732        trace!("flushed({}): {:?}", T::LOG, self.state);
733        Poll::Ready(Ok(()))
734    }
735
736    pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
737        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
738            Ok(()) => {
739                trace!("shut down IO complete");
740                Poll::Ready(Ok(()))
741            }
742            Err(e) => {
743                debug!("error shutting down IO: {}", e);
744                Poll::Ready(Err(e))
745            }
746        }
747    }
748
749    /// If the read side can be cheaply drained, do so. Otherwise, close.
750    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
751        let _ = self.poll_read_body(cx);
752
753        // If still in Reading::Body, just give up
754        match self.state.reading {
755            Reading::Init | Reading::KeepAlive => trace!("body drained"),
756            _ => self.close_read(),
757        }
758    }
759
760    pub(crate) fn close_read(&mut self) {
761        self.state.close_read();
762    }
763
764    pub(crate) fn close_write(&mut self) {
765        self.state.close_write();
766    }
767
768    #[cfg(feature = "server")]
769    pub(crate) fn disable_keep_alive(&mut self) {
770        if self.state.is_idle() {
771            trace!("disable_keep_alive; closing idle connection");
772            self.state.close();
773        } else {
774            trace!("disable_keep_alive; in-progress connection");
775            self.state.disable_keep_alive();
776        }
777    }
778
779    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
780        if let Some(err) = self.state.error.take() {
781            Err(err)
782        } else {
783            Ok(())
784        }
785    }
786
787    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
788        trace!("{}: prepare possible HTTP upgrade", T::LOG);
789        self.state.prepare_upgrade()
790    }
791}
792
793impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
794    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
795        f.debug_struct("Conn")
796            .field("state", &self.state)
797            .field("io", &self.io)
798            .finish()
799    }
800}
801
802// B and T are never pinned
803impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
804
805struct State {
806    allow_half_close: bool,
807    /// Re-usable HeaderMap to reduce allocating new ones.
808    cached_headers: Option<HeaderMap>,
809    /// If an error occurs when there wasn't a direct way to return it
810    /// back to the user, this is set.
811    error: Option<crate::Error>,
812    /// Current keep-alive status.
813    keep_alive: KA,
814    /// If mid-message, the HTTP Method that started it.
815    ///
816    /// This is used to know things such as if the message can include
817    /// a body or not.
818    method: Option<Method>,
819    h1_parser_config: ParserConfig,
820    #[cfg(all(feature = "server", feature = "runtime"))]
821    h1_header_read_timeout: Option<Duration>,
822    #[cfg(all(feature = "server", feature = "runtime"))]
823    h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
824    #[cfg(all(feature = "server", feature = "runtime"))]
825    h1_header_read_timeout_running: bool,
826    preserve_header_case: bool,
827    #[cfg(feature = "ffi")]
828    preserve_header_order: bool,
829    title_case_headers: bool,
830    h09_responses: bool,
831    /// If set, called with each 1xx informational response received for
832    /// the current request. MUST be unset after a non-1xx response is
833    /// received.
834    #[cfg(feature = "ffi")]
835    on_informational: Option<crate::ffi::OnInformational>,
836    #[cfg(feature = "ffi")]
837    raw_headers: bool,
838    /// Set to true when the Dispatcher should poll read operations
839    /// again. See the `maybe_notify` method for more.
840    notify_read: bool,
841    /// State of allowed reads
842    reading: Reading,
843    /// State of allowed writes
844    writing: Writing,
845    /// An expected pending HTTP upgrade.
846    upgrade: Option<crate::upgrade::Pending>,
847    /// Either HTTP/1.0 or 1.1 connection
848    version: Version,
849}
850
851#[derive(Debug)]
852enum Reading {
853    Init,
854    Continue(Decoder),
855    Body(Decoder),
856    KeepAlive,
857    Closed,
858}
859
860enum Writing {
861    Init,
862    Body(Encoder),
863    KeepAlive,
864    Closed,
865}
866
867impl fmt::Debug for State {
868    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869        let mut builder = f.debug_struct("State");
870        builder
871            .field("reading", &self.reading)
872            .field("writing", &self.writing)
873            .field("keep_alive", &self.keep_alive);
874
875        // Only show error field if it's interesting...
876        if let Some(ref error) = self.error {
877            builder.field("error", error);
878        }
879
880        if self.allow_half_close {
881            builder.field("allow_half_close", &true);
882        }
883
884        // Purposefully leaving off other fields..
885
886        builder.finish()
887    }
888}
889
890impl fmt::Debug for Writing {
891    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
892        match *self {
893            Writing::Init => f.write_str("Init"),
894            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
895            Writing::KeepAlive => f.write_str("KeepAlive"),
896            Writing::Closed => f.write_str("Closed"),
897        }
898    }
899}
900
901impl std::ops::BitAndAssign<bool> for KA {
902    fn bitand_assign(&mut self, enabled: bool) {
903        if !enabled {
904            trace!("remote disabling keep-alive");
905            *self = KA::Disabled;
906        }
907    }
908}
909
/// Keep-alive status of a connection.
#[derive(Debug, Clone, Copy)]
enum KA {
    /// Keep-alive is possible and the connection is idle.
    Idle,
    /// Keep-alive is possible and the connection is in use.
    Busy,
    /// Keep-alive has been turned off.
    Disabled,
}
916
917impl Default for KA {
918    fn default() -> KA {
919        KA::Busy
920    }
921}
922
923impl KA {
924    fn idle(&mut self) {
925        *self = KA::Idle;
926    }
927
928    fn busy(&mut self) {
929        *self = KA::Busy;
930    }
931
932    fn disable(&mut self) {
933        *self = KA::Disabled;
934    }
935
936    fn status(&self) -> KA {
937        *self
938    }
939}
940
941impl State {
942    fn close(&mut self) {
943        trace!("State::close()");
944        self.reading = Reading::Closed;
945        self.writing = Writing::Closed;
946        self.keep_alive.disable();
947    }
948
949    fn close_read(&mut self) {
950        trace!("State::close_read()");
951        self.reading = Reading::Closed;
952        self.keep_alive.disable();
953    }
954
955    fn close_write(&mut self) {
956        trace!("State::close_write()");
957        self.writing = Writing::Closed;
958        self.keep_alive.disable();
959    }
960
961    fn wants_keep_alive(&self) -> bool {
962        if let KA::Disabled = self.keep_alive.status() {
963            false
964        } else {
965            true
966        }
967    }
968
969    fn try_keep_alive<T: Http1Transaction>(&mut self) {
970        match (&self.reading, &self.writing) {
971            (&Reading::KeepAlive, &Writing::KeepAlive) => {
972                if let KA::Busy = self.keep_alive.status() {
973                    self.idle::<T>();
974                } else {
975                    trace!(
976                        "try_keep_alive({}): could keep-alive, but status = {:?}",
977                        T::LOG,
978                        self.keep_alive
979                    );
980                    self.close();
981                }
982            }
983            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
984                self.close()
985            }
986            _ => (),
987        }
988    }
989
990    fn disable_keep_alive(&mut self) {
991        self.keep_alive.disable()
992    }
993
994    fn busy(&mut self) {
995        if let KA::Disabled = self.keep_alive.status() {
996            return;
997        }
998        self.keep_alive.busy();
999    }
1000
1001    fn idle<T: Http1Transaction>(&mut self) {
1002        debug_assert!(!self.is_idle(), "State::idle() called while idle");
1003
1004        self.method = None;
1005        self.keep_alive.idle();
1006
1007        if !self.is_idle() {
1008            self.close();
1009            return;
1010        }
1011
1012        self.reading = Reading::Init;
1013        self.writing = Writing::Init;
1014
1015        // !T::should_read_first() means Client.
1016        //
1017        // If Client connection has just gone idle, the Dispatcher
1018        // should try the poll loop one more time, so as to poll the
1019        // pending requests stream.
1020        if !T::should_read_first() {
1021            self.notify_read = true;
1022        }
1023    }
1024
1025    fn is_idle(&self) -> bool {
1026        matches!(self.keep_alive.status(), KA::Idle)
1027    }
1028
1029    fn is_read_closed(&self) -> bool {
1030        matches!(self.reading, Reading::Closed)
1031    }
1032
1033    fn is_write_closed(&self) -> bool {
1034        matches!(self.writing, Writing::Closed)
1035    }
1036
1037    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1038        let (tx, rx) = crate::upgrade::pending();
1039        self.upgrade = Some(tx);
1040        rx
1041    }
1042}
1043
1044#[cfg(test)]
1045mod tests {
1046    #[cfg(feature = "nightly")]
1047    #[bench]
1048    fn bench_read_head_short(b: &mut ::test::Bencher) {
1049        use super::*;
1050        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1051        let len = s.len();
1052        b.bytes = len as u64;
1053
1054        // an empty IO, we'll be skipping and using the read buffer anyways
1055        let io = tokio_test::io::Builder::new().build();
1056        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1057        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1058        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1059
1060        let rt = tokio::runtime::Builder::new_current_thread()
1061            .enable_all()
1062            .build()
1063            .unwrap();
1064
1065        b.iter(|| {
1066            rt.block_on(futures_util::future::poll_fn(|cx| {
1067                match conn.poll_read_head(cx) {
1068                    Poll::Ready(Some(Ok(x))) => {
1069                        ::test::black_box(&x);
1070                        let mut headers = x.0.headers;
1071                        headers.clear();
1072                        conn.state.cached_headers = Some(headers);
1073                    }
1074                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1075                }
1076
1077                conn.io.read_buf_mut().reserve(1);
1078                unsafe {
1079                    conn.io.read_buf_mut().set_len(len);
1080                }
1081                conn.state.reading = Reading::Init;
1082                Poll::Ready(())
1083            }));
1084        });
1085    }
1086
1087    /*
1088    //TODO: rewrite these using dispatch... someday...
1089    use futures::{Async, Future, Stream, Sink};
1090    use futures::future;
1091
1092    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1093    use super::super::Encoder;
1094    use mock::AsyncIo;
1095
1096    use super::{Conn, Decoder, Reading, Writing};
1097    use ::uri::Uri;
1098
1099    use std::str::FromStr;
1100
1101    #[test]
1102    fn test_conn_init_read() {
1103        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1104        let len = good_message.len();
1105        let io = AsyncIo::new_buf(good_message, len);
1106        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1107
1108        match conn.poll().unwrap() {
1109            Async::Ready(Some(Frame::Message { message, body: false })) => {
1110                assert_eq!(message, MessageHead {
1111                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1112                    .. MessageHead::default()
1113                })
1114            },
1115            f => panic!("frame is not Frame::Message: {:?}", f)
1116        }
1117    }
1118
1119    #[test]
1120    fn test_conn_parse_partial() {
1121        let _: Result<(), ()> = future::lazy(|| {
1122            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1123            let io = AsyncIo::new_buf(good_message, 10);
1124            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1125            assert!(conn.poll().unwrap().is_not_ready());
1126            conn.io.io_mut().block_in(50);
1127            let async = conn.poll().unwrap();
1128            assert!(async.is_ready());
1129            match async {
1130                Async::Ready(Some(Frame::Message { .. })) => (),
1131                f => panic!("frame is not Message: {:?}", f),
1132            }
1133            Ok(())
1134        }).wait();
1135    }
1136
1137    #[test]
1138    fn test_conn_init_read_eof_idle() {
1139        let io = AsyncIo::new_buf(vec![], 1);
1140        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1141        conn.state.idle();
1142
1143        match conn.poll().unwrap() {
1144            Async::Ready(None) => {},
1145            other => panic!("frame is not None: {:?}", other)
1146        }
1147    }
1148
1149    #[test]
1150    fn test_conn_init_read_eof_idle_partial_parse() {
1151        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1152        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1153        conn.state.idle();
1154
1155        match conn.poll() {
1156            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1157            other => panic!("unexpected frame: {:?}", other)
1158        }
1159    }
1160
1161    #[test]
1162    fn test_conn_init_read_eof_busy() {
1163        let _: Result<(), ()> = future::lazy(|| {
1164            // server ignores
1165            let io = AsyncIo::new_eof();
1166            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1167            conn.state.busy();
1168
1169            match conn.poll().unwrap() {
1170                Async::Ready(None) => {},
1171                other => panic!("unexpected frame: {:?}", other)
1172            }
1173
1174            // client
1175            let io = AsyncIo::new_eof();
1176            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1177            conn.state.busy();
1178
1179            match conn.poll() {
1180                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1181                other => panic!("unexpected frame: {:?}", other)
1182            }
1183            Ok(())
1184        }).wait();
1185    }
1186
1187    #[test]
1188    fn test_conn_body_finish_read_eof() {
1189        let _: Result<(), ()> = future::lazy(|| {
1190            let io = AsyncIo::new_eof();
1191            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1192            conn.state.busy();
1193            conn.state.writing = Writing::KeepAlive;
1194            conn.state.reading = Reading::Body(Decoder::length(0));
1195
1196            match conn.poll() {
1197                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1198                other => panic!("unexpected frame: {:?}", other)
1199            }
1200
1201            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1202            // the conn eof in this case is perfectly fine
1203
1204            match conn.poll() {
1205                Ok(Async::Ready(None)) => (),
1206                other => panic!("unexpected frame: {:?}", other)
1207            }
1208            Ok(())
1209        }).wait();
1210    }
1211
1212    #[test]
1213    fn test_conn_message_empty_body_read_eof() {
1214        let _: Result<(), ()> = future::lazy(|| {
1215            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1216            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1217            conn.state.busy();
1218            conn.state.writing = Writing::KeepAlive;
1219
1220            match conn.poll() {
1221                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1222                other => panic!("unexpected frame: {:?}", other)
1223            }
1224
1225            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1226            // the conn eof in this case is perfectly fine
1227
1228            match conn.poll() {
1229                Ok(Async::Ready(None)) => (),
1230                other => panic!("unexpected frame: {:?}", other)
1231            }
1232            Ok(())
1233        }).wait();
1234    }
1235
1236    #[test]
1237    fn test_conn_read_body_end() {
1238        let _: Result<(), ()> = future::lazy(|| {
1239            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1240            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1241            conn.state.busy();
1242
1243            match conn.poll() {
1244                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1245                other => panic!("unexpected frame: {:?}", other)
1246            }
1247
1248            match conn.poll() {
1249                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1250                other => panic!("unexpected frame: {:?}", other)
1251            }
1252
1253            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1254            match conn.poll() {
1255                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1256                other => panic!("unexpected frame: {:?}", other)
1257            }
1258
1259            match conn.poll() {
1260                Ok(Async::NotReady) => (),
1261                other => panic!("unexpected frame: {:?}", other)
1262            }
1263            Ok(())
1264        }).wait();
1265    }
1266
1267    #[test]
1268    fn test_conn_closed_read() {
1269        let io = AsyncIo::new_buf(vec![], 0);
1270        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1271        conn.state.close();
1272
1273        match conn.poll().unwrap() {
1274            Async::Ready(None) => {},
1275            other => panic!("frame is not None: {:?}", other)
1276        }
1277    }
1278
1279    #[test]
1280    fn test_conn_body_write_length() {
1281        let _ = pretty_env_logger::try_init();
1282        let _: Result<(), ()> = future::lazy(|| {
1283            let io = AsyncIo::new_buf(vec![], 0);
1284            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1285            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1286            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1287
1288            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1289            assert!(!conn.can_buffer_body());
1290
1291            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1292
1293            conn.io.io_mut().block_in(1024 * 3);
1294            assert!(conn.poll_complete().unwrap().is_not_ready());
1295            conn.io.io_mut().block_in(1024 * 3);
1296            assert!(conn.poll_complete().unwrap().is_not_ready());
1297            conn.io.io_mut().block_in(max * 2);
1298            assert!(conn.poll_complete().unwrap().is_ready());
1299
1300            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1301            Ok(())
1302        }).wait();
1303    }
1304
1305    #[test]
1306    fn test_conn_body_write_chunked() {
1307        let _: Result<(), ()> = future::lazy(|| {
1308            let io = AsyncIo::new_buf(vec![], 4096);
1309            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1310            conn.state.writing = Writing::Body(Encoder::chunked());
1311
1312            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1313            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1314            Ok(())
1315        }).wait();
1316    }
1317
1318    #[test]
1319    fn test_conn_body_flush() {
1320        let _: Result<(), ()> = future::lazy(|| {
1321            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1322            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1323            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1324            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1325            assert!(!conn.can_buffer_body());
1326            conn.io.io_mut().block_in(1024 * 1024 * 5);
1327            assert!(conn.poll_complete().unwrap().is_ready());
1328            assert!(conn.can_buffer_body());
1329            assert!(conn.io.io_mut().flushed());
1330
1331            Ok(())
1332        }).wait();
1333    }
1334
1335    #[test]
1336    fn test_conn_parking() {
1337        use std::sync::Arc;
1338        use futures::executor::Notify;
1339        use futures::executor::NotifyHandle;
1340
1341        struct Car {
1342            permit: bool,
1343        }
1344        impl Notify for Car {
1345            fn notify(&self, _id: usize) {
1346                assert!(self.permit, "unparked without permit");
1347            }
1348        }
1349
1350        fn car(permit: bool) -> NotifyHandle {
1351            Arc::new(Car {
1352                permit: permit,
1353            }).into()
1354        }
1355
1356        // test that once writing is done, unparks
1357        let f = future::lazy(|| {
1358            let io = AsyncIo::new_buf(vec![], 4096);
1359            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1360            conn.state.reading = Reading::KeepAlive;
1361            assert!(conn.poll().unwrap().is_not_ready());
1362
1363            conn.state.writing = Writing::KeepAlive;
1364            assert!(conn.poll_complete().unwrap().is_ready());
1365            Ok::<(), ()>(())
1366        });
1367        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1368
1369
1370        // test that flushing when not waiting on read doesn't unpark
1371        let f = future::lazy(|| {
1372            let io = AsyncIo::new_buf(vec![], 4096);
1373            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1374            conn.state.writing = Writing::KeepAlive;
1375            assert!(conn.poll_complete().unwrap().is_ready());
1376            Ok::<(), ()>(())
1377        });
1378        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1379
1380
1381        // test that flushing and writing isn't done doesn't unpark
1382        let f = future::lazy(|| {
1383            let io = AsyncIo::new_buf(vec![], 4096);
1384            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1385            conn.state.reading = Reading::KeepAlive;
1386            assert!(conn.poll().unwrap().is_not_ready());
1387            conn.state.writing = Writing::Body(Encoder::length(5_000));
1388            assert!(conn.poll_complete().unwrap().is_ready());
1389            Ok::<(), ()>(())
1390        });
1391        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1392    }
1393
1394    #[test]
1395    fn test_conn_closed_write() {
1396        let io = AsyncIo::new_buf(vec![], 0);
1397        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1398        conn.state.close();
1399
1400        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1401            Err(_e) => {},
1402            other => panic!("did not return Err: {:?}", other)
1403        }
1404
1405        assert!(conn.state.is_write_closed());
1406    }
1407
1408    #[test]
1409    fn test_conn_write_empty_chunk() {
1410        let io = AsyncIo::new_buf(vec![], 0);
1411        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1412        conn.state.writing = Writing::KeepAlive;
1413
1414        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1415        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1416        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1417    }
1418    */
1419}