futures_util/stream/try_stream/
into_async_read.rs

1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::TryStream;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
6use pin_project_lite::pin_project;
7use std::cmp;
8use std::io::{Error, Result};
9
10pin_project! {
11    /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
12    #[derive(Debug)]
13    #[must_use = "readers do nothing unless polled"]
14    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
15    pub struct IntoAsyncRead<St>
16    where
17        St: TryStream<Error = Error>,
18        St::Ok: AsRef<[u8]>,
19    {
20        #[pin]
21        stream: St,
22        state: ReadState<St::Ok>,
23    }
24}
25
26#[derive(Debug)]
27enum ReadState<T: AsRef<[u8]>> {
28    Ready { chunk: T, chunk_start: usize },
29    PendingChunk,
30    Eof,
31}
32
33impl<St> IntoAsyncRead<St>
34where
35    St: TryStream<Error = Error>,
36    St::Ok: AsRef<[u8]>,
37{
38    pub(super) fn new(stream: St) -> Self {
39        Self { stream, state: ReadState::PendingChunk }
40    }
41}
42
43impl<St> AsyncRead for IntoAsyncRead<St>
44where
45    St: TryStream<Error = Error>,
46    St::Ok: AsRef<[u8]>,
47{
48    fn poll_read(
49        self: Pin<&mut Self>,
50        cx: &mut Context<'_>,
51        buf: &mut [u8],
52    ) -> Poll<Result<usize>> {
53        let mut this = self.project();
54
55        loop {
56            match this.state {
57                ReadState::Ready { chunk, chunk_start } => {
58                    let chunk = chunk.as_ref();
59                    let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
60
61                    buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]);
62                    *chunk_start += len;
63
64                    if chunk.len() == *chunk_start {
65                        *this.state = ReadState::PendingChunk;
66                    }
67
68                    return Poll::Ready(Ok(len));
69                }
70                ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
71                    Some(Ok(chunk)) => {
72                        if !chunk.as_ref().is_empty() {
73                            *this.state = ReadState::Ready { chunk, chunk_start: 0 };
74                        }
75                    }
76                    Some(Err(err)) => {
77                        *this.state = ReadState::Eof;
78                        return Poll::Ready(Err(err));
79                    }
80                    None => {
81                        *this.state = ReadState::Eof;
82                        return Poll::Ready(Ok(0));
83                    }
84                },
85                ReadState::Eof => {
86                    return Poll::Ready(Ok(0));
87                }
88            }
89        }
90    }
91}
92
93impl<St> AsyncWrite for IntoAsyncRead<St>
94where
95    St: TryStream<Error = Error> + AsyncWrite,
96    St::Ok: AsRef<[u8]>,
97{
98    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
99        let this = self.project();
100        this.stream.poll_write(cx, buf)
101    }
102
103    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
104        let this = self.project();
105        this.stream.poll_flush(cx)
106    }
107
108    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
109        let this = self.project();
110        this.stream.poll_close(cx)
111    }
112}
113
114impl<St> AsyncBufRead for IntoAsyncRead<St>
115where
116    St: TryStream<Error = Error>,
117    St::Ok: AsRef<[u8]>,
118{
119    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
120        let mut this = self.project();
121
122        while let ReadState::PendingChunk = this.state {
123            match ready!(this.stream.as_mut().try_poll_next(cx)) {
124                Some(Ok(chunk)) => {
125                    if !chunk.as_ref().is_empty() {
126                        *this.state = ReadState::Ready { chunk, chunk_start: 0 };
127                    }
128                }
129                Some(Err(err)) => {
130                    *this.state = ReadState::Eof;
131                    return Poll::Ready(Err(err));
132                }
133                None => {
134                    *this.state = ReadState::Eof;
135                    return Poll::Ready(Ok(&[]));
136                }
137            }
138        }
139
140        if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
141            let chunk = chunk.as_ref();
142            return Poll::Ready(Ok(&chunk[chunk_start..]));
143        }
144
145        // To get to this point we must be in ReadState::Eof
146        Poll::Ready(Ok(&[]))
147    }
148
149    fn consume(self: Pin<&mut Self>, amount: usize) {
150        let this = self.project();
151
152        // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
153        if amount == 0 {
154            return;
155        }
156        if let ReadState::Ready { chunk, chunk_start } = this.state {
157            *chunk_start += amount;
158            debug_assert!(*chunk_start <= chunk.as_ref().len());
159            if *chunk_start >= chunk.as_ref().len() {
160                *this.state = ReadState::PendingChunk;
161            }
162        } else {
163            debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
164        }
165    }
166}