hyper/proto/h1/
dispatch.rs

1use std::error::Error as StdError;
2
3use bytes::{Buf, Bytes};
4use http::Request;
5use tokio::io::{AsyncRead, AsyncWrite};
6use tracing::{debug, trace};
7
8use super::{Http1Transaction, Wants};
9use crate::body::{Body, DecodedLength, HttpBody};
10use crate::common::{task, Future, Pin, Poll, Unpin};
11use crate::proto::{
12    BodyLength, Conn, Dispatched, MessageHead, RequestHead,
13};
14use crate::upgrade::OnUpgrade;
15
/// Drives one HTTP/1 connection: incoming messages read from `conn` are handed
/// to `dispatch`, and the messages `dispatch` produces are written to `conn`.
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
    /// The connection that message heads and bodies are read from and written to.
    conn: Conn<I, Bs::Data, T>,
    /// The `Dispatch` implementation that receives incoming messages and
    /// produces outgoing ones.
    dispatch: D,
    /// Sender half of the channel body for the incoming message currently
    /// being read, if that message has a body.
    body_tx: Option<crate::body::Sender>,
    /// The outgoing body currently being streamed to `conn`, if any.
    body_rx: Pin<Box<Option<Bs>>>,
    /// Set by `close()`; once set, the read and write paths return immediately
    /// and `is_done()` reports `true`.
    is_closing: bool,
}
23
/// The message source/sink that a `Dispatcher` drives.
///
/// The dispatcher polls it for outgoing messages and feeds it the incoming
/// messages (or errors) read from the connection.
pub(crate) trait Dispatch {
    /// Head of an outgoing message.
    type PollItem;
    /// Body of an outgoing message.
    type PollBody;
    /// Error produced while obtaining an outgoing message.
    type PollError;
    /// Head of an incoming message.
    type RecvItem;
    /// Polls for the next outgoing message (head and body).
    ///
    /// When this yields `None`, the dispatcher closes the connection.
    fn poll_msg(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
    /// Delivers an incoming message, or the error that occurred instead.
    ///
    /// An `Err` return is propagated by the dispatcher as the connection's error.
    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
    /// Polls whether an incoming message can be accepted.
    ///
    /// `Err(())` tells the dispatcher that no more messages are wanted, which
    /// makes it close the connection.
    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
    /// Whether the dispatcher should call `poll_msg` right now.
    fn should_poll(&self) -> bool;
}
37
cfg_server! {
    use crate::service::HttpService;

    /// Server-side `Dispatch`: incoming requests are passed to `service`, and
    /// the response future it returns is polled for the outgoing message.
    pub(crate) struct Server<S: HttpService<B>, B> {
        /// The response future of the request currently being handled, if any.
        in_flight: Pin<Box<Option<S::Future>>>,
        /// The user's service that is called with each received request.
        pub(crate) service: S,
    }
}
46
cfg_client! {
    pin_project_lite::pin_project! {
        // Client-side `Dispatch`: requests are taken from `rx` and written to
        // the connection; the matching response (or error) is sent back
        // through the request's callback.
        pub(crate) struct Client<B> {
            // Callback of the request that has been dequeued and is awaiting
            // its response, if any.
            callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
            // Queue of requests (paired with their callbacks) to send.
            #[pin]
            rx: ClientRx<B>,
            // Set once `rx` has reported that the sending side is gone.
            rx_closed: bool,
        }
    }

    // Receiver of queued requests, each paired with the callback used to
    // deliver its response.
    type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
}
59
60impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
61where
62    D: Dispatch<
63        PollItem = MessageHead<T::Outgoing>,
64        PollBody = Bs,
65        RecvItem = MessageHead<T::Incoming>,
66    > + Unpin,
67    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
68    I: AsyncRead + AsyncWrite + Unpin,
69    T: Http1Transaction + Unpin,
70    Bs: HttpBody + 'static,
71    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
72{
73    pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
74        Dispatcher {
75            conn,
76            dispatch,
77            body_tx: None,
78            body_rx: Box::pin(None),
79            is_closing: false,
80        }
81    }
82
83    #[cfg(feature = "server")]
84    pub(crate) fn disable_keep_alive(&mut self) {
85        self.conn.disable_keep_alive();
86        if self.conn.is_write_closed() {
87            self.close();
88        }
89    }
90
91    pub(crate) fn into_inner(self) -> (I, Bytes, D) {
92        let (io, buf) = self.conn.into_inner();
93        (io, buf, self.dispatch)
94    }
95
96    /// Run this dispatcher until HTTP says this connection is done,
97    /// but don't call `AsyncWrite::shutdown` on the underlying IO.
98    ///
99    /// This is useful for old-style HTTP upgrades, but ignores
100    /// newer-style upgrade API.
101    pub(crate) fn poll_without_shutdown(
102        &mut self,
103        cx: &mut task::Context<'_>,
104    ) -> Poll<crate::Result<()>>
105    where
106        Self: Unpin,
107    {
108        Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
109            if let Dispatched::Upgrade(pending) = ds {
110                pending.manual();
111            }
112        })
113    }
114
115    fn poll_catch(
116        &mut self,
117        cx: &mut task::Context<'_>,
118        should_shutdown: bool,
119    ) -> Poll<crate::Result<Dispatched>> {
120        Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
121            // An error means we're shutting down either way.
122            // We just try to give the error to the user,
123            // and close the connection with an Ok. If we
124            // cannot give it to the user, then return the Err.
125            self.dispatch.recv_msg(Err(e))?;
126            Ok(Dispatched::Shutdown)
127        }))
128    }
129
130    fn poll_inner(
131        &mut self,
132        cx: &mut task::Context<'_>,
133        should_shutdown: bool,
134    ) -> Poll<crate::Result<Dispatched>> {
135        T::update_date();
136
137        ready!(self.poll_loop(cx))?;
138
139        if self.is_done() {
140            if let Some(pending) = self.conn.pending_upgrade() {
141                self.conn.take_error()?;
142                return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
143            } else if should_shutdown {
144                ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
145            }
146            self.conn.take_error()?;
147            Poll::Ready(Ok(Dispatched::Shutdown))
148        } else {
149            Poll::Pending
150        }
151    }
152
153    fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
154        // Limit the looping on this connection, in case it is ready far too
155        // often, so that other futures don't starve.
156        //
157        // 16 was chosen arbitrarily, as that is number of pipelined requests
158        // benchmarks often use. Perhaps it should be a config option instead.
159        for _ in 0..16 {
160            let _ = self.poll_read(cx)?;
161            let _ = self.poll_write(cx)?;
162            let _ = self.poll_flush(cx)?;
163
164            // This could happen if reading paused before blocking on IO,
165            // such as getting to the end of a framed message, but then
166            // writing/flushing set the state back to Init. In that case,
167            // if the read buffer still had bytes, we'd want to try poll_read
168            // again, or else we wouldn't ever be woken up again.
169            //
170            // Using this instead of task::current() and notify() inside
171            // the Conn is noticeably faster in pipelined benchmarks.
172            if !self.conn.wants_read_again() {
173                //break;
174                return Poll::Ready(Ok(()));
175            }
176        }
177
178        trace!("poll_loop yielding (self = {:p})", self);
179
180        task::yield_now(cx).map(|never| match never {})
181    }
182
    /// Drives the read half of the connection.
    ///
    /// Each loop iteration does one of the following, in priority order:
    /// stop because the dispatcher is closing, read a message head, forward a
    /// chunk of the current incoming body to `body_tx`, or fall back to the
    /// connection's keep-alive read.
    fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                // The sender was taken out of `self.body_tx`; every path that
                // wants to keep streaming must put it back.
                if self.conn.can_read_body() {
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
                            Ok(()) => {
                                self.body_tx = Some(body);
                            }
                            Err(_canceled) => {
                                // The sender is not restored here, so it is
                                // dropped at the end of this iteration.
                                if self.conn.can_read_body() {
                                    trace!("body receiver dropped before eof, closing");
                                    self.conn.close_read();
                                }
                            }
                        },
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // Forward the read error to the body's receiver.
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }
236
    /// Reads one message head from the connection and delivers it, together
    /// with its body, to the dispatch.
    ///
    /// The dispatch is asked first whether it still wants a message; if not,
    /// the dispatcher closes. A parse error is offered to the dispatch, and
    /// only becomes this function's error when the dispatch rejects it.
    fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        // can dispatch receive, or does it still care about, an incoming message?
        match ready!(self.dispatch.poll_ready(cx)) {
            Ok(()) => (),
            Err(()) => {
                trace!("dispatch no longer receiving messages");
                self.close();
                return Poll::Ready(Ok(()));
            }
        }
        // dispatch is ready for a message, try to read one
        match ready!(self.conn.poll_read_head(cx)) {
            Some(Ok((mut head, body_len, wants))) => {
                let body = match body_len {
                    // No body bytes to read: no channel is needed.
                    DecodedLength::ZERO => Body::empty(),
                    other => {
                        // Keep the sender so `poll_read` can stream the body
                        // into the receiver handed to the dispatch.
                        let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
                        self.body_tx = Some(tx);
                        rx
                    }
                };
                if wants.contains(Wants::UPGRADE) {
                    // Attach the upgrade handle to the message's extensions.
                    let upgrade = self.conn.on_upgrade();
                    debug_assert!(!upgrade.is_none(), "empty upgrade");
                    debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
                    head.extensions.insert(upgrade);
                }
                self.dispatch.recv_msg(Ok((head, body)))?;
                Poll::Ready(Ok(()))
            }
            Some(Err(err)) => {
                debug!("read_head error: {}", err);
                self.dispatch.recv_msg(Err(err))?;
                // if here, the dispatcher gave the user the error
                // somewhere else. we still need to shutdown, but
                // not as a second error.
                self.close();
                Poll::Ready(Ok(()))
            }
            None => {
                // read eof, the write side will have been closed too unless
                // allow_read_close was set to true, in which case just do
                // nothing...
                debug_assert!(self.conn.is_read_closed());
                if self.conn.is_write_closed() {
                    self.close();
                }
                Poll::Ready(Ok(()))
            }
        }
    }
288
    /// Drives the write half of the connection.
    ///
    /// Each loop iteration does one of the following, in priority order:
    /// stop because the dispatcher is closing, fetch the next outgoing message
    /// from the dispatch and write its head, flush because the connection
    /// cannot buffer more body data, or write the next chunk of the outgoing
    /// body held in `body_rx`.
    fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.body_rx.is_none()
                && self.conn.can_write_head()
                && self.dispatch.should_poll()
            {
                // No body is being streamed and a head may be written: ask
                // the dispatch for the next outgoing message.
                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
                    let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;

                    // Check if the body knows its full data immediately.
                    //
                    // If so, we can skip a bit of bookkeeping that streaming
                    // bodies need to do.
                    if let Some(full) = crate::body::take_full_data(&mut body) {
                        self.conn.write_full_msg(head, full);
                        return Poll::Ready(Ok(()));
                    }

                    let body_type = if body.is_end_stream() {
                        // Nothing to stream: write the head with no body.
                        self.body_rx.set(None);
                        None
                    } else {
                        // Use the exact size when the body reports one;
                        // otherwise the length is unknown.
                        let btype = body
                            .size_hint()
                            .exact()
                            .map(BodyLength::Known)
                            .or_else(|| Some(BodyLength::Unknown));
                        self.body_rx.set(Some(body));
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    // The dispatch has no more messages to send.
                    self.close();
                    return Poll::Ready(Ok(()));
                }
            } else if !self.conn.can_buffer_body() {
                ready!(self.poll_flush(cx))?;
            } else {
                // A new scope is needed :(
                //
                // The guard is a temporary of this `if let`; when it is
                // dropped, `body_rx` is cleared if `*clear_body` was set.
                if let (Some(mut body), clear_body) =
                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
                {
                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
                    if !self.conn.can_write_body() {
                        trace!(
                            "no more write body allowed, user body is_end_stream = {}",
                            body.is_end_stream(),
                        );
                        *clear_body = true;
                        continue;
                    }

                    let item = ready!(body.as_mut().poll_data(cx));
                    if let Some(item) = item {
                        // A body error also discards the body.
                        let chunk = item.map_err(|e| {
                            *clear_body = true;
                            crate::Error::new_user_body(e)
                        })?;
                        let eos = body.is_end_stream();
                        if eos {
                            // This was the last chunk: write it (if non-empty)
                            // and end the body in one step.
                            *clear_body = true;
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                self.conn.end_body()?;
                            } else {
                                self.conn.write_body_and_end(chunk);
                            }
                        } else {
                            // Empty chunks are never handed to `write_body`.
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                continue;
                            }
                            self.conn.write_body(chunk);
                        }
                    } else {
                        // The body finished without a final chunk.
                        *clear_body = true;
                        self.conn.end_body()?;
                    }
                } else {
                    // No outgoing body and nothing else to write right now.
                    return Poll::Pending;
                }
            }
        }
    }
375
376    fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
377        self.conn.poll_flush(cx).map_err(|err| {
378            debug!("error writing: {}", err);
379            crate::Error::new_body_write(err)
380        })
381    }
382
    /// Marks the dispatcher as closing and closes both the read and the
    /// write side of the connection.
    fn close(&mut self) {
        // Makes `poll_read` / `poll_write` return early and `is_done` report true.
        self.is_closing = true;
        self.conn.close_read();
        self.conn.close_write();
    }
388
389    fn is_done(&self) -> bool {
390        if self.is_closing {
391            return true;
392        }
393
394        let read_done = self.conn.is_read_closed();
395
396        if !T::should_read_first() && read_done {
397            // a client that cannot read may was well be done.
398            true
399        } else {
400            let write_done = self.conn.is_write_closed()
401                || (!self.dispatch.should_poll() && self.body_rx.is_none());
402            read_done && write_done
403        }
404    }
405}
406
407impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
408where
409    D: Dispatch<
410        PollItem = MessageHead<T::Outgoing>,
411        PollBody = Bs,
412        RecvItem = MessageHead<T::Incoming>,
413    > + Unpin,
414    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
415    I: AsyncRead + AsyncWrite + Unpin,
416    T: Http1Transaction + Unpin,
417    Bs: HttpBody + 'static,
418    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
419{
420    type Output = crate::Result<Dispatched>;
421
422    #[inline]
423    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
424        self.poll_catch(cx, true)
425    }
426}
427
428// ===== impl OptGuard =====
429
/// Drop guard around a pinned `Option`.
///
/// It lends out mutable access to the contained value together with a flag;
/// if the flag is `true` when the guard is dropped, the `Option` is reset to
/// `None`. The flag starts out `false`, so by default the value is kept.
struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin` in a guard that keeps the value unless told otherwise.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self(pin, false)
    }

    /// Returns the pinned inner value (if any) and the clear-on-drop flag.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        let Self(slot, clear_on_drop) = self;
        (slot.as_mut().as_pin_mut(), clear_on_drop)
    }
}

impl<'a, T> Drop for OptGuard<'a, T> {
    fn drop(&mut self) {
        let Self(slot, clear_on_drop) = self;
        if *clear_on_drop {
            slot.set(None);
        }
    }
}
451
452// ===== impl Server =====
453
454cfg_server! {
455    impl<S, B> Server<S, B>
456    where
457        S: HttpService<B>,
458    {
459        pub(crate) fn new(service: S) -> Server<S, B> {
460            Server {
461                in_flight: Box::pin(None),
462                service,
463            }
464        }
465
466        pub(crate) fn into_service(self) -> S {
467            self.service
468        }
469    }
470
    // Service is never pinned, and the in-flight future lives in its own
    // `Pin<Box<_>>`, so `Server` is declared `Unpin` regardless of `S`.
    impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
473
474    impl<S, Bs> Dispatch for Server<S, Body>
475    where
476        S: HttpService<Body, ResBody = Bs>,
477        S::Error: Into<Box<dyn StdError + Send + Sync>>,
478        Bs: HttpBody,
479    {
480        type PollItem = MessageHead<http::StatusCode>;
481        type PollBody = Bs;
482        type PollError = S::Error;
483        type RecvItem = RequestHead;
484
485        fn poll_msg(
486            mut self: Pin<&mut Self>,
487            cx: &mut task::Context<'_>,
488        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
489            let mut this = self.as_mut();
490            let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
491                let resp = ready!(fut.as_mut().poll(cx)?);
492                let (parts, body) = resp.into_parts();
493                let head = MessageHead {
494                    version: parts.version,
495                    subject: parts.status,
496                    headers: parts.headers,
497                    extensions: parts.extensions,
498                };
499                Poll::Ready(Some(Ok((head, body))))
500            } else {
501                unreachable!("poll_msg shouldn't be called if no inflight");
502            };
503
504            // Since in_flight finished, remove it
505            this.in_flight.set(None);
506            ret
507        }
508
509        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
510            let (msg, body) = msg?;
511            let mut req = Request::new(body);
512            *req.method_mut() = msg.subject.0;
513            *req.uri_mut() = msg.subject.1;
514            *req.headers_mut() = msg.headers;
515            *req.version_mut() = msg.version;
516            *req.extensions_mut() = msg.extensions;
517            let fut = self.service.call(req);
518            self.in_flight.set(Some(fut));
519            Ok(())
520        }
521
522        fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
523            if self.in_flight.is_some() {
524                Poll::Pending
525            } else {
526                self.service.poll_ready(cx).map_err(|_e| {
527                    // FIXME: return error value.
528                    trace!("service closed");
529                })
530            }
531        }
532
        /// There is an outgoing message to poll for only while a request's
        /// response future is in flight.
        fn should_poll(&self) -> bool {
            self.in_flight.is_some()
        }
536    }
537}
538
539// ===== impl Client =====
540
541cfg_client! {
542    impl<B> Client<B> {
543        pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
544            Client {
545                callback: None,
546                rx,
547                rx_closed: false,
548            }
549        }
550    }
551
552    impl<B> Dispatch for Client<B>
553    where
554        B: HttpBody,
555    {
556        type PollItem = RequestHead;
557        type PollBody = B;
558        type PollError = crate::common::Never;
559        type RecvItem = crate::proto::ResponseHead;
560
561        fn poll_msg(
562            mut self: Pin<&mut Self>,
563            cx: &mut task::Context<'_>,
564        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
565            let mut this = self.as_mut();
566            debug_assert!(!this.rx_closed);
567            match this.rx.poll_recv(cx) {
568                Poll::Ready(Some((req, mut cb))) => {
569                    // check that future hasn't been canceled already
570                    match cb.poll_canceled(cx) {
571                        Poll::Ready(()) => {
572                            trace!("request canceled");
573                            Poll::Ready(None)
574                        }
575                        Poll::Pending => {
576                            let (parts, body) = req.into_parts();
577                            let head = RequestHead {
578                                version: parts.version,
579                                subject: crate::proto::RequestLine(parts.method, parts.uri),
580                                headers: parts.headers,
581                                extensions: parts.extensions,
582                            };
583                            this.callback = Some(cb);
584                            Poll::Ready(Some(Ok((head, body))))
585                        }
586                    }
587                }
588                Poll::Ready(None) => {
589                    // user has dropped sender handle
590                    trace!("client tx closed");
591                    this.rx_closed = true;
592                    Poll::Ready(None)
593                }
594                Poll::Pending => Poll::Pending,
595            }
596        }
597
598        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
599            match msg {
600                Ok((msg, body)) => {
601                    if let Some(cb) = self.callback.take() {
602                        let res = msg.into_response(body);
603                        cb.send(Ok(res));
604                        Ok(())
605                    } else {
606                        // Getting here is likely a bug! An error should have happened
607                        // in Conn::require_empty_read() before ever parsing a
608                        // full message!
609                        Err(crate::Error::new_unexpected_message())
610                    }
611                }
612                Err(err) => {
613                    if let Some(cb) = self.callback.take() {
614                        cb.send(Err((err, None)));
615                        Ok(())
616                    } else if !self.rx_closed {
617                        self.rx.close();
618                        if let Some((req, cb)) = self.rx.try_recv() {
619                            trace!("canceling queued request with connection error: {}", err);
620                            // in this case, the message was never even started, so it's safe to tell
621                            // the user that the request was completely canceled
622                            cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
623                            Ok(())
624                        } else {
625                            Err(err)
626                        }
627                    } else {
628                        Err(err)
629                    }
630                }
631            }
632        }
633
634        fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
635            match self.callback {
636                Some(ref mut cb) => match cb.poll_canceled(cx) {
637                    Poll::Ready(()) => {
638                        trace!("callback receiver has dropped");
639                        Poll::Ready(Err(()))
640                    }
641                    Poll::Pending => Poll::Ready(Ok(())),
642                },
643                None => Poll::Ready(Err(())),
644            }
645        }
646
        /// A new request should only be polled for while no callback is
        /// waiting on a response.
        fn should_poll(&self) -> bool {
            self.callback.is_none()
        }
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656    use crate::proto::h1::ClientTransaction;
657    use std::time::Duration;
658
    /// A response that arrives before any request was written must cancel the
    /// queued request, handing the request back to the caller.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();

        tokio_test::task::spawn(()).enter(|cx, _| {
            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();

            // Block at 0 for now, but we will release this response before
            // the request is ready to write later...
            let (mut tx, rx) = crate::client::dispatch::channel();
            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());

            // Unblock our IO, which has a response before we've sent request!
            //
            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");

            let mut res_rx = tx
                .try_send(crate::Request::new(crate::Body::empty()))
                .unwrap();

            // The dispatcher itself finishes cleanly; the error is delivered
            // through the request's callback instead.
            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
                .expect_err("callback should send error");

            // Expect a `Canceled` error that carries the original request.
            match (err.0.kind(), err.1) {
                (&crate::error::Kind::Canceled, Some(_)) => (),
                other => panic!("expected Canceled, got {:?}", other),
            }
        });
    }
693
    /// After a response has been received while the request body has not yet
    /// been written out, the sender must not report ready for another request.
    #[tokio::test]
    async fn client_flushing_is_not_ready_for_next_request() {
        let _ = pretty_env_logger::try_init();

        // The mock IO accepts only the request head, then serves a response
        // and waits.
        let (io, _handle) = tokio_test::io::Builder::new()
            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
            .wait(std::time::Duration::from_secs(2))
            .build_with_handle();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        conn.set_write_strategy_queue();

        let dispatcher = Dispatcher::new(Client::new(rx), conn);
        let _dispatcher = tokio::spawn(async move { dispatcher.await });

        let req = crate::Request::builder()
            .method("POST")
            .body(crate::Body::from("reee"))
            .unwrap();

        let res = tx.try_send(req).unwrap().await.expect("response");
        drop(res);

        assert!(!tx.is_ready());
    }
721
    /// An empty chunk produced by the request body must be discarded rather
    /// than passed on to the connection.
    #[tokio::test]
    async fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();

        let io = tokio_test::io::Builder::new()
            // no reading or writing, just be blocked for the test...
            .wait(Duration::from_secs(5))
            .build();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

        // First poll is needed to allow tx to send...
        assert!(dispatcher.poll().is_pending());

        // A channel body whose only queued chunk is empty; its sender is
        // dropped at the end of this block.
        let body = {
            let (mut tx, body) = crate::Body::channel();
            tx.try_send_data("".into()).unwrap();
            body
        };

        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();

        // Ensure conn.write_body wasn't called with the empty chunk.
        // If it is, it will trigger an assertion.
        assert!(dispatcher.poll().is_pending());
    }
750}