// hyper/body/body.rs

1use std::borrow::Cow;
2#[cfg(feature = "stream")]
3use std::error::Error as StdError;
4use std::fmt;
5
6use bytes::Bytes;
7use futures_channel::mpsc;
8use futures_channel::oneshot;
9use futures_core::Stream; // for mpsc::Receiver
10#[cfg(feature = "stream")]
11use futures_util::TryStreamExt;
12use http::HeaderMap;
13use http_body::{Body as HttpBody, SizeHint};
14
15use super::DecodedLength;
16#[cfg(feature = "stream")]
17use crate::common::sync_wrapper::SyncWrapper;
18use crate::common::Future;
19#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
20use crate::common::Never;
21use crate::common::{task, watch, Pin, Poll};
22#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
23use crate::proto::h2::ping;
24
25type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
26type TrailersSender = oneshot::Sender<HeaderMap>;
27
28/// A stream of `Bytes`, used when receiving bodies.
29///
30/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
31/// applications.
32///
33/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes)
34/// or [`body::aggregate`](crate::body::aggregate).
35#[must_use = "streams do nothing unless polled"]
36pub struct Body {
37    kind: Kind,
38    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
39    /// Body stays small in the common case (no extras needed).
40    extra: Option<Box<Extra>>,
41}
42
43enum Kind {
44    Once(Option<Bytes>),
45    Chan {
46        content_length: DecodedLength,
47        want_tx: watch::Sender,
48        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
49        trailers_rx: oneshot::Receiver<HeaderMap>,
50    },
51    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
52    H2 {
53        ping: ping::Recorder,
54        content_length: DecodedLength,
55        recv: h2::RecvStream,
56    },
57    #[cfg(feature = "ffi")]
58    Ffi(crate::ffi::UserBody),
59    #[cfg(feature = "stream")]
60    Wrapped(
61        SyncWrapper<
62            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
63        >,
64    ),
65}
66
67struct Extra {
68    /// Allow the client to pass a future to delay the `Body` from returning
69    /// EOF. This allows the `Client` to try to put the idle connection
70    /// back into the pool before the body is "finished".
71    ///
72    /// The reason for this is so that creating a new request after finishing
73    /// streaming the body of a response could sometimes result in creating
74    /// a brand new connection, since the pool didn't know about the idle
75    /// connection yet.
76    delayed_eof: Option<DelayEof>,
77}
78
/// The future that gates EOF. It can never yield a value (`Never`), so it
/// only completes with an error, which happens once the sending half is gone.
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
type DelayEofUntil = oneshot::Receiver<Never>;
81
/// State of a delayed EOF.
///
/// Both variants are compiled only for the client with `http1` or `http2`
/// enabled; in other configurations this enum has no variants.
enum DelayEof {
    /// Starting state: the underlying stream has not reached EOF yet.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    NotEof(DelayEofUntil),
    /// Entered once `poll` was about to return EOF (`None`). From here on
    /// the delay future is polled, and the `Body` reports EOF (`None`) only
    /// after that future completes.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    Eof(DelayEofUntil),
}
94
95/// A sender half created through [`Body::channel()`].
96///
97/// Useful when wanting to stream chunks from another thread.
98///
99/// ## Body Closing
100///
101/// Note that the request body will always be closed normally when the sender is dropped (meaning
102/// that the empty terminating chunk will be sent to the remote). If you desire to close the
103/// connection with an incomplete response (e.g. in the case of an error during asynchronous
104/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
105///
106/// [`Body::channel()`]: struct.Body.html#method.channel
107/// [`Sender::abort()`]: struct.Sender.html#method.abort
108#[must_use = "Sender does nothing unless sent on"]
109pub struct Sender {
110    want_rx: watch::Receiver,
111    data_tx: BodySender,
112    trailers_tx: Option<TrailersSender>,
113}
114
/// Watch value: the `Body` has not been polled for data yet.
const WANT_PENDING: usize = 1;
/// Watch value: the `Sender` may go ahead and send data.
const WANT_READY: usize = 2;
117
118impl Body {
119    /// Create an empty `Body` stream.
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use hyper::{Body, Request};
125    ///
126    /// // create a `GET /` request
127    /// let get = Request::new(Body::empty());
128    /// ```
129    #[inline]
130    pub fn empty() -> Body {
131        Body::new(Kind::Once(None))
132    }
133
134    /// Create a `Body` stream with an associated sender half.
135    ///
136    /// Useful when wanting to stream chunks from another thread.
137    #[inline]
138    pub fn channel() -> (Sender, Body) {
139        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
140    }
141
142    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
143        let (data_tx, data_rx) = mpsc::channel(0);
144        let (trailers_tx, trailers_rx) = oneshot::channel();
145
146        // If wanter is true, `Sender::poll_ready()` won't becoming ready
147        // until the `Body` has been polled for data once.
148        let want = if wanter { WANT_PENDING } else { WANT_READY };
149
150        let (want_tx, want_rx) = watch::channel(want);
151
152        let tx = Sender {
153            want_rx,
154            data_tx,
155            trailers_tx: Some(trailers_tx),
156        };
157        let rx = Body::new(Kind::Chan {
158            content_length,
159            want_tx,
160            data_rx,
161            trailers_rx,
162        });
163
164        (tx, rx)
165    }
166
167    /// Wrap a futures `Stream` in a box inside `Body`.
168    ///
169    /// # Example
170    ///
171    /// ```
172    /// # use hyper::Body;
173    /// let chunks: Vec<Result<_, std::io::Error>> = vec![
174    ///     Ok("hello"),
175    ///     Ok(" "),
176    ///     Ok("world"),
177    /// ];
178    ///
179    /// let stream = futures_util::stream::iter(chunks);
180    ///
181    /// let body = Body::wrap_stream(stream);
182    /// ```
183    ///
184    /// # Optional
185    ///
186    /// This function requires enabling the `stream` feature in your
187    /// `Cargo.toml`.
188    #[cfg(feature = "stream")]
189    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
190    pub fn wrap_stream<S, O, E>(stream: S) -> Body
191    where
192        S: Stream<Item = Result<O, E>> + Send + 'static,
193        O: Into<Bytes> + 'static,
194        E: Into<Box<dyn StdError + Send + Sync>> + 'static,
195    {
196        let mapped = stream.map_ok(Into::into).map_err(Into::into);
197        Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
198    }
199
200    fn new(kind: Kind) -> Body {
201        Body { kind, extra: None }
202    }
203
204    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
205    pub(crate) fn h2(
206        recv: h2::RecvStream,
207        mut content_length: DecodedLength,
208        ping: ping::Recorder,
209    ) -> Self {
210        // If the stream is already EOS, then the "unknown length" is clearly
211        // actually ZERO.
212        if !content_length.is_exact() && recv.is_end_stream() {
213            content_length = DecodedLength::ZERO;
214        }
215        let body = Body::new(Kind::H2 {
216            ping,
217            content_length,
218            recv,
219        });
220
221        body
222    }
223
224    #[cfg(any(feature = "http1", feature = "http2"))]
225    #[cfg(feature = "client")]
226    pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
227        self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
228    }
229
230    fn take_delayed_eof(&mut self) -> Option<DelayEof> {
231        self.extra
232            .as_mut()
233            .and_then(|extra| extra.delayed_eof.take())
234    }
235
236    #[cfg(any(feature = "http1", feature = "http2"))]
237    fn extra_mut(&mut self) -> &mut Extra {
238        self.extra
239            .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
240    }
241
242    fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
243        match self.take_delayed_eof() {
244            #[cfg(any(feature = "http1", feature = "http2"))]
245            #[cfg(feature = "client")]
246            Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
247                ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
248                    self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
249                    ok
250                }
251                Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
252                    Poll::Ready(Ok(never)) => match never {},
253                    Poll::Pending => {
254                        self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
255                        Poll::Pending
256                    }
257                    Poll::Ready(Err(_done)) => Poll::Ready(None),
258                },
259                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
260            },
261            #[cfg(any(feature = "http1", feature = "http2"))]
262            #[cfg(feature = "client")]
263            Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
264                Poll::Ready(Ok(never)) => match never {},
265                Poll::Pending => {
266                    self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
267                    Poll::Pending
268                }
269                Poll::Ready(Err(_done)) => Poll::Ready(None),
270            },
271            #[cfg(any(
272                not(any(feature = "http1", feature = "http2")),
273                not(feature = "client")
274            ))]
275            Some(delay_eof) => match delay_eof {},
276            None => self.poll_inner(cx),
277        }
278    }
279
280    #[cfg(feature = "ffi")]
281    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
282        match self.kind {
283            Kind::Ffi(ref mut body) => return body,
284            _ => {
285                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
286            }
287        }
288
289        match self.kind {
290            Kind::Ffi(ref mut body) => body,
291            _ => unreachable!(),
292        }
293    }
294
295    fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
296        match self.kind {
297            Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
298            Kind::Chan {
299                content_length: ref mut len,
300                ref mut data_rx,
301                ref mut want_tx,
302                ..
303            } => {
304                want_tx.send(WANT_READY);
305
306                match ready!(Pin::new(data_rx).poll_next(cx)?) {
307                    Some(chunk) => {
308                        len.sub_if(chunk.len() as u64);
309                        Poll::Ready(Some(Ok(chunk)))
310                    }
311                    None => Poll::Ready(None),
312                }
313            }
314            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
315            Kind::H2 {
316                ref ping,
317                recv: ref mut h2,
318                content_length: ref mut len,
319            } => match ready!(h2.poll_data(cx)) {
320                Some(Ok(bytes)) => {
321                    let _ = h2.flow_control().release_capacity(bytes.len());
322                    len.sub_if(bytes.len() as u64);
323                    ping.record_data(bytes.len());
324                    Poll::Ready(Some(Ok(bytes)))
325                }
326                Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
327                None => Poll::Ready(None),
328            },
329
330            #[cfg(feature = "ffi")]
331            Kind::Ffi(ref mut body) => body.poll_data(cx),
332
333            #[cfg(feature = "stream")]
334            Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
335                Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
336                None => Poll::Ready(None),
337            },
338        }
339    }
340
341    #[cfg(feature = "http1")]
342    pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
343        if let Kind::Once(ref mut chunk) = self.kind {
344            chunk.take()
345        } else {
346            None
347        }
348    }
349}
350
351impl Default for Body {
352    /// Returns `Body::empty()`.
353    #[inline]
354    fn default() -> Body {
355        Body::empty()
356    }
357}
358
359impl HttpBody for Body {
360    type Data = Bytes;
361    type Error = crate::Error;
362
363    fn poll_data(
364        mut self: Pin<&mut Self>,
365        cx: &mut task::Context<'_>,
366    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
367        self.poll_eof(cx)
368    }
369
370    fn poll_trailers(
371        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
372        #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
373    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
374        match self.kind {
375            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
376            Kind::H2 {
377                recv: ref mut h2,
378                ref ping,
379                ..
380            } => match ready!(h2.poll_trailers(cx)) {
381                Ok(t) => {
382                    ping.record_non_data();
383                    Poll::Ready(Ok(t))
384                }
385                Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
386            },
387            Kind::Chan {
388                ref mut trailers_rx,
389                ..
390            } => match ready!(Pin::new(trailers_rx).poll(cx)) {
391                Ok(t) => Poll::Ready(Ok(Some(t))),
392                Err(_) => Poll::Ready(Ok(None)),
393            },
394            #[cfg(feature = "ffi")]
395            Kind::Ffi(ref mut body) => body.poll_trailers(cx),
396            _ => Poll::Ready(Ok(None)),
397        }
398    }
399
400    fn is_end_stream(&self) -> bool {
401        match self.kind {
402            Kind::Once(ref val) => val.is_none(),
403            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
404            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
405            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
406            #[cfg(feature = "ffi")]
407            Kind::Ffi(..) => false,
408            #[cfg(feature = "stream")]
409            Kind::Wrapped(..) => false,
410        }
411    }
412
413    fn size_hint(&self) -> SizeHint {
414        macro_rules! opt_len {
415            ($content_length:expr) => {{
416                let mut hint = SizeHint::default();
417
418                if let Some(content_length) = $content_length.into_opt() {
419                    hint.set_exact(content_length);
420                }
421
422                hint
423            }};
424        }
425
426        match self.kind {
427            Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
428            Kind::Once(None) => SizeHint::with_exact(0),
429            #[cfg(feature = "stream")]
430            Kind::Wrapped(..) => SizeHint::default(),
431            Kind::Chan { content_length, .. } => opt_len!(content_length),
432            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
433            Kind::H2 { content_length, .. } => opt_len!(content_length),
434            #[cfg(feature = "ffi")]
435            Kind::Ffi(..) => SizeHint::default(),
436        }
437    }
438}
439
440impl fmt::Debug for Body {
441    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442        #[derive(Debug)]
443        struct Streaming;
444        #[derive(Debug)]
445        struct Empty;
446        #[derive(Debug)]
447        struct Full<'a>(&'a Bytes);
448
449        let mut builder = f.debug_tuple("Body");
450        match self.kind {
451            Kind::Once(None) => builder.field(&Empty),
452            Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
453            _ => builder.field(&Streaming),
454        };
455
456        builder.finish()
457    }
458}
459
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Forwards to the `HttpBody::poll_data` implementation of `Body`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        <Body as HttpBody>::poll_data(self, cx)
    }
}
472
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pins the already-boxed stream and stores it as a wrapped body.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let pinned = Pin::from(stream);
        Body::new(Kind::Wrapped(SyncWrapper::new(pinned)))
    }
}
486
487impl From<Bytes> for Body {
488    #[inline]
489    fn from(chunk: Bytes) -> Body {
490        if chunk.is_empty() {
491            Body::empty()
492        } else {
493            Body::new(Kind::Once(Some(chunk)))
494        }
495    }
496}
497
498impl From<Vec<u8>> for Body {
499    #[inline]
500    fn from(vec: Vec<u8>) -> Body {
501        Body::from(Bytes::from(vec))
502    }
503}
504
505impl From<&'static [u8]> for Body {
506    #[inline]
507    fn from(slice: &'static [u8]) -> Body {
508        Body::from(Bytes::from(slice))
509    }
510}
511
512impl From<Cow<'static, [u8]>> for Body {
513    #[inline]
514    fn from(cow: Cow<'static, [u8]>) -> Body {
515        match cow {
516            Cow::Borrowed(b) => Body::from(b),
517            Cow::Owned(o) => Body::from(o),
518        }
519    }
520}
521
522impl From<String> for Body {
523    #[inline]
524    fn from(s: String) -> Body {
525        Body::from(Bytes::from(s.into_bytes()))
526    }
527}
528
529impl From<&'static str> for Body {
530    #[inline]
531    fn from(slice: &'static str) -> Body {
532        Body::from(Bytes::from(slice.as_bytes()))
533    }
534}
535
536impl From<Cow<'static, str>> for Body {
537    #[inline]
538    fn from(cow: Cow<'static, str>) -> Body {
539        match cow {
540            Cow::Borrowed(b) => Body::from(b),
541            Cow::Owned(o) => Body::from(o),
542        }
543    }
544}
545
546impl Sender {
547    /// Check to see if this `Sender` can send more data.
548    pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
549        // Check if the receiver end has tried polling for the body yet
550        ready!(self.poll_want(cx)?);
551        self.data_tx
552            .poll_ready(cx)
553            .map_err(|_| crate::Error::new_closed())
554    }
555
556    fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
557        match self.want_rx.load(cx) {
558            WANT_READY => Poll::Ready(Ok(())),
559            WANT_PENDING => Poll::Pending,
560            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
561            unexpected => unreachable!("want_rx value: {}", unexpected),
562        }
563    }
564
565    async fn ready(&mut self) -> crate::Result<()> {
566        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
567    }
568
569    /// Send data on data channel when it is ready.
570    pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
571        self.ready().await?;
572        self.data_tx
573            .try_send(Ok(chunk))
574            .map_err(|_| crate::Error::new_closed())
575    }
576
577    /// Send trailers on trailers channel.
578    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
579        let tx = match self.trailers_tx.take() {
580            Some(tx) => tx,
581            None => return Err(crate::Error::new_closed()),
582        };
583        tx.send(trailers).map_err(|_| crate::Error::new_closed())
584    }
585
586    /// Try to send data on this channel.
587    ///
588    /// # Errors
589    ///
590    /// Returns `Err(Bytes)` if the channel could not (currently) accept
591    /// another `Bytes`.
592    ///
593    /// # Note
594    ///
595    /// This is mostly useful for when trying to send from some other thread
596    /// that doesn't have an async context. If in an async context, prefer
597    /// `send_data()` instead.
598    pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
599        self.data_tx
600            .try_send(Ok(chunk))
601            .map_err(|err| err.into_inner().expect("just sent Ok"))
602    }
603
604    /// Aborts the body in an abnormal fashion.
605    pub fn abort(self) {
606        let _ = self
607            .data_tx
608            // clone so the send works even if buffer is full
609            .clone()
610            .try_send(Err(crate::Error::new_body_write_aborted()));
611    }
612
613    #[cfg(feature = "http1")]
614    pub(crate) fn send_error(&mut self, err: crate::Error) {
615        let _ = self.data_tx.try_send(Err(err));
616    }
617}
618
619impl fmt::Debug for Sender {
620    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621        #[derive(Debug)]
622        struct Open;
623        #[derive(Debug)]
624        struct Closed;
625
626        let mut builder = f.debug_tuple("Sender");
627        match self.want_rx.peek() {
628            watch::CLOSED => builder.field(&Closed),
629            _ => builder.field(&Open),
630        };
631
632        builder.finish()
633    }
634}
635
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};

    /// Guards against *accidentally* growing `Body` or `Sender` by too much.
    #[test]
    fn test_size_of() {
        let body_size = mem::size_of::<Body>();
        let body_expected_size = 6 * mem::size_of::<u64>();
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        // Wrapping in `Option` must not cost any extra space.
        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");

        let sender_size = mem::size_of::<Sender>();
        assert_eq!(sender_size, mem::size_of::<usize>() * 5, "Sender");
        assert_eq!(
            sender_size,
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// Each kind of body reports the expected lower and upper size bounds.
    #[test]
    fn size_hint() {
        fn eq(body: Body, expected: SizeHint, note: &str) {
            let actual = body.size_hint();
            assert_eq!(actual.lower(), expected.lower(), "lower for {:?}", note);
            assert_eq!(actual.upper(), expected.upper(), "upper for {:?}", note);
        }

        eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");

        eq(Body::empty(), SizeHint::with_exact(0), "empty");

        let (_tx, chunked) = Body::channel();
        eq(chunked, SizeHint::new(), "channel");

        let (_tx, sized) = Body::new_channel(DecodedLength::new(4), /*wanter =*/ false);
        eq(sized, SizeHint::with_exact(4), "channel with length");
    }

    /// Aborting the sender surfaces a "body write aborted" error.
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Body::channel();

        tx.abort();

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered when a chunk is already buffered.
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Body::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        // buffer is full, but can still send abort
        tx.abort();

        let first = rx.data().await.expect("item 1").expect("chunk 1");
        assert_eq!(first, "chunk 1");

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// Exactly one chunk can be queued before `try_send_data` rejects more.
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Body::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        // buffer is now full
        let rejected = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(rejected, "chunk 2");
    }

    /// A channel whose sender is dropped right away yields no data.
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Body::channel();

        let next = rx.data().await;
        assert!(next.is_none());
    }

    /// Without a wanter the sender is ready straight away.
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With a wanter the sender only becomes ready after the body is polled.
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.data());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees "closed".
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}