futures_util/io/
mod.rs

1//! Asynchronous I/O.
2//!
3//! This module is the asynchronous version of `std::io`. It defines four
4//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6//! standard library. However, these traits integrate with the asynchronous
7//! task system, so that if an I/O object isn't ready for reading (or writing),
8//! the thread is not blocked, and instead the current task is queued to be
9//! woken when I/O is ready.
10//!
11//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13//! for operating with asynchronous I/O objects, including ways to work with
14//! them using futures, streams and sinks.
15//!
16//! This module is only available when the `std` feature of this
17//! library is activated, and it is activated by default.
18
19#[cfg(feature = "io-compat")]
20#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21use crate::compat::Compat;
22use crate::future::assert_future;
23use crate::stream::assert_stream;
24use std::{pin::Pin, ptr};
25
26// Re-export some types from `std::io` so that users don't have to deal
27// with conflicts when `use`ing `futures::io` and `std::io`.
28#[doc(no_inline)]
29pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
33// used by `BufReader` and `BufWriter`
34// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37/// Initializes a buffer if necessary.
38///
39/// A buffer is currently always initialized.
40#[inline]
41unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
42    ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
43}
44
45mod allow_std;
46pub use self::allow_std::AllowStdIo;
47
48mod buf_reader;
49pub use self::buf_reader::{BufReader, SeeKRelative};
50
51mod buf_writer;
52pub use self::buf_writer::BufWriter;
53
54mod line_writer;
55pub use self::line_writer::LineWriter;
56
57mod chain;
58pub use self::chain::Chain;
59
60mod close;
61pub use self::close::Close;
62
63mod copy;
64pub use self::copy::{copy, Copy};
65
66mod copy_buf;
67pub use self::copy_buf::{copy_buf, CopyBuf};
68
69mod copy_buf_abortable;
70pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
71
72mod cursor;
73pub use self::cursor::Cursor;
74
75mod empty;
76pub use self::empty::{empty, Empty};
77
78mod fill_buf;
79pub use self::fill_buf::FillBuf;
80
81mod flush;
82pub use self::flush::Flush;
83
84#[cfg(feature = "sink")]
85#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
86mod into_sink;
87#[cfg(feature = "sink")]
88#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
89pub use self::into_sink::IntoSink;
90
91mod lines;
92pub use self::lines::Lines;
93
94mod read;
95pub use self::read::Read;
96
97mod read_vectored;
98pub use self::read_vectored::ReadVectored;
99
100mod read_exact;
101pub use self::read_exact::ReadExact;
102
103mod read_line;
104pub use self::read_line::ReadLine;
105
106mod read_to_end;
107pub use self::read_to_end::ReadToEnd;
108
109mod read_to_string;
110pub use self::read_to_string::ReadToString;
111
112mod read_until;
113pub use self::read_until::ReadUntil;
114
115mod repeat;
116pub use self::repeat::{repeat, Repeat};
117
118mod seek;
119pub use self::seek::Seek;
120
121mod sink;
122pub use self::sink::{sink, Sink};
123
124mod split;
125pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
126
127mod take;
128pub use self::take::Take;
129
130mod window;
131pub use self::window::Window;
132
133mod write;
134pub use self::write::Write;
135
136mod write_vectored;
137pub use self::write_vectored::WriteVectored;
138
139mod write_all;
140pub use self::write_all::WriteAll;
141
142#[cfg(feature = "write-all-vectored")]
143mod write_all_vectored;
144#[cfg(feature = "write-all-vectored")]
145pub use self::write_all_vectored::WriteAllVectored;
146
147/// An extension trait which adds utility methods to `AsyncRead` types.
148pub trait AsyncReadExt: AsyncRead {
149    /// Creates an adaptor which will chain this stream with another.
150    ///
151    /// The returned `AsyncRead` instance will first read all bytes from this object
152    /// until EOF is encountered. Afterwards the output is equivalent to the
153    /// output of `next`.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// # futures::executor::block_on(async {
159    /// use futures::io::{AsyncReadExt, Cursor};
160    ///
161    /// let reader1 = Cursor::new([1, 2, 3, 4]);
162    /// let reader2 = Cursor::new([5, 6, 7, 8]);
163    ///
164    /// let mut reader = reader1.chain(reader2);
165    /// let mut buffer = Vec::new();
166    ///
167    /// // read the value into a Vec.
168    /// reader.read_to_end(&mut buffer).await?;
169    /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
170    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
171    /// ```
172    fn chain<R>(self, next: R) -> Chain<Self, R>
173    where
174        Self: Sized,
175        R: AsyncRead,
176    {
177        assert_read(Chain::new(self, next))
178    }
179
180    /// Tries to read some bytes directly into the given `buf` in asynchronous
181    /// manner, returning a future type.
182    ///
183    /// The returned future will resolve to the number of bytes read once the read
184    /// operation is completed.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// # futures::executor::block_on(async {
190    /// use futures::io::{AsyncReadExt, Cursor};
191    ///
192    /// let mut reader = Cursor::new([1, 2, 3, 4]);
193    /// let mut output = [0u8; 5];
194    ///
195    /// let bytes = reader.read(&mut output[..]).await?;
196    ///
197    /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
198    /// // reader. In a real system you could get anywhere from 1 to
199    /// // `output.len()` bytes in a single read.
200    /// assert_eq!(bytes, 4);
201    /// assert_eq!(output, [1, 2, 3, 4, 0]);
202    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
203    /// ```
204    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
205    where
206        Self: Unpin,
207    {
208        assert_future::<Result<usize>, _>(Read::new(self, buf))
209    }
210
211    /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
212    /// IO operations.
213    ///
214    /// The returned future will resolve to the number of bytes read once the read
215    /// operation is completed.
216    fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
217    where
218        Self: Unpin,
219    {
220        assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
221    }
222
223    /// Creates a future which will read exactly enough bytes to fill `buf`,
224    /// returning an error if end of file (EOF) is hit sooner.
225    ///
226    /// The returned future will resolve once the read operation is completed.
227    ///
228    /// In the case of an error the buffer and the object will be discarded, with
229    /// the error yielded.
230    ///
231    /// # Examples
232    ///
233    /// ```
234    /// # futures::executor::block_on(async {
235    /// use futures::io::{AsyncReadExt, Cursor};
236    ///
237    /// let mut reader = Cursor::new([1, 2, 3, 4]);
238    /// let mut output = [0u8; 4];
239    ///
240    /// reader.read_exact(&mut output).await?;
241    ///
242    /// assert_eq!(output, [1, 2, 3, 4]);
243    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
244    /// ```
245    ///
246    /// ## EOF is hit before `buf` is filled
247    ///
248    /// ```
249    /// # futures::executor::block_on(async {
250    /// use futures::io::{self, AsyncReadExt, Cursor};
251    ///
252    /// let mut reader = Cursor::new([1, 2, 3, 4]);
253    /// let mut output = [0u8; 5];
254    ///
255    /// let result = reader.read_exact(&mut output).await;
256    ///
257    /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
258    /// # });
259    /// ```
260    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261    where
262        Self: Unpin,
263    {
264        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265    }
266
267    /// Creates a future which will read all the bytes from this `AsyncRead`.
268    ///
269    /// On success the total number of bytes read is returned.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// # futures::executor::block_on(async {
275    /// use futures::io::{AsyncReadExt, Cursor};
276    ///
277    /// let mut reader = Cursor::new([1, 2, 3, 4]);
278    /// let mut output = Vec::with_capacity(4);
279    ///
280    /// let bytes = reader.read_to_end(&mut output).await?;
281    ///
282    /// assert_eq!(bytes, 4);
283    /// assert_eq!(output, vec![1, 2, 3, 4]);
284    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
285    /// ```
286    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
287    where
288        Self: Unpin,
289    {
290        assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
291    }
292
293    /// Creates a future which will read all the bytes from this `AsyncRead`.
294    ///
295    /// On success the total number of bytes read is returned.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// # futures::executor::block_on(async {
301    /// use futures::io::{AsyncReadExt, Cursor};
302    ///
303    /// let mut reader = Cursor::new(&b"1234"[..]);
304    /// let mut buffer = String::with_capacity(4);
305    ///
306    /// let bytes = reader.read_to_string(&mut buffer).await?;
307    ///
308    /// assert_eq!(bytes, 4);
309    /// assert_eq!(buffer, String::from("1234"));
310    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
311    /// ```
312    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
313    where
314        Self: Unpin,
315    {
316        assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
317    }
318
319    /// Helper method for splitting this read/write object into two halves.
320    ///
321    /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
322    /// traits, respectively.
323    ///
324    /// # Examples
325    ///
326    /// ```
327    /// # futures::executor::block_on(async {
328    /// use futures::io::{self, AsyncReadExt, Cursor};
329    ///
330    /// // Note that for `Cursor` the read and write halves share a single
331    /// // seek position. This may or may not be true for other types that
332    /// // implement both `AsyncRead` and `AsyncWrite`.
333    ///
334    /// let reader = Cursor::new([1, 2, 3, 4]);
335    /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
336    /// let mut writer = Cursor::new(vec![0u8; 5]);
337    ///
338    /// {
339    ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
340    ///     io::copy(reader, &mut buffer_writer).await?;
341    ///     io::copy(buffer_reader, &mut writer).await?;
342    /// }
343    ///
344    /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
345    /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
346    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
347    /// ```
348    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
349    where
350        Self: AsyncWrite + Sized,
351    {
352        let (r, w) = split::split(self);
353        (assert_read(r), assert_write(w))
354    }
355
356    /// Creates an AsyncRead adapter which will read at most `limit` bytes
357    /// from the underlying reader.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// # futures::executor::block_on(async {
363    /// use futures::io::{AsyncReadExt, Cursor};
364    ///
365    /// let reader = Cursor::new(&b"12345678"[..]);
366    /// let mut buffer = [0; 5];
367    ///
368    /// let mut take = reader.take(4);
369    /// let n = take.read(&mut buffer).await?;
370    ///
371    /// assert_eq!(n, 4);
372    /// assert_eq!(&buffer, b"1234\0");
373    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
374    /// ```
375    fn take(self, limit: u64) -> Take<Self>
376    where
377        Self: Sized,
378    {
379        assert_read(Take::new(self, limit))
380    }
381
382    /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
383    /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
384    /// implements [`AsyncWrite`] as well, the result will also implement the
385    /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
386    ///
387    /// Requires the `io-compat` feature to enable.
388    #[cfg(feature = "io-compat")]
389    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
390    fn compat(self) -> Compat<Self>
391    where
392        Self: Sized + Unpin,
393    {
394        Compat::new(self)
395    }
396}
397
398impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
399
400/// An extension trait which adds utility methods to `AsyncWrite` types.
401pub trait AsyncWriteExt: AsyncWrite {
402    /// Creates a future which will entirely flush this `AsyncWrite`.
403    ///
404    /// # Examples
405    ///
406    /// ```
407    /// # futures::executor::block_on(async {
408    /// use futures::io::{AllowStdIo, AsyncWriteExt};
409    /// use std::io::{BufWriter, Cursor};
410    ///
411    /// let mut output = vec![0u8; 5];
412    ///
413    /// {
414    ///     let writer = Cursor::new(&mut output);
415    ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
416    ///     buffered.write_all(&[1, 2]).await?;
417    ///     buffered.write_all(&[3, 4]).await?;
418    ///     buffered.flush().await?;
419    /// }
420    ///
421    /// assert_eq!(output, [1, 2, 3, 4, 0]);
422    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
423    /// ```
424    fn flush(&mut self) -> Flush<'_, Self>
425    where
426        Self: Unpin,
427    {
428        assert_future::<Result<()>, _>(Flush::new(self))
429    }
430
431    /// Creates a future which will entirely close this `AsyncWrite`.
432    fn close(&mut self) -> Close<'_, Self>
433    where
434        Self: Unpin,
435    {
436        assert_future::<Result<()>, _>(Close::new(self))
437    }
438
439    /// Creates a future which will write bytes from `buf` into the object.
440    ///
441    /// The returned future will resolve to the number of bytes written once the write
442    /// operation is completed.
443    fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
444    where
445        Self: Unpin,
446    {
447        assert_future::<Result<usize>, _>(Write::new(self, buf))
448    }
449
450    /// Creates a future which will write bytes from `bufs` into the object using vectored
451    /// IO operations.
452    ///
453    /// The returned future will resolve to the number of bytes written once the write
454    /// operation is completed.
455    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
456    where
457        Self: Unpin,
458    {
459        assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
460    }
461
462    /// Write data into this object.
463    ///
464    /// Creates a future that will write the entire contents of the buffer `buf` into
465    /// this `AsyncWrite`.
466    ///
467    /// The returned future will not complete until all the data has been written.
468    ///
469    /// # Examples
470    ///
471    /// ```
472    /// # futures::executor::block_on(async {
473    /// use futures::io::{AsyncWriteExt, Cursor};
474    ///
475    /// let mut writer = Cursor::new(vec![0u8; 5]);
476    ///
477    /// writer.write_all(&[1, 2, 3, 4]).await?;
478    ///
479    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
480    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
481    /// ```
482    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
483    where
484        Self: Unpin,
485    {
486        assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487    }
488
489    /// Attempts to write multiple buffers into this writer.
490    ///
491    /// Creates a future that will write the entire contents of `bufs` into this
492    /// `AsyncWrite` using [vectored writes].
493    ///
494    /// The returned future will not complete until all the data has been
495    /// written.
496    ///
497    /// [vectored writes]: std::io::Write::write_vectored
498    ///
499    /// # Notes
500    ///
501    /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502    /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503    /// modify the slice to keep track of the bytes already written.
504    ///
505    /// Once this futures returns, the contents of `bufs` are unspecified, as
506    /// this depends on how many calls to `write_vectored` were necessary. It is
507    /// best to understand this function as taking ownership of `bufs` and to
508    /// not use `bufs` afterwards. The underlying buffers, to which the
509    /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510    /// can be reused.
511    ///
512    /// # Examples
513    ///
514    /// ```
515    /// # futures::executor::block_on(async {
516    /// use futures::io::AsyncWriteExt;
517    /// use futures_util::io::Cursor;
518    /// use std::io::IoSlice;
519    ///
520    /// let mut writer = Cursor::new(Vec::new());
521    /// let bufs = &mut [
522    ///     IoSlice::new(&[1]),
523    ///     IoSlice::new(&[2, 3]),
524    ///     IoSlice::new(&[4, 5, 6]),
525    /// ];
526    ///
527    /// writer.write_all_vectored(bufs).await?;
528    /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529    ///
530    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532    /// ```
533    #[cfg(feature = "write-all-vectored")]
534    fn write_all_vectored<'a>(
535        &'a mut self,
536        bufs: &'a mut [IoSlice<'a>],
537    ) -> WriteAllVectored<'a, Self>
538    where
539        Self: Unpin,
540    {
541        assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
542    }
543
544    /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545    /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546    /// Requires the `io-compat` feature to enable.
547    #[cfg(feature = "io-compat")]
548    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
549    fn compat_write(self) -> Compat<Self>
550    where
551        Self: Sized + Unpin,
552    {
553        Compat::new(self)
554    }
555
556    /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
557    ///
558    /// This adapter produces a sink that will write each value passed to it
559    /// into the underlying writer.
560    ///
561    /// Note that this function consumes the given writer, returning a wrapped
562    /// version.
563    ///
564    /// # Examples
565    ///
566    /// ```
567    /// # futures::executor::block_on(async {
568    /// use futures::io::AsyncWriteExt;
569    /// use futures::stream::{self, StreamExt};
570    ///
571    /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
572    ///
573    /// let mut writer = vec![];
574    ///
575    /// stream.forward((&mut writer).into_sink()).await?;
576    ///
577    /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
578    /// # Ok::<(), Box<dyn std::error::Error>>(())
579    /// # })?;
580    /// # Ok::<(), Box<dyn std::error::Error>>(())
581    /// ```
582    #[cfg(feature = "sink")]
583    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
584    fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
585    where
586        Self: Sized,
587    {
588        crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
589    }
590}
591
592impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
593
594/// An extension trait which adds utility methods to `AsyncSeek` types.
595pub trait AsyncSeekExt: AsyncSeek {
596    /// Creates a future which will seek an IO object, and then yield the
597    /// new position in the object and the object itself.
598    ///
599    /// In the case of an error the buffer and the object will be discarded, with
600    /// the error yielded.
601    fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
602    where
603        Self: Unpin,
604    {
605        assert_future::<Result<u64>, _>(Seek::new(self, pos))
606    }
607
608    /// Creates a future which will return the current seek position from the
609    /// start of the stream.
610    ///
611    /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
612    fn stream_position(&mut self) -> Seek<'_, Self>
613    where
614        Self: Unpin,
615    {
616        self.seek(SeekFrom::Current(0))
617    }
618}
619
620impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
621
622/// An extension trait which adds utility methods to `AsyncBufRead` types.
623pub trait AsyncBufReadExt: AsyncBufRead {
624    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
625    /// object or EOF to be reached.
626    ///
627    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
628    ///
629    /// ```rust
630    /// # futures::executor::block_on(async {
631    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
632    ///
633    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
634    ///
635    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
636    /// stream.consume_unpin(2);
637    ///
638    /// assert_eq!(stream.fill_buf().await?, vec![3]);
639    /// stream.consume_unpin(1);
640    ///
641    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
642    /// stream.consume_unpin(3);
643    ///
644    /// assert_eq!(stream.fill_buf().await?, vec![]);
645    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
646    /// ```
647    fn fill_buf(&mut self) -> FillBuf<'_, Self>
648    where
649        Self: Unpin,
650    {
651        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
652    }
653
654    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
655    ///
656    /// ```rust
657    /// # futures::executor::block_on(async {
658    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
659    ///
660    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
661    ///
662    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
663    /// stream.consume_unpin(2);
664    ///
665    /// assert_eq!(stream.fill_buf().await?, vec![3]);
666    /// stream.consume_unpin(1);
667    ///
668    /// assert_eq!(stream.fill_buf().await?, vec![]);
669    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
670    /// ```
671    fn consume_unpin(&mut self, amt: usize)
672    where
673        Self: Unpin,
674    {
675        Pin::new(self).consume(amt)
676    }
677
678    /// Creates a future which will read all the bytes associated with this I/O
679    /// object into `buf` until the delimiter `byte` or EOF is reached.
680    /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
681    ///
682    /// This function will read bytes from the underlying stream until the
683    /// delimiter or EOF is found. Once found, all bytes up to, and including,
684    /// the delimiter (if found) will be appended to `buf`.
685    ///
686    /// The returned future will resolve to the number of bytes read once the read
687    /// operation is completed.
688    ///
689    /// In the case of an error the buffer and the object will be discarded, with
690    /// the error yielded.
691    ///
692    /// # Examples
693    ///
694    /// ```
695    /// # futures::executor::block_on(async {
696    /// use futures::io::{AsyncBufReadExt, Cursor};
697    ///
698    /// let mut cursor = Cursor::new(b"lorem-ipsum");
699    /// let mut buf = vec![];
700    ///
701    /// // cursor is at 'l'
702    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
703    /// assert_eq!(num_bytes, 6);
704    /// assert_eq!(buf, b"lorem-");
705    /// buf.clear();
706    ///
707    /// // cursor is at 'i'
708    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
709    /// assert_eq!(num_bytes, 5);
710    /// assert_eq!(buf, b"ipsum");
711    /// buf.clear();
712    ///
713    /// // cursor is at EOF
714    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
715    /// assert_eq!(num_bytes, 0);
716    /// assert_eq!(buf, b"");
717    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
718    /// ```
719    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
720    where
721        Self: Unpin,
722    {
723        assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
724    }
725
726    /// Creates a future which will read all the bytes associated with this I/O
727    /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
728    /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
729    ///
730    /// This function will read bytes from the underlying stream until the
731    /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
732    /// up to, and including, the delimiter (if found) will be appended to
733    /// `buf`.
734    ///
735    /// The returned future will resolve to the number of bytes read once the read
736    /// operation is completed.
737    ///
738    /// In the case of an error the buffer and the object will be discarded, with
739    /// the error yielded.
740    ///
741    /// # Errors
742    ///
743    /// This function has the same error semantics as [`read_until`] and will
744    /// also return an error if the read bytes are not valid UTF-8. If an I/O
745    /// error is encountered then `buf` may contain some bytes already read in
746    /// the event that all data read so far was valid UTF-8.
747    ///
748    /// [`read_until`]: AsyncBufReadExt::read_until
749    ///
750    /// # Examples
751    ///
752    /// ```
753    /// # futures::executor::block_on(async {
754    /// use futures::io::{AsyncBufReadExt, Cursor};
755    ///
756    /// let mut cursor = Cursor::new(b"foo\nbar");
757    /// let mut buf = String::new();
758    ///
759    /// // cursor is at 'f'
760    /// let num_bytes = cursor.read_line(&mut buf).await?;
761    /// assert_eq!(num_bytes, 4);
762    /// assert_eq!(buf, "foo\n");
763    /// buf.clear();
764    ///
765    /// // cursor is at 'b'
766    /// let num_bytes = cursor.read_line(&mut buf).await?;
767    /// assert_eq!(num_bytes, 3);
768    /// assert_eq!(buf, "bar");
769    /// buf.clear();
770    ///
771    /// // cursor is at EOF
772    /// let num_bytes = cursor.read_line(&mut buf).await?;
773    /// assert_eq!(num_bytes, 0);
774    /// assert_eq!(buf, "");
775    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
776    /// ```
777    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
778    where
779        Self: Unpin,
780    {
781        assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
782    }
783
784    /// Returns a stream over the lines of this reader.
785    /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
786    ///
787    /// The stream returned from this function will yield instances of
788    /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
789    /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
790    ///
791    /// [`io::Result`]: std::io::Result
792    /// [`String`]: String
793    ///
794    /// # Errors
795    ///
796    /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
797    ///
798    /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
799    ///
800    /// # Examples
801    ///
802    /// ```
803    /// # futures::executor::block_on(async {
804    /// use futures::io::{AsyncBufReadExt, Cursor};
805    /// use futures::stream::StreamExt;
806    ///
807    /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
808    ///
809    /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
810    /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
811    /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
812    /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
813    /// assert_eq!(lines_stream.next().await, None);
814    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
815    /// ```
816    fn lines(self) -> Lines<Self>
817    where
818        Self: Sized,
819    {
820        assert_stream::<Result<String>, _>(Lines::new(self))
821    }
822}
823
824impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
825
826// Just a helper function to ensure the reader we're returning all have the
827// right implementations.
828pub(crate) fn assert_read<R>(reader: R) -> R
829where
830    R: AsyncRead,
831{
832    reader
833}
834// Just a helper function to ensure the writer we're returning all have the
835// right implementations.
836pub(crate) fn assert_write<W>(writer: W) -> W
837where
838    W: AsyncWrite,
839{
840    writer
841}