futures_util/io/
buf_reader.rs

1use super::DEFAULT_BUF_SIZE;
2use futures_core::future::Future;
3use futures_core::ready;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6use pin_project_lite::pin_project;
7use std::io::{self, Read};
8use std::pin::Pin;
9use std::{cmp, fmt};
10
11pin_project! {
12    /// The `BufReader` struct adds buffering to any reader.
13    ///
14    /// It can be excessively inefficient to work directly with a [`AsyncRead`]
15    /// instance. A `BufReader` performs large, infrequent reads on the underlying
16    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
17    ///
18    /// `BufReader` can improve the speed of programs that make *small* and
19    /// *repeated* read calls to the same file or network socket. It does not
20    /// help when reading very large amounts at once, or reading just one or a few
21    /// times. It also provides no advantage when reading from a source that is
22    /// already in memory, like a `Vec<u8>`.
23    ///
24    /// When the `BufReader` is dropped, the contents of its buffer will be
25    /// discarded. Creating multiple instances of a `BufReader` on the same
26    /// stream can cause data loss.
27    ///
28    /// [`AsyncRead`]: futures_io::AsyncRead
29    ///
30    // TODO: Examples
31    pub struct BufReader<R> {
32        #[pin]
33        inner: R,
34        buffer: Box<[u8]>,
35        pos: usize,
36        cap: usize,
37    }
38}
39
40impl<R: AsyncRead> BufReader<R> {
41    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
42    /// but may change in the future.
43    pub fn new(inner: R) -> Self {
44        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45    }
46
47    /// Creates a new `BufReader` with the specified buffer capacity.
48    pub fn with_capacity(capacity: usize, inner: R) -> Self {
49        unsafe {
50            let mut buffer = Vec::with_capacity(capacity);
51            buffer.set_len(capacity);
52            super::initialize(&inner, &mut buffer);
53            Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
54        }
55    }
56
57    delegate_access_inner!(inner, R, ());
58
59    /// Returns a reference to the internally buffered data.
60    ///
61    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
62    pub fn buffer(&self) -> &[u8] {
63        &self.buffer[self.pos..self.cap]
64    }
65
66    /// Invalidates all data in the internal buffer.
67    #[inline]
68    fn discard_buffer(self: Pin<&mut Self>) {
69        let this = self.project();
70        *this.pos = 0;
71        *this.cap = 0;
72    }
73}
74
75impl<R: AsyncRead + AsyncSeek> BufReader<R> {
76    /// Seeks relative to the current position. If the new position lies within the buffer,
77    /// the buffer will not be flushed, allowing for more efficient seeks.
78    /// This method does not return the location of the underlying reader, so the caller
79    /// must track this information themselves if it is required.
80    pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
81        SeeKRelative { inner: self, offset, first: true }
82    }
83
84    /// Attempts to seek relative to the current position. If the new position lies within the buffer,
85    /// the buffer will not be flushed, allowing for more efficient seeks.
86    /// This method does not return the location of the underlying reader, so the caller
87    /// must track this information themselves if it is required.
88    pub fn poll_seek_relative(
89        self: Pin<&mut Self>,
90        cx: &mut Context<'_>,
91        offset: i64,
92    ) -> Poll<io::Result<()>> {
93        let pos = self.pos as u64;
94        if offset < 0 {
95            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
96                *self.project().pos = new_pos as usize;
97                return Poll::Ready(Ok(()));
98            }
99        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
100            if new_pos <= self.cap as u64 {
101                *self.project().pos = new_pos as usize;
102                return Poll::Ready(Ok(()));
103            }
104        }
105        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
106    }
107}
108
109impl<R: AsyncRead> AsyncRead for BufReader<R> {
110    fn poll_read(
111        mut self: Pin<&mut Self>,
112        cx: &mut Context<'_>,
113        buf: &mut [u8],
114    ) -> Poll<io::Result<usize>> {
115        // If we don't have any buffered data and we're doing a massive read
116        // (larger than our internal buffer), bypass our internal buffer
117        // entirely.
118        if self.pos == self.cap && buf.len() >= self.buffer.len() {
119            let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
120            self.discard_buffer();
121            return Poll::Ready(res);
122        }
123        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
124        let nread = rem.read(buf)?;
125        self.consume(nread);
126        Poll::Ready(Ok(nread))
127    }
128
129    fn poll_read_vectored(
130        mut self: Pin<&mut Self>,
131        cx: &mut Context<'_>,
132        bufs: &mut [IoSliceMut<'_>],
133    ) -> Poll<io::Result<usize>> {
134        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
135        if self.pos == self.cap && total_len >= self.buffer.len() {
136            let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
137            self.discard_buffer();
138            return Poll::Ready(res);
139        }
140        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
141        let nread = rem.read_vectored(bufs)?;
142        self.consume(nread);
143        Poll::Ready(Ok(nread))
144    }
145}
146
147impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
148    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
149        let this = self.project();
150
151        // If we've reached the end of our internal buffer then we need to fetch
152        // some more data from the underlying reader.
153        // Branch using `>=` instead of the more correct `==`
154        // to tell the compiler that the pos..cap slice is always valid.
155        if *this.pos >= *this.cap {
156            debug_assert!(*this.pos == *this.cap);
157            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
158            *this.pos = 0;
159        }
160        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
161    }
162
163    fn consume(self: Pin<&mut Self>, amt: usize) {
164        *self.project().pos = cmp::min(self.pos + amt, self.cap);
165    }
166}
167
168impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
169    delegate_async_write!(inner);
170}
171
172impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
173    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174        f.debug_struct("BufReader")
175            .field("reader", &self.inner)
176            .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
177            .finish()
178    }
179}
180
181impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
182    /// Seek to an offset, in bytes, in the underlying reader.
183    ///
184    /// The position used for seeking with `SeekFrom::Current(_)` is the
185    /// position the underlying reader would be at if the `BufReader` had no
186    /// internal buffer.
187    ///
188    /// Seeking always discards the internal buffer, even if the seek position
189    /// would otherwise fall within it. This guarantees that calling
190    /// `.into_inner()` immediately after a seek yields the underlying reader
191    /// at the same position.
192    ///
193    /// To seek without discarding the internal buffer, use
194    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
195    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
196    ///
197    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
198    ///
199    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
200    /// where `n` minus the internal buffer length overflows an `i64`, two
201    /// seeks will be performed instead of one. If the second seek returns
202    /// `Err`, the underlying reader will be left at the same position it would
203    /// have if you called `seek` with `SeekFrom::Current(0)`.
204    fn poll_seek(
205        mut self: Pin<&mut Self>,
206        cx: &mut Context<'_>,
207        pos: SeekFrom,
208    ) -> Poll<io::Result<u64>> {
209        let result: u64;
210        if let SeekFrom::Current(n) = pos {
211            let remainder = (self.cap - self.pos) as i64;
212            // it should be safe to assume that remainder fits within an i64 as the alternative
213            // means we managed to allocate 8 exbibytes and that's absurd.
214            // But it's not out of the realm of possibility for some weird underlying reader to
215            // support seeking by i64::min_value() so we need to handle underflow when subtracting
216            // remainder.
217            if let Some(offset) = n.checked_sub(remainder) {
218                result =
219                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
220            } else {
221                // seek backwards by our remainder, and then by the offset
222                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
223                self.as_mut().discard_buffer();
224                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
225            }
226        } else {
227            // Seeking with Start/End doesn't care about our buffer length.
228            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
229        }
230        self.discard_buffer();
231        Poll::Ready(Ok(result))
232    }
233}
234
235/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
236#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct SeeKRelative<'a, R> {
239    inner: Pin<&'a mut BufReader<R>>,
240    offset: i64,
241    first: bool,
242}
243
244impl<R> Future for SeeKRelative<'_, R>
245where
246    R: AsyncRead + AsyncSeek,
247{
248    type Output = io::Result<()>;
249
250    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
251        let offset = self.offset;
252        if self.first {
253            self.first = false;
254            self.inner.as_mut().poll_seek_relative(cx, offset)
255        } else {
256            self.inner
257                .as_mut()
258                .as_mut()
259                .poll_seek(cx, SeekFrom::Current(offset))
260                .map(|res| res.map(|_| ()))
261        }
262    }
263}