hyper/proto/h1/
decode.rs

1use std::error::Error as StdError;
2use std::fmt;
3use std::io;
4use std::usize;
5
6use bytes::Bytes;
7use tracing::{debug, trace};
8
9use crate::common::{task, Poll};
10
11use super::io::MemRead;
12use super::DecodedLength;
13
14use self::Kind::{Chunked, Eof, Length};
15
16/// Decoders to handle different Transfer-Encodings.
17///
18/// If a message body does not include a Transfer-Encoding, it *should*
19/// include a Content-Length header.
20#[derive(Clone, PartialEq)]
21pub(crate) struct Decoder {
22    kind: Kind,
23}
24
25#[derive(Debug, Clone, Copy, PartialEq)]
26enum Kind {
27    /// A Reader used when a Content-Length header is passed with a positive integer.
28    Length(u64),
29    /// A Reader used when Transfer-Encoding is `chunked`.
30    Chunked(ChunkedState, u64),
31    /// A Reader used for responses that don't indicate a length or chunked.
32    ///
33    /// The bool tracks when EOF is seen on the transport.
34    ///
35    /// Note: This should only used for `Response`s. It is illegal for a
36    /// `Request` to be made with both `Content-Length` and
37    /// `Transfer-Encoding: chunked` missing, as explained from the spec:
38    ///
39    /// > If a Transfer-Encoding header field is present in a response and
40    /// > the chunked transfer coding is not the final encoding, the
41    /// > message body length is determined by reading the connection until
42    /// > it is closed by the server.  If a Transfer-Encoding header field
43    /// > is present in a request and the chunked transfer coding is not
44    /// > the final encoding, the message body length cannot be determined
45    /// > reliably; the server MUST respond with the 400 (Bad Request)
46    /// > status code and then close the connection.
47    Eof(bool),
48}
49
50#[derive(Debug, PartialEq, Clone, Copy)]
51enum ChunkedState {
52    Size,
53    SizeLws,
54    Extension,
55    SizeLf,
56    Body,
57    BodyCr,
58    BodyLf,
59    Trailer,
60    TrailerLf,
61    EndCr,
62    EndLf,
63    End,
64}
65
66impl Decoder {
67    // constructors
68
69    pub(crate) fn length(x: u64) -> Decoder {
70        Decoder {
71            kind: Kind::Length(x),
72        }
73    }
74
75    pub(crate) fn chunked() -> Decoder {
76        Decoder {
77            kind: Kind::Chunked(ChunkedState::Size, 0),
78        }
79    }
80
81    pub(crate) fn eof() -> Decoder {
82        Decoder {
83            kind: Kind::Eof(false),
84        }
85    }
86
87    pub(super) fn new(len: DecodedLength) -> Self {
88        match len {
89            DecodedLength::CHUNKED => Decoder::chunked(),
90            DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
91            length => Decoder::length(length.danger_len()),
92        }
93    }
94
95    // methods
96
97    pub(crate) fn is_eof(&self) -> bool {
98        matches!(self.kind, Length(0) | Chunked(ChunkedState::End, _) | Eof(true))
99    }
100
101    pub(crate) fn decode<R: MemRead>(
102        &mut self,
103        cx: &mut task::Context<'_>,
104        body: &mut R,
105    ) -> Poll<Result<Bytes, io::Error>> {
106        trace!("decode; state={:?}", self.kind);
107        match self.kind {
108            Length(ref mut remaining) => {
109                if *remaining == 0 {
110                    Poll::Ready(Ok(Bytes::new()))
111                } else {
112                    let to_read = *remaining as usize;
113                    let buf = ready!(body.read_mem(cx, to_read))?;
114                    let num = buf.as_ref().len() as u64;
115                    if num > *remaining {
116                        *remaining = 0;
117                    } else if num == 0 {
118                        return Poll::Ready(Err(io::Error::new(
119                            io::ErrorKind::UnexpectedEof,
120                            IncompleteBody,
121                        )));
122                    } else {
123                        *remaining -= num;
124                    }
125                    Poll::Ready(Ok(buf))
126                }
127            }
128            Chunked(ref mut state, ref mut size) => {
129                loop {
130                    let mut buf = None;
131                    // advances the chunked state
132                    *state = ready!(state.step(cx, body, size, &mut buf))?;
133                    if *state == ChunkedState::End {
134                        trace!("end of chunked");
135                        return Poll::Ready(Ok(Bytes::new()));
136                    }
137                    if let Some(buf) = buf {
138                        return Poll::Ready(Ok(buf));
139                    }
140                }
141            }
142            Eof(ref mut is_eof) => {
143                if *is_eof {
144                    Poll::Ready(Ok(Bytes::new()))
145                } else {
146                    // 8192 chosen because its about 2 packets, there probably
147                    // won't be that much available, so don't have MemReaders
148                    // allocate buffers to big
149                    body.read_mem(cx, 8192).map_ok(|slice| {
150                        *is_eof = slice.is_empty();
151                        slice
152                    })
153                }
154            }
155        }
156    }
157
158    #[cfg(test)]
159    async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
160        futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
161    }
162}
163
164impl fmt::Debug for Decoder {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        fmt::Debug::fmt(&self.kind, f)
167    }
168}
169
170macro_rules! byte (
171    ($rdr:ident, $cx:expr) => ({
172        let buf = ready!($rdr.read_mem($cx, 1))?;
173        if !buf.is_empty() {
174            buf[0]
175        } else {
176            return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof,
177                                      "unexpected EOF during chunk size line")));
178        }
179    })
180);
181
182impl ChunkedState {
183    fn step<R: MemRead>(
184        &self,
185        cx: &mut task::Context<'_>,
186        body: &mut R,
187        size: &mut u64,
188        buf: &mut Option<Bytes>,
189    ) -> Poll<Result<ChunkedState, io::Error>> {
190        use self::ChunkedState::*;
191        match *self {
192            Size => ChunkedState::read_size(cx, body, size),
193            SizeLws => ChunkedState::read_size_lws(cx, body),
194            Extension => ChunkedState::read_extension(cx, body),
195            SizeLf => ChunkedState::read_size_lf(cx, body, *size),
196            Body => ChunkedState::read_body(cx, body, size, buf),
197            BodyCr => ChunkedState::read_body_cr(cx, body),
198            BodyLf => ChunkedState::read_body_lf(cx, body),
199            Trailer => ChunkedState::read_trailer(cx, body),
200            TrailerLf => ChunkedState::read_trailer_lf(cx, body),
201            EndCr => ChunkedState::read_end_cr(cx, body),
202            EndLf => ChunkedState::read_end_lf(cx, body),
203            End => Poll::Ready(Ok(ChunkedState::End)),
204        }
205    }
206    fn read_size<R: MemRead>(
207        cx: &mut task::Context<'_>,
208        rdr: &mut R,
209        size: &mut u64,
210    ) -> Poll<Result<ChunkedState, io::Error>> {
211        trace!("Read chunk hex size");
212
213        macro_rules! or_overflow {
214            ($e:expr) => (
215                match $e {
216                    Some(val) => val,
217                    None => return Poll::Ready(Err(io::Error::new(
218                        io::ErrorKind::InvalidData,
219                        "invalid chunk size: overflow",
220                    ))),
221                }
222            )
223        }
224
225        let radix = 16;
226        match byte!(rdr, cx) {
227            b @ b'0'..=b'9' => {
228                *size = or_overflow!(size.checked_mul(radix));
229                *size = or_overflow!(size.checked_add((b - b'0') as u64));
230            }
231            b @ b'a'..=b'f' => {
232                *size = or_overflow!(size.checked_mul(radix));
233                *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
234            }
235            b @ b'A'..=b'F' => {
236                *size = or_overflow!(size.checked_mul(radix));
237                *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
238            }
239            b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
240            b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
241            b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
242            _ => {
243                return Poll::Ready(Err(io::Error::new(
244                    io::ErrorKind::InvalidInput,
245                    "Invalid chunk size line: Invalid Size",
246                )));
247            }
248        }
249        Poll::Ready(Ok(ChunkedState::Size))
250    }
251    fn read_size_lws<R: MemRead>(
252        cx: &mut task::Context<'_>,
253        rdr: &mut R,
254    ) -> Poll<Result<ChunkedState, io::Error>> {
255        trace!("read_size_lws");
256        match byte!(rdr, cx) {
257            // LWS can follow the chunk size, but no more digits can come
258            b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
259            b';' => Poll::Ready(Ok(ChunkedState::Extension)),
260            b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
261            _ => Poll::Ready(Err(io::Error::new(
262                io::ErrorKind::InvalidInput,
263                "Invalid chunk size linear white space",
264            ))),
265        }
266    }
267    fn read_extension<R: MemRead>(
268        cx: &mut task::Context<'_>,
269        rdr: &mut R,
270    ) -> Poll<Result<ChunkedState, io::Error>> {
271        trace!("read_extension");
272        // We don't care about extensions really at all. Just ignore them.
273        // They "end" at the next CRLF.
274        //
275        // However, some implementations may not check for the CR, so to save
276        // them from themselves, we reject extensions containing plain LF as
277        // well.
278        match byte!(rdr, cx) {
279            b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
280            b'\n' => Poll::Ready(Err(io::Error::new(
281                io::ErrorKind::InvalidData,
282                "invalid chunk extension contains newline",
283            ))),
284            _ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions
285        }
286    }
287    fn read_size_lf<R: MemRead>(
288        cx: &mut task::Context<'_>,
289        rdr: &mut R,
290        size: u64,
291    ) -> Poll<Result<ChunkedState, io::Error>> {
292        trace!("Chunk size is {:?}", size);
293        match byte!(rdr, cx) {
294            b'\n' => {
295                if size == 0 {
296                    Poll::Ready(Ok(ChunkedState::EndCr))
297                } else {
298                    debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
299                    Poll::Ready(Ok(ChunkedState::Body))
300                }
301            }
302            _ => Poll::Ready(Err(io::Error::new(
303                io::ErrorKind::InvalidInput,
304                "Invalid chunk size LF",
305            ))),
306        }
307    }
308
309    fn read_body<R: MemRead>(
310        cx: &mut task::Context<'_>,
311        rdr: &mut R,
312        rem: &mut u64,
313        buf: &mut Option<Bytes>,
314    ) -> Poll<Result<ChunkedState, io::Error>> {
315        trace!("Chunked read, remaining={:?}", rem);
316
317        // cap remaining bytes at the max capacity of usize
318        let rem_cap = match *rem {
319            r if r > usize::MAX as u64 => usize::MAX,
320            r => r as usize,
321        };
322
323        let to_read = rem_cap;
324        let slice = ready!(rdr.read_mem(cx, to_read))?;
325        let count = slice.len();
326
327        if count == 0 {
328            *rem = 0;
329            return Poll::Ready(Err(io::Error::new(
330                io::ErrorKind::UnexpectedEof,
331                IncompleteBody,
332            )));
333        }
334        *buf = Some(slice);
335        *rem -= count as u64;
336
337        if *rem > 0 {
338            Poll::Ready(Ok(ChunkedState::Body))
339        } else {
340            Poll::Ready(Ok(ChunkedState::BodyCr))
341        }
342    }
343    fn read_body_cr<R: MemRead>(
344        cx: &mut task::Context<'_>,
345        rdr: &mut R,
346    ) -> Poll<Result<ChunkedState, io::Error>> {
347        match byte!(rdr, cx) {
348            b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
349            _ => Poll::Ready(Err(io::Error::new(
350                io::ErrorKind::InvalidInput,
351                "Invalid chunk body CR",
352            ))),
353        }
354    }
355    fn read_body_lf<R: MemRead>(
356        cx: &mut task::Context<'_>,
357        rdr: &mut R,
358    ) -> Poll<Result<ChunkedState, io::Error>> {
359        match byte!(rdr, cx) {
360            b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
361            _ => Poll::Ready(Err(io::Error::new(
362                io::ErrorKind::InvalidInput,
363                "Invalid chunk body LF",
364            ))),
365        }
366    }
367
368    fn read_trailer<R: MemRead>(
369        cx: &mut task::Context<'_>,
370        rdr: &mut R,
371    ) -> Poll<Result<ChunkedState, io::Error>> {
372        trace!("read_trailer");
373        match byte!(rdr, cx) {
374            b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
375            _ => Poll::Ready(Ok(ChunkedState::Trailer)),
376        }
377    }
378    fn read_trailer_lf<R: MemRead>(
379        cx: &mut task::Context<'_>,
380        rdr: &mut R,
381    ) -> Poll<Result<ChunkedState, io::Error>> {
382        match byte!(rdr, cx) {
383            b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
384            _ => Poll::Ready(Err(io::Error::new(
385                io::ErrorKind::InvalidInput,
386                "Invalid trailer end LF",
387            ))),
388        }
389    }
390
391    fn read_end_cr<R: MemRead>(
392        cx: &mut task::Context<'_>,
393        rdr: &mut R,
394    ) -> Poll<Result<ChunkedState, io::Error>> {
395        match byte!(rdr, cx) {
396            b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
397            _ => Poll::Ready(Ok(ChunkedState::Trailer)),
398        }
399    }
400    fn read_end_lf<R: MemRead>(
401        cx: &mut task::Context<'_>,
402        rdr: &mut R,
403    ) -> Poll<Result<ChunkedState, io::Error>> {
404        match byte!(rdr, cx) {
405            b'\n' => Poll::Ready(Ok(ChunkedState::End)),
406            _ => Poll::Ready(Err(io::Error::new(
407                io::ErrorKind::InvalidInput,
408                "Invalid chunk end LF",
409            ))),
410        }
411    }
412}
413
414#[derive(Debug)]
415struct IncompleteBody;
416
417impl fmt::Display for IncompleteBody {
418    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419        write!(f, "end of file before message length reached")
420    }
421}
422
423impl StdError for IncompleteBody {}
424
425#[cfg(test)]
426mod tests {
427    use super::*;
428    use std::pin::Pin;
429    use std::time::Duration;
430    use tokio::io::{AsyncRead, ReadBuf};
431
432    impl<'a> MemRead for &'a [u8] {
433        fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
434            let n = std::cmp::min(len, self.len());
435            if n > 0 {
436                let (a, b) = self.split_at(n);
437                let buf = Bytes::copy_from_slice(a);
438                *self = b;
439                Poll::Ready(Ok(buf))
440            } else {
441                Poll::Ready(Ok(Bytes::new()))
442            }
443        }
444    }
445
446    impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
447        fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
448            let mut v = vec![0; len];
449            let mut buf = ReadBuf::new(&mut v);
450            ready!(Pin::new(self).poll_read(cx, &mut buf)?);
451            Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled())))
452        }
453    }
454
455    #[cfg(feature = "nightly")]
456    impl MemRead for Bytes {
457        fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
458            let n = std::cmp::min(len, self.len());
459            let ret = self.split_to(n);
460            Poll::Ready(Ok(ret))
461        }
462    }
463
464    /*
465    use std::io;
466    use std::io::Write;
467    use super::Decoder;
468    use super::ChunkedState;
469    use futures::{Async, Poll};
470    use bytes::{BytesMut, Bytes};
471    use crate::mock::AsyncIo;
472    */
473
474    #[tokio::test]
475    async fn test_read_chunk_size() {
476        use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};
477
478        async fn read(s: &str) -> u64 {
479            let mut state = ChunkedState::Size;
480            let rdr = &mut s.as_bytes();
481            let mut size = 0;
482            loop {
483                let result =
484                    futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None))
485                        .await;
486                let desc = format!("read_size failed for {:?}", s);
487                state = result.expect(desc.as_str());
488                if state == ChunkedState::Body || state == ChunkedState::EndCr {
489                    break;
490                }
491            }
492            size
493        }
494
495        async fn read_err(s: &str, expected_err: io::ErrorKind) {
496            let mut state = ChunkedState::Size;
497            let rdr = &mut s.as_bytes();
498            let mut size = 0;
499            loop {
500                let result =
501                    futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None))
502                        .await;
503                state = match result {
504                    Ok(s) => s,
505                    Err(e) => {
506                        assert!(
507                            expected_err == e.kind(),
508                            "Reading {:?}, expected {:?}, but got {:?}",
509                            s,
510                            expected_err,
511                            e.kind()
512                        );
513                        return;
514                    }
515                };
516                if state == ChunkedState::Body || state == ChunkedState::End {
517                    panic!("Was Ok. Expected Err for {:?}", s);
518                }
519            }
520        }
521
522        assert_eq!(1, read("1\r\n").await);
523        assert_eq!(1, read("01\r\n").await);
524        assert_eq!(0, read("0\r\n").await);
525        assert_eq!(0, read("00\r\n").await);
526        assert_eq!(10, read("A\r\n").await);
527        assert_eq!(10, read("a\r\n").await);
528        assert_eq!(255, read("Ff\r\n").await);
529        assert_eq!(255, read("Ff   \r\n").await);
530        // Missing LF or CRLF
531        read_err("F\rF", InvalidInput).await;
532        read_err("F", UnexpectedEof).await;
533        // Invalid hex digit
534        read_err("X\r\n", InvalidInput).await;
535        read_err("1X\r\n", InvalidInput).await;
536        read_err("-\r\n", InvalidInput).await;
537        read_err("-1\r\n", InvalidInput).await;
538        // Acceptable (if not fully valid) extensions do not influence the size
539        assert_eq!(1, read("1;extension\r\n").await);
540        assert_eq!(10, read("a;ext name=value\r\n").await);
541        assert_eq!(1, read("1;extension;extension2\r\n").await);
542        assert_eq!(1, read("1;;;  ;\r\n").await);
543        assert_eq!(2, read("2; extension...\r\n").await);
544        assert_eq!(3, read("3   ; extension=123\r\n").await);
545        assert_eq!(3, read("3   ;\r\n").await);
546        assert_eq!(3, read("3   ;   \r\n").await);
547        // Invalid extensions cause an error
548        read_err("1 invalid extension\r\n", InvalidInput).await;
549        read_err("1 A\r\n", InvalidInput).await;
550        read_err("1;no CRLF", UnexpectedEof).await;
551        read_err("1;reject\nnewlines\r\n", InvalidData).await;
552        // Overflow
553        read_err("f0000000000000003\r\n", InvalidData).await;
554    }
555
556    #[tokio::test]
557    async fn test_read_sized_early_eof() {
558        let mut bytes = &b"foo bar"[..];
559        let mut decoder = Decoder::length(10);
560        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
561        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
562        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
563    }
564
565    #[tokio::test]
566    async fn test_read_chunked_early_eof() {
567        let mut bytes = &b"\
568            9\r\n\
569            foo bar\
570        "[..];
571        let mut decoder = Decoder::chunked();
572        assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
573        let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
574        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
575    }
576
577    #[tokio::test]
578    async fn test_read_chunked_single_read() {
579        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
580        let buf = Decoder::chunked()
581            .decode_fut(&mut mock_buf)
582            .await
583            .expect("decode");
584        assert_eq!(16, buf.len());
585        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
586        assert_eq!("1234567890abcdef", &result);
587    }
588
589    #[tokio::test]
590    async fn test_read_chunked_trailer_with_missing_lf() {
591        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
592        let mut decoder = Decoder::chunked();
593        decoder.decode_fut(&mut mock_buf).await.expect("decode");
594        let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
595        assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
596    }
597
598    #[tokio::test]
599    async fn test_read_chunked_after_eof() {
600        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
601        let mut decoder = Decoder::chunked();
602
603        // normal read
604        let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
605        assert_eq!(16, buf.len());
606        let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
607        assert_eq!("1234567890abcdef", &result);
608
609        // eof read
610        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
611        assert_eq!(0, buf.len());
612
613        // ensure read after eof also returns eof
614        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
615        assert_eq!(0, buf.len());
616    }
617
618    // perform an async read using a custom buffer size and causing a blocking
619    // read at the specified byte
620    async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
621        let mut outs = Vec::new();
622
623        let mut ins = if block_at == 0 {
624            tokio_test::io::Builder::new()
625                .wait(Duration::from_millis(10))
626                .read(content)
627                .build()
628        } else {
629            tokio_test::io::Builder::new()
630                .read(&content[..block_at])
631                .wait(Duration::from_millis(10))
632                .read(&content[block_at..])
633                .build()
634        };
635
636        let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin);
637
638        loop {
639            let buf = decoder
640                .decode_fut(&mut ins)
641                .await
642                .expect("unexpected decode error");
643            if buf.is_empty() {
644                break; // eof
645            }
646            outs.extend(buf.as_ref());
647        }
648
649        String::from_utf8(outs).expect("decode String")
650    }
651
652    // iterate over the different ways that this async read could go.
653    // tests blocking a read at each byte along the content - The shotgun approach
654    async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
655        let content_len = content.len();
656        for block_at in 0..content_len {
657            let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
658            assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
659        }
660    }
661
662    #[tokio::test]
663    async fn test_read_length_async() {
664        let content = "foobar";
665        all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
666    }
667
668    #[tokio::test]
669    async fn test_read_chunked_async() {
670        let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
671        let expected = "foobar";
672        all_async_cases(content, expected, Decoder::chunked()).await;
673    }
674
675    #[tokio::test]
676    async fn test_read_eof_async() {
677        let content = "foobar";
678        all_async_cases(content, content, Decoder::eof()).await;
679    }
680
681    #[cfg(feature = "nightly")]
682    #[bench]
683    fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
684        let rt = new_runtime();
685
686        const LEN: usize = 1024;
687        let mut vec = Vec::new();
688        vec.extend(format!("{:x}\r\n", LEN).as_bytes());
689        vec.extend(&[0; LEN][..]);
690        vec.extend(b"\r\n");
691        let content = Bytes::from(vec);
692
693        b.bytes = LEN as u64;
694
695        b.iter(|| {
696            let mut decoder = Decoder::chunked();
697            rt.block_on(async {
698                let mut raw = content.clone();
699                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
700                assert_eq!(chunk.len(), LEN);
701            });
702        });
703    }
704
705    #[cfg(feature = "nightly")]
706    #[bench]
707    fn bench_decode_length_1kb(b: &mut test::Bencher) {
708        let rt = new_runtime();
709
710        const LEN: usize = 1024;
711        let content = Bytes::from(&[0; LEN][..]);
712        b.bytes = LEN as u64;
713
714        b.iter(|| {
715            let mut decoder = Decoder::length(LEN as u64);
716            rt.block_on(async {
717                let mut raw = content.clone();
718                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
719                assert_eq!(chunk.len(), LEN);
720            });
721        });
722    }
723
724    #[cfg(feature = "nightly")]
725    fn new_runtime() -> tokio::runtime::Runtime {
726        tokio::runtime::Builder::new_current_thread()
727            .enable_all()
728            .build()
729            .expect("rt build")
730    }
731}