futures_util/io/
chain.rs

1use futures_core::ready;
2use futures_core::task::{Context, Poll};
3use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
4use pin_project_lite::pin_project;
5use std::fmt;
6use std::io;
7use std::pin::Pin;
8
9pin_project! {
10    /// Reader for the [`chain`](super::AsyncReadExt::chain) method.
11    #[must_use = "readers do nothing unless polled"]
12    pub struct Chain<T, U> {
13        #[pin]
14        first: T,
15        #[pin]
16        second: U,
17        done_first: bool,
18    }
19}
20
21impl<T, U> Chain<T, U>
22where
23    T: AsyncRead,
24    U: AsyncRead,
25{
26    pub(super) fn new(first: T, second: U) -> Self {
27        Self { first, second, done_first: false }
28    }
29
30    /// Gets references to the underlying readers in this `Chain`.
31    pub fn get_ref(&self) -> (&T, &U) {
32        (&self.first, &self.second)
33    }
34
35    /// Gets mutable references to the underlying readers in this `Chain`.
36    ///
37    /// Care should be taken to avoid modifying the internal I/O state of the
38    /// underlying readers as doing so may corrupt the internal state of this
39    /// `Chain`.
40    pub fn get_mut(&mut self) -> (&mut T, &mut U) {
41        (&mut self.first, &mut self.second)
42    }
43
44    /// Gets pinned mutable references to the underlying readers in this `Chain`.
45    ///
46    /// Care should be taken to avoid modifying the internal I/O state of the
47    /// underlying readers as doing so may corrupt the internal state of this
48    /// `Chain`.
49    pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
50        let this = self.project();
51        (this.first, this.second)
52    }
53
54    /// Consumes the `Chain`, returning the wrapped readers.
55    pub fn into_inner(self) -> (T, U) {
56        (self.first, self.second)
57    }
58}
59
60impl<T, U> fmt::Debug for Chain<T, U>
61where
62    T: fmt::Debug,
63    U: fmt::Debug,
64{
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        f.debug_struct("Chain")
67            .field("t", &self.first)
68            .field("u", &self.second)
69            .field("done_first", &self.done_first)
70            .finish()
71    }
72}
73
74impl<T, U> AsyncRead for Chain<T, U>
75where
76    T: AsyncRead,
77    U: AsyncRead,
78{
79    fn poll_read(
80        self: Pin<&mut Self>,
81        cx: &mut Context<'_>,
82        buf: &mut [u8],
83    ) -> Poll<io::Result<usize>> {
84        let this = self.project();
85
86        if !*this.done_first {
87            match ready!(this.first.poll_read(cx, buf)?) {
88                0 if !buf.is_empty() => *this.done_first = true,
89                n => return Poll::Ready(Ok(n)),
90            }
91        }
92        this.second.poll_read(cx, buf)
93    }
94
95    fn poll_read_vectored(
96        self: Pin<&mut Self>,
97        cx: &mut Context<'_>,
98        bufs: &mut [IoSliceMut<'_>],
99    ) -> Poll<io::Result<usize>> {
100        let this = self.project();
101
102        if !*this.done_first {
103            let n = ready!(this.first.poll_read_vectored(cx, bufs)?);
104            if n == 0 && bufs.iter().any(|b| !b.is_empty()) {
105                *this.done_first = true
106            } else {
107                return Poll::Ready(Ok(n));
108            }
109        }
110        this.second.poll_read_vectored(cx, bufs)
111    }
112}
113
114impl<T, U> AsyncBufRead for Chain<T, U>
115where
116    T: AsyncBufRead,
117    U: AsyncBufRead,
118{
119    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
120        let this = self.project();
121
122        if !*this.done_first {
123            match ready!(this.first.poll_fill_buf(cx)?) {
124                buf if buf.is_empty() => {
125                    *this.done_first = true;
126                }
127                buf => return Poll::Ready(Ok(buf)),
128            }
129        }
130        this.second.poll_fill_buf(cx)
131    }
132
133    fn consume(self: Pin<&mut Self>, amt: usize) {
134        let this = self.project();
135
136        if !*this.done_first {
137            this.first.consume(amt)
138        } else {
139            this.second.consume(amt)
140        }
141    }
142}