use super::DEFAULT_BUF_SIZE;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use pin_project_lite::pin_project;
use std::io::{self, Read};
use std::pin::Pin;
use std::{cmp, fmt};

pin_project! {
    /// The `BufReader` struct adds buffering to any reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`]
    /// instance. A `BufReader` performs large, infrequent reads on the underlying
    /// [`AsyncRead`] and maintains an in-memory buffer of the results.
    ///
    /// `BufReader` can improve the speed of programs that make *small* and
    /// *repeated* read calls to the same file or network socket. It does not
    /// help when reading very large amounts at once, or reading just one or a few
    /// times. It also provides no advantage when reading from a source that is
    /// already in memory, like a `Vec<u8>`.
    ///
    /// When the `BufReader` is dropped, the contents of its buffer will be
    /// discarded. Creating multiple instances of a `BufReader` on the same
    /// stream can cause data loss.
    ///
    /// [`AsyncRead`]: futures_io::AsyncRead
    ///
    // TODO: Examples
    pub struct BufReader<R> {
        #[pin]
        inner: R,
        buffer: Box<[u8]>,
        pos: usize,
        cap: usize,
    }
}
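
// A rough usage sketch (illustrative only; it assumes the `futures` facade
// crate with its `executor::block_on`, `io::Cursor`, and `io::AsyncBufReadExt`
// re-exports is available):
//
//     use futures::executor::block_on;
//     use futures::io::{AsyncBufReadExt, BufReader, Cursor};
//
//     block_on(async {
//         let mut reader = BufReader::new(Cursor::new(&b"hello\nworld"[..]));
//         let mut line = String::new();
//         reader.read_line(&mut line).await.unwrap();
//         assert_eq!(line, "hello\n");
//     });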

impl<R: AsyncRead> BufReader<R> {
    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
    /// but may change in the future.
    pub fn new(inner: R) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a new `BufReader` with the specified buffer capacity.
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        unsafe {
            let mut buffer = Vec::with_capacity(capacity);
            buffer.set_len(capacity);
            super::initialize(&inner, &mut buffer);
            Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
        }
    }
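
    // For instance (illustrative sketch; `some_async_reader` stands in for any
    // value implementing `AsyncRead`):
    //
    //     // Use a 64 KiB buffer instead of the 8 KB default.
    //     let reader = BufReader::with_capacity(64 * 1024, some_async_reader);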

    delegate_access_inner!(inner, R, ());

    /// Returns a reference to the internally buffered data.
    ///
    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
    pub fn buffer(&self) -> &[u8] {
        &self.buffer[self.pos..self.cap]
    }
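
    // Sketch of the difference from `fill_buf` (illustrative only; assumes the
    // `futures` facade crate):
    //
    //     use futures::executor::block_on;
    //     use futures::io::{AsyncBufReadExt, BufReader, Cursor};
    //
    //     block_on(async {
    //         let mut reader = BufReader::new(Cursor::new(&b"abc"[..]));
    //         assert!(reader.buffer().is_empty()); // nothing buffered yet
    //         reader.read_until(b'b', &mut Vec::new()).await.unwrap();
    //         assert_eq!(reader.buffer(), b"c");   // "c" is still buffered
    //     });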

    /// Invalidates all data in the internal buffer.
    #[inline]
    fn discard_buffer(self: Pin<&mut Self>) {
        let this = self.project();
        *this.pos = 0;
        *this.cap = 0;
    }
}

impl<R: AsyncRead + AsyncSeek> BufReader<R> {
    /// Seeks relative to the current position. If the new position lies within the buffer,
    /// the buffer will not be flushed, allowing for more efficient seeks.
    /// This method does not return the location of the underlying reader, so the caller
    /// must track this information themselves if it is required.
    pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
        SeeKRelative { inner: self, offset, first: true }
    }

    /// Attempts to seek relative to the current position. If the new position lies within the buffer,
    /// the buffer will not be flushed, allowing for more efficient seeks.
    /// This method does not return the location of the underlying reader, so the caller
    /// must track this information themselves if it is required.
    pub fn poll_seek_relative(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: i64,
    ) -> Poll<io::Result<()>> {
        let pos = self.pos as u64;
        if offset < 0 {
            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
                *self.project().pos = new_pos as usize;
                return Poll::Ready(Ok(()));
            }
        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
            if new_pos <= self.cap as u64 {
                *self.project().pos = new_pos as usize;
                return Poll::Ready(Ok(()));
            }
        }
        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
    }
}
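
// Sketch of an in-buffer relative seek (illustrative only; assumes the
// `futures` facade crate):
//
//     use std::pin::Pin;
//     use futures::executor::block_on;
//     use futures::io::{AsyncReadExt, BufReader, Cursor};
//
//     block_on(async {
//         let mut reader = BufReader::new(Cursor::new(&b"0123456789"[..]));
//         let mut byte = [0u8; 1];
//         reader.read_exact(&mut byte).await.unwrap(); // buffers the whole source, consumes "0"
//         // Moves within the buffered data; no extra I/O on the inner reader.
//         Pin::new(&mut reader).seek_relative(3).await.unwrap();
//         reader.read_exact(&mut byte).await.unwrap();
//         assert_eq!(byte[0], b'4');
//     });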

impl<R: AsyncRead> AsyncRead for BufReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        if self.pos == self.cap && buf.len() >= self.buffer.len() {
            let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read(buf)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }

    fn poll_read_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
        if self.pos == self.cap && total_len >= self.buffer.len() {
            let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read_vectored(bufs)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }
}

impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        // If we've reached the end of our internal buffer then we need to fetch
        // some more data from the underlying reader.
        // Branch using `>=` instead of the more correct `==`
        // to tell the compiler that the pos..cap slice is always valid.
        if *this.pos >= *this.cap {
            debug_assert!(*this.pos == *this.cap);
            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
            *this.pos = 0;
        }
        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        *self.project().pos = cmp::min(self.pos + amt, self.cap);
    }
}

impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
    delegate_async_write!(inner);
}

impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BufReader")
            .field("reader", &self.inner)
            .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
            .finish()
    }
}

impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
    /// Seek to an offset, in bytes, in the underlying reader.
    ///
    /// The position used for seeking with `SeekFrom::Current(_)` is the
    /// position the underlying reader would be at if the `BufReader` had no
    /// internal buffer.
    ///
    /// Seeking always discards the internal buffer, even if the seek position
    /// would otherwise fall within it. This guarantees that calling
    /// `.into_inner()` immediately after a seek yields the underlying reader
    /// at the same position.
    ///
    /// To seek without discarding the internal buffer, use
    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
    ///
    /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
    ///
    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
    /// where `n` minus the internal buffer length overflows an `i64`, two
    /// seeks will be performed instead of one. If the second seek returns
    /// `Err`, the underlying reader will be left at the same position it would
    /// have if you called `seek` with `SeekFrom::Current(0)`.
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        let result: u64;
        if let SeekFrom::Current(n) = pos {
            let remainder = (self.cap - self.pos) as i64;
            // it should be safe to assume that remainder fits within an i64 as the alternative
            // means we managed to allocate 8 exbibytes and that's absurd.
            // But it's not out of the realm of possibility for some weird underlying reader to
            // support seeking by i64::min_value() so we need to handle underflow when subtracting
            // remainder.
            if let Some(offset) = n.checked_sub(remainder) {
                result =
                    ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
            } else {
                // seek backwards by our remainder, and then by the offset
                ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
                self.as_mut().discard_buffer();
                result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
            }
        } else {
            // Seeking with Start/End doesn't care about our buffer length.
            result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
        }
        self.discard_buffer();
        Poll::Ready(Ok(result))
    }
}
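
// Sketch of a full seek, which always drops the internal buffer (illustrative
// only; assumes the `futures` facade crate):
//
//     use futures::executor::block_on;
//     use futures::io::{AsyncReadExt, AsyncSeekExt, BufReader, Cursor, SeekFrom};
//
//     block_on(async {
//         let mut reader = BufReader::new(Cursor::new(&b"0123456789"[..]));
//         let mut byte = [0u8; 1];
//         reader.read_exact(&mut byte).await.unwrap();    // fills the internal buffer
//         reader.seek(SeekFrom::Start(7)).await.unwrap(); // discards it and seeks the inner reader
//         assert!(reader.buffer().is_empty());
//         reader.read_exact(&mut byte).await.unwrap();
//         assert_eq!(byte[0], b'7');
//     });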

/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
    inner: Pin<&'a mut BufReader<R>>,
    offset: i64,
    first: bool,
}

impl<R> Future for SeeKRelative<'_, R>
where
    R: AsyncRead + AsyncSeek,
{
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let offset = self.offset;
        if self.first {
            self.first = false;
            self.inner.as_mut().poll_seek_relative(cx, offset)
        } else {
            self.inner
                .as_mut()
                .as_mut()
                .poll_seek(cx, SeekFrom::Current(offset))
                .map(|res| res.map(|_| ()))
        }
    }
}