futures_lite/
io.rs

1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::cmp;
24use std::fmt;
25use std::future::Future;
26use std::io::{IoSlice, IoSliceMut};
27use std::mem;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use futures_core::stream::Stream;
33use pin_project_lite::pin_project;
34
35use crate::future;
36use crate::ready;
37
38const DEFAULT_BUF_SIZE: usize = 8 * 1024;
39
40/// Copies the entire contents of a reader into a writer.
41///
42/// This function will read data from `reader` and write it into `writer` in a streaming fashion
43/// until `reader` returns EOF.
44///
45/// On success, returns the total number of bytes copied.
46///
47/// # Examples
48///
49/// ```
50/// use futures_lite::io::{self, BufReader, BufWriter};
51///
52/// # spin_on::spin_on(async {
53/// let input: &[u8] = b"hello";
54/// let reader = BufReader::new(input);
55///
56/// let mut output = Vec::new();
57/// let writer = BufWriter::new(&mut output);
58///
59/// io::copy(reader, writer).await?;
60/// # std::io::Result::Ok(()) });
61/// ```
62pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
63where
64    R: AsyncRead + Unpin,
65    W: AsyncWrite + Unpin,
66{
67    pin_project! {
68        struct CopyFuture<R, W> {
69            #[pin]
70            reader: R,
71            #[pin]
72            writer: W,
73            amt: u64,
74        }
75    }
76
77    impl<R, W> Future for CopyFuture<R, W>
78    where
79        R: AsyncBufRead,
80        W: AsyncWrite + Unpin,
81    {
82        type Output = Result<u64>;
83
84        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85            let mut this = self.project();
86            loop {
87                let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
88                if buffer.is_empty() {
89                    ready!(this.writer.as_mut().poll_flush(cx))?;
90                    return Poll::Ready(Ok(*this.amt));
91                }
92
93                let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
94                if i == 0 {
95                    return Poll::Ready(Err(ErrorKind::WriteZero.into()));
96                }
97                *this.amt += i as u64;
98                this.reader.as_mut().consume(i);
99            }
100        }
101    }
102
103    let future = CopyFuture {
104        reader: BufReader::new(reader),
105        writer,
106        amt: 0,
107    };
108    future.await
109}
110
111/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
112///
113/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
114/// This is usually the case for in-memory buffered I/O.
115///
116/// # Examples
117///
118/// ```
119/// use futures_lite::io::{AssertAsync, AsyncReadExt};
120///
121/// let reader: &[u8] = b"hello";
122///
123/// # spin_on::spin_on(async {
124/// let mut async_reader = AssertAsync::new(reader);
125/// let mut contents = String::new();
126///
127/// // This line works in async manner - note that there is await:
128/// async_reader.read_to_string(&mut contents).await?;
129/// # std::io::Result::Ok(()) });
130/// ```
131#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
132pub struct AssertAsync<T>(T);
133
134impl<T> Unpin for AssertAsync<T> {}
135
136impl<T> AssertAsync<T> {
137    /// Wraps an I/O handle implementing [`std::io`] traits.
138    ///
139    /// # Examples
140    ///
141    /// ```
142    /// use futures_lite::io::AssertAsync;
143    ///
144    /// let reader: &[u8] = b"hello";
145    ///
146    /// let async_reader = AssertAsync::new(reader);
147    /// ```
148    pub fn new(io: T) -> Self {
149        AssertAsync(io)
150    }
151
152    /// Gets a reference to the inner I/O handle.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// use futures_lite::io::AssertAsync;
158    ///
159    /// let reader: &[u8] = b"hello";
160    ///
161    /// let async_reader = AssertAsync::new(reader);
162    /// let r = async_reader.get_ref();
163    /// ```
164    pub fn get_ref(&self) -> &T {
165        &self.0
166    }
167
168    /// Gets a mutable reference to the inner I/O handle.
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// use futures_lite::io::AssertAsync;
174    ///
175    /// let reader: &[u8] = b"hello";
176    ///
177    /// let mut async_reader = AssertAsync::new(reader);
178    /// let r = async_reader.get_mut();
179    /// ```
180    pub fn get_mut(&mut self) -> &mut T {
181        &mut self.0
182    }
183
184    /// Extracts the inner I/O handle.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use futures_lite::io::AssertAsync;
190    ///
191    /// let reader: &[u8] = b"hello";
192    ///
193    /// let async_reader = AssertAsync::new(reader);
194    /// let inner = async_reader.into_inner();
195    /// ```
196    pub fn into_inner(self) -> T {
197        self.0
198    }
199}
200
201impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
202    fn poll_read(
203        mut self: Pin<&mut Self>,
204        _: &mut Context<'_>,
205        buf: &mut [u8],
206    ) -> Poll<Result<usize>> {
207        loop {
208            match self.0.read(buf) {
209                Err(err) if err.kind() == ErrorKind::Interrupted => {}
210                res => return Poll::Ready(res),
211            }
212        }
213    }
214
215    fn poll_read_vectored(
216        mut self: Pin<&mut Self>,
217        _: &mut Context<'_>,
218        bufs: &mut [IoSliceMut<'_>],
219    ) -> Poll<Result<usize>> {
220        loop {
221            match self.0.read_vectored(bufs) {
222                Err(err) if err.kind() == ErrorKind::Interrupted => {}
223                res => return Poll::Ready(res),
224            }
225        }
226    }
227}
228
229impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
230    fn poll_write(
231        mut self: Pin<&mut Self>,
232        _: &mut Context<'_>,
233        buf: &[u8],
234    ) -> Poll<Result<usize>> {
235        loop {
236            match self.0.write(buf) {
237                Err(err) if err.kind() == ErrorKind::Interrupted => {}
238                res => return Poll::Ready(res),
239            }
240        }
241    }
242
243    fn poll_write_vectored(
244        mut self: Pin<&mut Self>,
245        _: &mut Context<'_>,
246        bufs: &[IoSlice<'_>],
247    ) -> Poll<Result<usize>> {
248        loop {
249            match self.0.write_vectored(bufs) {
250                Err(err) if err.kind() == ErrorKind::Interrupted => {}
251                res => return Poll::Ready(res),
252            }
253        }
254    }
255
256    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
257        loop {
258            match self.0.flush() {
259                Err(err) if err.kind() == ErrorKind::Interrupted => {}
260                res => return Poll::Ready(res),
261            }
262        }
263    }
264
265    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
266        self.poll_flush(cx)
267    }
268}
269
270impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
271    fn poll_seek(
272        mut self: Pin<&mut Self>,
273        _: &mut Context<'_>,
274        pos: SeekFrom,
275    ) -> Poll<Result<u64>> {
276        loop {
277            match self.0.seek(pos) {
278                Err(err) if err.kind() == ErrorKind::Interrupted => {}
279                res => return Poll::Ready(res),
280            }
281        }
282    }
283}
284
285/// Blocks on all async I/O operations and implements [`std::io`] traits.
286///
287/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
288/// manually all the time becomes too tedious, use this type for more convenient blocking on async
289/// I/O operations.
290///
291/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
292/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
293/// [`AsyncSeek`], respectively.
294///
295/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
296/// dropping the [`BlockOn`] handle or some buffered data might get lost.
297///
298/// # Examples
299///
300/// ```
301/// use futures_lite::io::BlockOn;
302/// use futures_lite::pin;
303/// use std::io::Read;
304///
305/// let reader: &[u8] = b"hello";
306/// pin!(reader);
307///
308/// let mut blocking_reader = BlockOn::new(reader);
309/// let mut contents = String::new();
310///
311/// // This line blocks - note that there is no await:
312/// blocking_reader.read_to_string(&mut contents)?;
313/// # std::io::Result::Ok(())
314/// ```
315#[derive(Debug)]
316pub struct BlockOn<T>(T);
317
318impl<T> BlockOn<T> {
319    /// Wraps an async I/O handle into a blocking interface.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use futures_lite::io::BlockOn;
325    /// use futures_lite::pin;
326    ///
327    /// let reader: &[u8] = b"hello";
328    /// pin!(reader);
329    ///
330    /// let blocking_reader = BlockOn::new(reader);
331    /// ```
332    pub fn new(io: T) -> BlockOn<T> {
333        BlockOn(io)
334    }
335
336    /// Gets a reference to the async I/O handle.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// use futures_lite::io::BlockOn;
342    /// use futures_lite::pin;
343    ///
344    /// let reader: &[u8] = b"hello";
345    /// pin!(reader);
346    ///
347    /// let blocking_reader = BlockOn::new(reader);
348    /// let r = blocking_reader.get_ref();
349    /// ```
350    pub fn get_ref(&self) -> &T {
351        &self.0
352    }
353
354    /// Gets a mutable reference to the async I/O handle.
355    ///
356    /// # Examples
357    ///
358    /// ```
359    /// use futures_lite::io::BlockOn;
360    /// use futures_lite::pin;
361    ///
362    /// let reader: &[u8] = b"hello";
363    /// pin!(reader);
364    ///
365    /// let mut blocking_reader = BlockOn::new(reader);
366    /// let r = blocking_reader.get_mut();
367    /// ```
368    pub fn get_mut(&mut self) -> &mut T {
369        &mut self.0
370    }
371
372    /// Extracts the inner async I/O handle.
373    ///
374    /// # Examples
375    ///
376    /// ```
377    /// use futures_lite::io::BlockOn;
378    /// use futures_lite::pin;
379    ///
380    /// let reader: &[u8] = b"hello";
381    /// pin!(reader);
382    ///
383    /// let blocking_reader = BlockOn::new(reader);
384    /// let inner = blocking_reader.into_inner();
385    /// ```
386    pub fn into_inner(self) -> T {
387        self.0
388    }
389}
390
391impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
392    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
393        future::block_on(self.0.read(buf))
394    }
395}
396
397impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
398    fn write(&mut self, buf: &[u8]) -> Result<usize> {
399        future::block_on(self.0.write(buf))
400    }
401
402    fn flush(&mut self) -> Result<()> {
403        future::block_on(self.0.flush())
404    }
405}
406
407impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
408    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
409        future::block_on(self.0.seek(pos))
410    }
411}
412
413pin_project! {
414    /// Adds buffering to a reader.
415    ///
416    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
417    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
418    /// maintains an in-memory buffer of the incoming byte stream.
419    ///
420    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
421    /// the same file or networking socket. It does not help when reading very large amounts at
422    /// once, or reading just once or a few times. It also provides no advantage when reading from
423    /// a source that is already in memory, like a `Vec<u8>`.
424    ///
425    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
426    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
432    ///
433    /// # spin_on::spin_on(async {
434    /// let input: &[u8] = b"hello";
435    /// let mut reader = BufReader::new(input);
436    ///
437    /// let mut line = String::new();
438    /// reader.read_line(&mut line).await?;
439    /// # std::io::Result::Ok(()) });
440    /// ```
441    pub struct BufReader<R> {
442        #[pin]
443        inner: R,
444        buf: Box<[u8]>,
445        pos: usize,
446        cap: usize,
447    }
448}
449
450impl<R: AsyncRead> BufReader<R> {
451    /// Creates a buffered reader with the default buffer capacity.
452    ///
453    /// The default capacity is currently 8 KB, but that may change in the future.
454    ///
455    /// # Examples
456    ///
457    /// ```
458    /// use futures_lite::io::BufReader;
459    ///
460    /// let input: &[u8] = b"hello";
461    /// let reader = BufReader::new(input);
462    /// ```
463    pub fn new(inner: R) -> BufReader<R> {
464        BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
465    }
466
467    /// Creates a buffered reader with the specified capacity.
468    ///
469    /// # Examples
470    ///
471    /// ```
472    /// use futures_lite::io::BufReader;
473    ///
474    /// let input: &[u8] = b"hello";
475    /// let reader = BufReader::with_capacity(1024, input);
476    /// ```
477    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
478        BufReader {
479            inner,
480            buf: vec![0; capacity].into_boxed_slice(),
481            pos: 0,
482            cap: 0,
483        }
484    }
485}
486
487impl<R> BufReader<R> {
488    /// Gets a reference to the underlying reader.
489    ///
490    /// It is not advisable to directly read from the underlying reader.
491    ///
492    /// # Examples
493    ///
494    /// ```
495    /// use futures_lite::io::BufReader;
496    ///
497    /// let input: &[u8] = b"hello";
498    /// let reader = BufReader::new(input);
499    ///
500    /// let r = reader.get_ref();
501    /// ```
502    pub fn get_ref(&self) -> &R {
503        &self.inner
504    }
505
506    /// Gets a mutable reference to the underlying reader.
507    ///
508    /// It is not advisable to directly read from the underlying reader.
509    ///
510    /// # Examples
511    ///
512    /// ```
513    /// use futures_lite::io::BufReader;
514    ///
515    /// let input: &[u8] = b"hello";
516    /// let mut reader = BufReader::new(input);
517    ///
518    /// let r = reader.get_mut();
519    /// ```
520    pub fn get_mut(&mut self) -> &mut R {
521        &mut self.inner
522    }
523
524    /// Gets a pinned mutable reference to the underlying reader.
525    ///
526    /// It is not advisable to directly read from the underlying reader.
527    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
528        self.project().inner
529    }
530
531    /// Returns a reference to the internal buffer.
532    ///
533    /// This method will not attempt to fill the buffer if it is empty.
534    ///
535    /// # Examples
536    ///
537    /// ```
538    /// use futures_lite::io::BufReader;
539    ///
540    /// let input: &[u8] = b"hello";
541    /// let reader = BufReader::new(input);
542    ///
543    /// // The internal buffer is empty until the first read request.
544    /// assert_eq!(reader.buffer(), &[]);
545    /// ```
546    pub fn buffer(&self) -> &[u8] {
547        &self.buf[self.pos..self.cap]
548    }
549
550    /// Unwraps the buffered reader, returning the underlying reader.
551    ///
552    /// Note that any leftover data in the internal buffer will be lost.
553    ///
554    /// # Examples
555    ///
556    /// ```
557    /// use futures_lite::io::BufReader;
558    ///
559    /// let input: &[u8] = b"hello";
560    /// let reader = BufReader::new(input);
561    ///
562    /// assert_eq!(reader.into_inner(), input);
563    /// ```
564    pub fn into_inner(self) -> R {
565        self.inner
566    }
567
568    /// Invalidates all data in the internal buffer.
569    #[inline]
570    fn discard_buffer(self: Pin<&mut Self>) {
571        let this = self.project();
572        *this.pos = 0;
573        *this.cap = 0;
574    }
575}
576
577impl<R: AsyncRead> AsyncRead for BufReader<R> {
578    fn poll_read(
579        mut self: Pin<&mut Self>,
580        cx: &mut Context<'_>,
581        buf: &mut [u8],
582    ) -> Poll<Result<usize>> {
583        // If we don't have any buffered data and we're doing a massive read
584        // (larger than our internal buffer), bypass our internal buffer
585        // entirely.
586        if self.pos == self.cap && buf.len() >= self.buf.len() {
587            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
588            self.discard_buffer();
589            return Poll::Ready(res);
590        }
591        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
592        let nread = std::io::Read::read(&mut rem, buf)?;
593        self.consume(nread);
594        Poll::Ready(Ok(nread))
595    }
596
597    fn poll_read_vectored(
598        mut self: Pin<&mut Self>,
599        cx: &mut Context<'_>,
600        bufs: &mut [IoSliceMut<'_>],
601    ) -> Poll<Result<usize>> {
602        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
603        if self.pos == self.cap && total_len >= self.buf.len() {
604            let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
605            self.discard_buffer();
606            return Poll::Ready(res);
607        }
608        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
609        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
610        self.consume(nread);
611        Poll::Ready(Ok(nread))
612    }
613}
614
615impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
616    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
617        let mut this = self.project();
618
619        // If we've reached the end of our internal buffer then we need to fetch
620        // some more data from the underlying reader.
621        // Branch using `>=` instead of the more correct `==`
622        // to tell the compiler that the pos..cap slice is always valid.
623        if *this.pos >= *this.cap {
624            debug_assert!(*this.pos == *this.cap);
625            *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
626            *this.pos = 0;
627        }
628        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
629    }
630
631    fn consume(self: Pin<&mut Self>, amt: usize) {
632        let this = self.project();
633        *this.pos = cmp::min(*this.pos + amt, *this.cap);
634    }
635}
636
637impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
638    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639        f.debug_struct("BufReader")
640            .field("reader", &self.inner)
641            .field(
642                "buffer",
643                &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
644            )
645            .finish()
646    }
647}
648
649impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
650    /// Seeks to an offset, in bytes, in the underlying reader.
651    ///
652    /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
653    /// reader would be at if the [`BufReader`] had no internal buffer.
654    ///
655    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
656    /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
657    /// immediately after a seek yields the underlying reader at the same position.
658    ///
659    /// See [`AsyncSeek`] for more details.
660    ///
661    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
662    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
663    /// the second seek returns `Err`, the underlying reader will be left at the same position it
664    /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
665    fn poll_seek(
666        mut self: Pin<&mut Self>,
667        cx: &mut Context<'_>,
668        pos: SeekFrom,
669    ) -> Poll<Result<u64>> {
670        let result: u64;
671        if let SeekFrom::Current(n) = pos {
672            let remainder = (self.cap - self.pos) as i64;
673            // it should be safe to assume that remainder fits within an i64 as the alternative
674            // means we managed to allocate 8 exbibytes and that's absurd.
675            // But it's not out of the realm of possibility for some weird underlying reader to
676            // support seeking by i64::min_value() so we need to handle underflow when subtracting
677            // remainder.
678            if let Some(offset) = n.checked_sub(remainder) {
679                result = ready!(self
680                    .as_mut()
681                    .get_pin_mut()
682                    .poll_seek(cx, SeekFrom::Current(offset)))?;
683            } else {
684                // seek backwards by our remainder, and then by the offset
685                ready!(self
686                    .as_mut()
687                    .get_pin_mut()
688                    .poll_seek(cx, SeekFrom::Current(-remainder)))?;
689                self.as_mut().discard_buffer();
690                result = ready!(self
691                    .as_mut()
692                    .get_pin_mut()
693                    .poll_seek(cx, SeekFrom::Current(n)))?;
694            }
695        } else {
696            // Seeking with Start/End doesn't care about our buffer length.
697            result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
698        }
699        self.discard_buffer();
700        Poll::Ready(Ok(result))
701    }
702}
703
704pin_project! {
705    /// Adds buffering to a writer.
706    ///
707    /// It can be excessively inefficient to work directly with something that implements
708    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
709    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
710    /// writes it to the underlying writer in large, infrequent batches.
711    ///
712    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
713    /// the same file or networking socket. It does not help when writing very large amounts at
714    /// once, or writing just once or a few times. It also provides no advantage when writing to a
715    /// destination that is in memory, like a `Vec<u8>`.
716    ///
717    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
718    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
719    /// dropping the [`BufWriter`].
720    ///
721    /// # Examples
722    ///
723    /// ```
724    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
725    ///
726    /// # spin_on::spin_on(async {
727    /// let mut output = Vec::new();
728    /// let mut writer = BufWriter::new(&mut output);
729    ///
730    /// writer.write_all(b"hello").await?;
731    /// writer.flush().await?;
732    /// # std::io::Result::Ok(()) });
733    /// ```
734    pub struct BufWriter<W> {
735        #[pin]
736        inner: W,
737        buf: Vec<u8>,
738        written: usize,
739    }
740}
741
742impl<W: AsyncWrite> BufWriter<W> {
743    /// Creates a buffered writer with the default buffer capacity.
744    ///
745    /// The default capacity is currently 8 KB, but that may change in the future.
746    ///
747    /// # Examples
748    ///
749    /// ```
750    /// use futures_lite::io::BufWriter;
751    ///
752    /// let mut output = Vec::new();
753    /// let writer = BufWriter::new(&mut output);
754    /// ```
755    pub fn new(inner: W) -> BufWriter<W> {
756        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
757    }
758
759    /// Creates a buffered writer with the specified buffer capacity.
760    ///
761    /// # Examples
762    ///
763    /// ```
764    /// use futures_lite::io::BufWriter;
765    ///
766    /// let mut output = Vec::new();
767    /// let writer = BufWriter::with_capacity(100, &mut output);
768    /// ```
769    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
770        BufWriter {
771            inner,
772            buf: Vec::with_capacity(capacity),
773            written: 0,
774        }
775    }
776
777    /// Gets a reference to the underlying writer.
778    ///
779    /// # Examples
780    ///
781    /// ```
782    /// use futures_lite::io::BufWriter;
783    ///
784    /// let mut output = Vec::new();
785    /// let writer = BufWriter::new(&mut output);
786    ///
787    /// let r = writer.get_ref();
788    /// ```
789    pub fn get_ref(&self) -> &W {
790        &self.inner
791    }
792
793    /// Gets a mutable reference to the underlying writer.
794    ///
795    /// It is not advisable to directly write to the underlying writer.
796    ///
797    /// # Examples
798    ///
799    /// ```
800    /// use futures_lite::io::BufWriter;
801    ///
802    /// let mut output = Vec::new();
803    /// let mut writer = BufWriter::new(&mut output);
804    ///
805    /// let r = writer.get_mut();
806    /// ```
807    pub fn get_mut(&mut self) -> &mut W {
808        &mut self.inner
809    }
810
811    /// Gets a pinned mutable reference to the underlying writer.
812    ///
813    /// It is not not advisable to directly write to the underlying writer.
814    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
815        self.project().inner
816    }
817
818    /// Unwraps the buffered writer, returning the underlying writer.
819    ///
820    /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
821    /// that data, flush the buffered writer before unwrapping it.
822    ///
823    /// # Examples
824    ///
825    /// ```
826    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
827    ///
828    /// # spin_on::spin_on(async {
829    /// let mut output = vec![1, 2, 3];
830    /// let mut writer = BufWriter::new(&mut output);
831    ///
832    /// writer.write_all(&[4]).await?;
833    /// writer.flush().await?;
834    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
835    /// # std::io::Result::Ok(()) });
836    /// ```
837    pub fn into_inner(self) -> W {
838        self.inner
839    }
840
841    /// Returns a reference to the internal buffer.
842    ///
843    /// # Examples
844    ///
845    /// ```
846    /// use futures_lite::io::BufWriter;
847    ///
848    /// let mut output = Vec::new();
849    /// let writer = BufWriter::new(&mut output);
850    ///
851    /// // The internal buffer is empty until the first write request.
852    /// assert_eq!(writer.buffer(), &[]);
853    /// ```
854    pub fn buffer(&self) -> &[u8] {
855        &self.buf
856    }
857
858    /// Flush the buffer.
859    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
860        let mut this = self.project();
861        let len = this.buf.len();
862        let mut ret = Ok(());
863
864        while *this.written < len {
865            match this
866                .inner
867                .as_mut()
868                .poll_write(cx, &this.buf[*this.written..])
869            {
870                Poll::Ready(Ok(0)) => {
871                    ret = Err(Error::new(
872                        ErrorKind::WriteZero,
873                        "Failed to write buffered data",
874                    ));
875                    break;
876                }
877                Poll::Ready(Ok(n)) => *this.written += n,
878                Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
879                Poll::Ready(Err(e)) => {
880                    ret = Err(e);
881                    break;
882                }
883                Poll::Pending => return Poll::Pending,
884            }
885        }
886
887        if *this.written > 0 {
888            this.buf.drain(..*this.written);
889        }
890        *this.written = 0;
891
892        Poll::Ready(ret)
893    }
894}
895
896impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
897    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898        f.debug_struct("BufWriter")
899            .field("writer", &self.inner)
900            .field("buf", &self.buf)
901            .finish()
902    }
903}
904
905impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
906    fn poll_write(
907        mut self: Pin<&mut Self>,
908        cx: &mut Context<'_>,
909        buf: &[u8],
910    ) -> Poll<Result<usize>> {
911        if self.buf.len() + buf.len() > self.buf.capacity() {
912            ready!(self.as_mut().poll_flush_buf(cx))?;
913        }
914        if buf.len() >= self.buf.capacity() {
915            self.get_pin_mut().poll_write(cx, buf)
916        } else {
917            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
918        }
919    }
920
921    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
922        ready!(self.as_mut().poll_flush_buf(cx))?;
923        self.get_pin_mut().poll_flush(cx)
924    }
925
926    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
927        ready!(self.as_mut().poll_flush_buf(cx))?;
928        self.get_pin_mut().poll_close(cx)
929    }
930}
931
932impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
933    /// Seek to the offset, in bytes, in the underlying writer.
934    ///
935    /// Seeking always writes out the internal buffer before seeking.
936    fn poll_seek(
937        mut self: Pin<&mut Self>,
938        cx: &mut Context<'_>,
939        pos: SeekFrom,
940    ) -> Poll<Result<u64>> {
941        ready!(self.as_mut().poll_flush_buf(cx))?;
942        self.get_pin_mut().poll_seek(cx, pos)
943    }
944}
945
946/// Gives an in-memory buffer a cursor for reading and writing.
947///
948/// # Examples
949///
950/// ```
951/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
952///
953/// # spin_on::spin_on(async {
954/// let mut bytes = b"hello".to_vec();
955/// let mut cursor = Cursor::new(&mut bytes);
956///
957/// // Overwrite 'h' with 'H'.
958/// cursor.write_all(b"H").await?;
959///
960/// // Move the cursor one byte forward.
961/// cursor.seek(SeekFrom::Current(1)).await?;
962///
963/// // Read a byte.
964/// let mut byte = [0];
965/// cursor.read_exact(&mut byte).await?;
966/// assert_eq!(&byte, b"l");
967///
968/// // Check the final buffer.
969/// assert_eq!(bytes, b"Hello");
970/// # std::io::Result::Ok(()) });
971/// ```
972#[derive(Clone, Debug, Default)]
973pub struct Cursor<T> {
974    inner: std::io::Cursor<T>,
975}
976
977impl<T> Cursor<T> {
978    /// Creates a cursor for an in-memory buffer.
979    ///
980    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
981    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
982    /// the buffer using [`set_position()`][Cursor::set_position()`] or
983    /// [`seek()`][`AsyncSeekExt::seek()`].
984    ///
985    /// # Examples
986    ///
987    /// ```
988    /// use futures_lite::io::Cursor;
989    ///
990    /// let cursor = Cursor::new(Vec::<u8>::new());
991    /// ```
992    pub fn new(inner: T) -> Cursor<T> {
993        Cursor {
994            inner: std::io::Cursor::new(inner),
995        }
996    }
997
998    /// Gets a reference to the underlying buffer.
999    ///
1000    /// # Examples
1001    ///
1002    /// ```
1003    /// use futures_lite::io::Cursor;
1004    ///
1005    /// let cursor = Cursor::new(Vec::<u8>::new());
1006    /// let r = cursor.get_ref();
1007    /// ```
1008    pub fn get_ref(&self) -> &T {
1009        self.inner.get_ref()
1010    }
1011
1012    /// Gets a mutable reference to the underlying buffer.
1013    ///
1014    /// # Examples
1015    ///
1016    /// ```
1017    /// use futures_lite::io::Cursor;
1018    ///
1019    /// let mut cursor = Cursor::new(Vec::<u8>::new());
1020    /// let r = cursor.get_mut();
1021    /// ```
1022    pub fn get_mut(&mut self) -> &mut T {
1023        self.inner.get_mut()
1024    }
1025
1026    /// Unwraps the cursor, returning the underlying buffer.
1027    ///
1028    /// # Examples
1029    ///
1030    /// ```
1031    /// use futures_lite::io::Cursor;
1032    ///
1033    /// let cursor = Cursor::new(vec![1, 2, 3]);
1034    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
1035    /// ```
1036    pub fn into_inner(self) -> T {
1037        self.inner.into_inner()
1038    }
1039
1040    /// Returns the current position of this cursor.
1041    ///
1042    /// # Examples
1043    ///
1044    /// ```
1045    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
1046    ///
1047    /// # spin_on::spin_on(async {
1048    /// let mut cursor = Cursor::new(b"hello");
1049    /// assert_eq!(cursor.position(), 0);
1050    ///
1051    /// cursor.seek(SeekFrom::Start(2)).await?;
1052    /// assert_eq!(cursor.position(), 2);
1053    /// # std::io::Result::Ok(()) });
1054    /// ```
1055    pub fn position(&self) -> u64 {
1056        self.inner.position()
1057    }
1058
1059    /// Sets the position of this cursor.
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```
1064    /// use futures_lite::io::Cursor;
1065    ///
1066    /// let mut cursor = Cursor::new(b"hello");
1067    /// assert_eq!(cursor.position(), 0);
1068    ///
1069    /// cursor.set_position(2);
1070    /// assert_eq!(cursor.position(), 2);
1071    /// ```
1072    pub fn set_position(&mut self, pos: u64) {
1073        self.inner.set_position(pos)
1074    }
1075}
1076
1077impl<T> AsyncSeek for Cursor<T>
1078where
1079    T: AsRef<[u8]> + Unpin,
1080{
1081    fn poll_seek(
1082        mut self: Pin<&mut Self>,
1083        _: &mut Context<'_>,
1084        pos: SeekFrom,
1085    ) -> Poll<Result<u64>> {
1086        Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1087    }
1088}
1089
1090impl<T> AsyncRead for Cursor<T>
1091where
1092    T: AsRef<[u8]> + Unpin,
1093{
1094    fn poll_read(
1095        mut self: Pin<&mut Self>,
1096        _cx: &mut Context<'_>,
1097        buf: &mut [u8],
1098    ) -> Poll<Result<usize>> {
1099        Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1100    }
1101
1102    fn poll_read_vectored(
1103        mut self: Pin<&mut Self>,
1104        _: &mut Context<'_>,
1105        bufs: &mut [IoSliceMut<'_>],
1106    ) -> Poll<Result<usize>> {
1107        Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1108    }
1109}
1110
1111impl<T> AsyncBufRead for Cursor<T>
1112where
1113    T: AsRef<[u8]> + Unpin,
1114{
1115    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1116        Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1117    }
1118
1119    fn consume(mut self: Pin<&mut Self>, amt: usize) {
1120        std::io::BufRead::consume(&mut self.inner, amt)
1121    }
1122}
1123
1124impl AsyncWrite for Cursor<&mut [u8]> {
1125    fn poll_write(
1126        mut self: Pin<&mut Self>,
1127        _: &mut Context<'_>,
1128        buf: &[u8],
1129    ) -> Poll<Result<usize>> {
1130        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1131    }
1132
1133    fn poll_write_vectored(
1134        mut self: Pin<&mut Self>,
1135        _: &mut Context<'_>,
1136        bufs: &[IoSlice<'_>],
1137    ) -> Poll<Result<usize>> {
1138        Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1139    }
1140
1141    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1142        Poll::Ready(std::io::Write::flush(&mut self.inner))
1143    }
1144
1145    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1146        self.poll_flush(cx)
1147    }
1148}
1149
1150impl AsyncWrite for Cursor<&mut Vec<u8>> {
1151    fn poll_write(
1152        mut self: Pin<&mut Self>,
1153        _: &mut Context<'_>,
1154        buf: &[u8],
1155    ) -> Poll<Result<usize>> {
1156        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1157    }
1158
1159    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160        self.poll_flush(cx)
1161    }
1162
1163    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1164        Poll::Ready(std::io::Write::flush(&mut self.inner))
1165    }
1166}
1167
1168impl AsyncWrite for Cursor<Vec<u8>> {
1169    fn poll_write(
1170        mut self: Pin<&mut Self>,
1171        _: &mut Context<'_>,
1172        buf: &[u8],
1173    ) -> Poll<Result<usize>> {
1174        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1175    }
1176
1177    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1178        self.poll_flush(cx)
1179    }
1180
1181    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1182        Poll::Ready(std::io::Write::flush(&mut self.inner))
1183    }
1184}
1185
1186/// Creates an empty reader.
1187///
1188/// # Examples
1189///
1190/// ```
1191/// use futures_lite::io::{self, AsyncReadExt};
1192///
1193/// # spin_on::spin_on(async {
1194/// let mut reader = io::empty();
1195///
1196/// let mut contents = Vec::new();
1197/// reader.read_to_end(&mut contents).await?;
1198/// assert!(contents.is_empty());
1199/// # std::io::Result::Ok(()) });
1200/// ```
1201pub fn empty() -> Empty {
1202    Empty { _private: () }
1203}
1204
1205/// Reader for the [`empty()`] function.
1206pub struct Empty {
1207    _private: (),
1208}
1209
1210impl fmt::Debug for Empty {
1211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212        f.pad("Empty { .. }")
1213    }
1214}
1215
1216impl AsyncRead for Empty {
1217    #[inline]
1218    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
1219        Poll::Ready(Ok(0))
1220    }
1221}
1222
1223impl AsyncBufRead for Empty {
1224    #[inline]
1225    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
1226        Poll::Ready(Ok(&[]))
1227    }
1228
1229    #[inline]
1230    fn consume(self: Pin<&mut Self>, _: usize) {}
1231}
1232
1233/// Creates an infinite reader that reads the same byte repeatedly.
1234///
1235/// # Examples
1236///
1237/// ```
1238/// use futures_lite::io::{self, AsyncReadExt};
1239///
1240/// # spin_on::spin_on(async {
1241/// let mut reader = io::repeat(b'a');
1242///
1243/// let mut contents = vec![0; 5];
1244/// reader.read_exact(&mut contents).await?;
1245/// assert_eq!(contents, b"aaaaa");
1246/// # std::io::Result::Ok(()) });
1247/// ```
1248pub fn repeat(byte: u8) -> Repeat {
1249    Repeat { byte }
1250}
1251
1252/// Reader for the [`repeat()`] function.
1253#[derive(Debug)]
1254pub struct Repeat {
1255    byte: u8,
1256}
1257
1258impl AsyncRead for Repeat {
1259    #[inline]
1260    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1261        for b in &mut *buf {
1262            *b = self.byte;
1263        }
1264        Poll::Ready(Ok(buf.len()))
1265    }
1266}
1267
1268/// Creates a writer that consumes and drops all data.
1269///
1270/// # Examples
1271///
1272/// ```
1273/// use futures_lite::io::{self, AsyncWriteExt};
1274///
1275/// # spin_on::spin_on(async {
1276/// let mut writer = io::sink();
1277/// writer.write_all(b"hello").await?;
1278/// # std::io::Result::Ok(()) });
1279/// ```
1280pub fn sink() -> Sink {
1281    Sink { _private: () }
1282}
1283
1284/// Writer for the [`sink()`] function.
1285#[derive(Debug)]
1286pub struct Sink {
1287    _private: (),
1288}
1289
1290impl AsyncWrite for Sink {
1291    #[inline]
1292    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
1293        Poll::Ready(Ok(buf.len()))
1294    }
1295
1296    #[inline]
1297    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1298        Poll::Ready(Ok(()))
1299    }
1300
1301    #[inline]
1302    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1303        Poll::Ready(Ok(()))
1304    }
1305}
1306
1307/// Extension trait for [`AsyncBufRead`].
1308pub trait AsyncBufReadExt: AsyncBufRead {
1309    /// Returns the contents of the internal buffer, filling it with more data if empty.
1310    ///
1311    /// If the stream has reached EOF, an empty buffer will be returned.
1312    ///
1313    /// # Examples
1314    ///
1315    /// ```
1316    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1317    /// use std::pin::Pin;
1318    ///
1319    /// # spin_on::spin_on(async {
1320    /// let input: &[u8] = b"hello world";
1321    /// let mut reader = BufReader::with_capacity(5, input);
1322    ///
1323    /// assert_eq!(reader.fill_buf().await?, b"hello");
1324    /// reader.consume(2);
1325    /// assert_eq!(reader.fill_buf().await?, b"llo");
1326    /// reader.consume(3);
1327    /// assert_eq!(reader.fill_buf().await?, b" worl");
1328    /// # std::io::Result::Ok(()) });
1329    /// ```
1330    fn fill_buf(&mut self) -> FillBuf<'_, Self>
1331    where
1332        Self: Unpin,
1333    {
1334        FillBuf { reader: Some(self) }
1335    }
1336
1337    /// Consumes `amt` buffered bytes.
1338    ///
1339    /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1340    /// internal buffer.
1341    ///
1342    /// The `amt` must be <= the number of bytes in the buffer returned by
1343    /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1344    ///
1345    /// # Examples
1346    ///
1347    /// ```
1348    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1349    /// use std::pin::Pin;
1350    ///
1351    /// # spin_on::spin_on(async {
1352    /// let input: &[u8] = b"hello";
1353    /// let mut reader = BufReader::with_capacity(4, input);
1354    ///
1355    /// assert_eq!(reader.fill_buf().await?, b"hell");
1356    /// reader.consume(2);
1357    /// assert_eq!(reader.fill_buf().await?, b"ll");
1358    /// # std::io::Result::Ok(()) });
1359    /// ```
1360    fn consume(&mut self, amt: usize)
1361    where
1362        Self: Unpin,
1363    {
1364        AsyncBufRead::consume(Pin::new(self), amt);
1365    }
1366
1367    /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1368    ///
1369    /// This method will read bytes from the underlying stream until the delimiter or EOF is
1370    /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1371    ///
1372    /// If successful, returns the total number of bytes read.
1373    ///
1374    /// # Examples
1375    ///
1376    /// ```
1377    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1378    ///
1379    /// # spin_on::spin_on(async {
1380    /// let input: &[u8] = b"hello";
1381    /// let mut reader = BufReader::new(input);
1382    ///
1383    /// let mut buf = Vec::new();
1384    /// let n = reader.read_until(b'\n', &mut buf).await?;
1385    /// # std::io::Result::Ok(()) });
1386    /// ```
1387    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
1388    where
1389        Self: Unpin,
1390    {
1391        ReadUntilFuture {
1392            reader: self,
1393            byte,
1394            buf,
1395            read: 0,
1396        }
1397    }
1398
1399    /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1400    ///
1401    /// This method will read bytes from the underlying stream until the newline delimiter (the
1402    /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1403    /// will be appended to `buf`.
1404    ///
1405    /// If successful, returns the total number of bytes read.
1406    ///
1407    /// # Examples
1408    ///
1409    /// ```
1410    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1411    ///
1412    /// # spin_on::spin_on(async {
1413    /// let input: &[u8] = b"hello";
1414    /// let mut reader = BufReader::new(input);
1415    ///
1416    /// let mut line = String::new();
1417    /// let n = reader.read_line(&mut line).await?;
1418    /// # std::io::Result::Ok(()) });
1419    /// ```
1420    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
1421    where
1422        Self: Unpin,
1423    {
1424        ReadLineFuture {
1425            reader: self,
1426            buf,
1427            bytes: Vec::new(),
1428            read: 0,
1429        }
1430    }
1431
1432    /// Returns a stream over the lines of this byte stream.
1433    ///
1434    /// The stream returned from this method yields items of type
1435    /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1436    /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1437    /// at the end.
1438    ///
1439    /// # Examples
1440    ///
1441    /// ```
1442    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1443    /// use futures_lite::stream::StreamExt;
1444    ///
1445    /// # spin_on::spin_on(async {
1446    /// let input: &[u8] = b"hello\nworld\n";
1447    /// let mut reader = BufReader::new(input);
1448    /// let mut lines = reader.lines();
1449    ///
1450    /// let mut line = String::new();
1451    /// while let Some(line) = lines.next().await {
1452    ///     println!("{}", line?);
1453    /// }
1454    /// # std::io::Result::Ok(()) });
1455    /// ```
1456    fn lines(self) -> Lines<Self>
1457    where
1458        Self: Unpin + Sized,
1459    {
1460        Lines {
1461            reader: self,
1462            buf: String::new(),
1463            bytes: Vec::new(),
1464            read: 0,
1465        }
1466    }
1467
1468    /// Returns a stream over the contents of this reader split on the specified `byte`.
1469    ///
1470    /// The stream returned from this method yields items of type
1471    /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1472    /// Each vector returned will *not* have the delimiter byte at the end.
1473    ///
1474    /// # Examples
1475    ///
1476    /// ```
1477    /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1478    /// use futures_lite::stream::StreamExt;
1479    ///
1480    /// # spin_on::spin_on(async {
1481    /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1482    /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1483    ///
1484    /// assert_eq!(items[0], b"lorem");
1485    /// assert_eq!(items[1], b"ipsum");
1486    /// assert_eq!(items[2], b"dolor");
1487    /// # std::io::Result::Ok(()) });
1488    /// ```
1489    fn split(self, byte: u8) -> Split<Self>
1490    where
1491        Self: Sized,
1492    {
1493        Split {
1494            reader: self,
1495            buf: Vec::new(),
1496            delim: byte,
1497            read: 0,
1498        }
1499    }
1500}
1501
1502impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1503
1504/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1505#[derive(Debug)]
1506#[must_use = "futures do nothing unless you `.await` or poll them"]
1507pub struct FillBuf<'a, R: ?Sized> {
1508    reader: Option<&'a mut R>,
1509}
1510
1511impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1512
1513impl<'a, R> Future for FillBuf<'a, R>
1514where
1515    R: AsyncBufRead + Unpin + ?Sized,
1516{
1517    type Output = Result<&'a [u8]>;
1518
1519    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1520        let this = &mut *self;
1521        let reader = this
1522            .reader
1523            .take()
1524            .expect("polled `FillBuf` after completion");
1525
1526        match Pin::new(&mut *reader).poll_fill_buf(cx) {
1527            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
1528                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
1529                poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1530            },
1531            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1532            Poll::Pending => {
1533                this.reader = Some(reader);
1534                Poll::Pending
1535            }
1536        }
1537    }
1538}
1539
1540/// Future for the [`AsyncBufReadExt::read_until()`] method.
1541#[derive(Debug)]
1542#[must_use = "futures do nothing unless you `.await` or poll them"]
1543pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1544    reader: &'a mut R,
1545    byte: u8,
1546    buf: &'a mut Vec<u8>,
1547    read: usize,
1548}
1549
1550impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1551
1552impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1553    type Output = Result<usize>;
1554
1555    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1556        let Self {
1557            reader,
1558            byte,
1559            buf,
1560            read,
1561        } = &mut *self;
1562        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1563    }
1564}
1565
1566fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1567    mut reader: Pin<&mut R>,
1568    cx: &mut Context<'_>,
1569    byte: u8,
1570    buf: &mut Vec<u8>,
1571    read: &mut usize,
1572) -> Poll<Result<usize>> {
1573    loop {
1574        let (done, used) = {
1575            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1576
1577            if let Some(i) = memchr::memchr(byte, available) {
1578                buf.extend_from_slice(&available[..=i]);
1579                (true, i + 1)
1580            } else {
1581                buf.extend_from_slice(available);
1582                (false, available.len())
1583            }
1584        };
1585
1586        reader.as_mut().consume(used);
1587        *read += used;
1588
1589        if done || used == 0 {
1590            return Poll::Ready(Ok(mem::replace(read, 0)));
1591        }
1592    }
1593}
1594
1595/// Future for the [`AsyncBufReadExt::read_line()`] method.
1596#[derive(Debug)]
1597#[must_use = "futures do nothing unless you `.await` or poll them"]
1598pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1599    reader: &'a mut R,
1600    buf: &'a mut String,
1601    bytes: Vec<u8>,
1602    read: usize,
1603}
1604
1605impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1606
1607impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1608    type Output = Result<usize>;
1609
1610    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1611        let Self {
1612            reader,
1613            buf,
1614            bytes,
1615            read,
1616        } = &mut *self;
1617        read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1618    }
1619}
1620
1621pin_project! {
1622    /// Stream for the [`AsyncBufReadExt::lines()`] method.
1623    #[derive(Debug)]
1624    #[must_use = "streams do nothing unless polled"]
1625    pub struct Lines<R> {
1626        #[pin]
1627        reader: R,
1628        buf: String,
1629        bytes: Vec<u8>,
1630        read: usize,
1631    }
1632}
1633
1634impl<R: AsyncBufRead> Stream for Lines<R> {
1635    type Item = Result<String>;
1636
1637    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1638        let this = self.project();
1639
1640        let n = ready!(read_line_internal(
1641            this.reader,
1642            cx,
1643            this.buf,
1644            this.bytes,
1645            this.read
1646        ))?;
1647        if n == 0 && this.buf.is_empty() {
1648            return Poll::Ready(None);
1649        }
1650
1651        if this.buf.ends_with('\n') {
1652            this.buf.pop();
1653            if this.buf.ends_with('\r') {
1654                this.buf.pop();
1655            }
1656        }
1657        Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
1658    }
1659}
1660
1661fn read_line_internal<R: AsyncBufRead + ?Sized>(
1662    reader: Pin<&mut R>,
1663    cx: &mut Context<'_>,
1664    buf: &mut String,
1665    bytes: &mut Vec<u8>,
1666    read: &mut usize,
1667) -> Poll<Result<usize>> {
1668    let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1669
1670    match String::from_utf8(mem::replace(bytes, Vec::new())) {
1671        Ok(s) => {
1672            debug_assert!(buf.is_empty());
1673            debug_assert_eq!(*read, 0);
1674            *buf = s;
1675            Poll::Ready(ret)
1676        }
1677        Err(_) => Poll::Ready(ret.and_then(|_| {
1678            Err(Error::new(
1679                ErrorKind::InvalidData,
1680                "stream did not contain valid UTF-8",
1681            ))
1682        })),
1683    }
1684}
1685
1686pin_project! {
1687    /// Stream for the [`AsyncBufReadExt::split()`] method.
1688    #[derive(Debug)]
1689    #[must_use = "streams do nothing unless polled"]
1690    pub struct Split<R> {
1691        #[pin]
1692        reader: R,
1693        buf: Vec<u8>,
1694        read: usize,
1695        delim: u8,
1696    }
1697}
1698
1699impl<R: AsyncBufRead> Stream for Split<R> {
1700    type Item = Result<Vec<u8>>;
1701
1702    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1703        let this = self.project();
1704
1705        let n = ready!(read_until_internal(
1706            this.reader,
1707            cx,
1708            *this.delim,
1709            this.buf,
1710            this.read
1711        ))?;
1712        if n == 0 && this.buf.is_empty() {
1713            return Poll::Ready(None);
1714        }
1715
1716        if this.buf[this.buf.len() - 1] == *this.delim {
1717            this.buf.pop();
1718        }
1719        Poll::Ready(Some(Ok(mem::replace(this.buf, vec![]))))
1720    }
1721}
1722
1723/// Extension trait for [`AsyncRead`].
1724pub trait AsyncReadExt: AsyncRead {
1725    /// Reads some bytes from the byte stream.
1726    ///
1727    /// On success, returns the total number of bytes read.
1728    ///
1729    /// If the return value is `Ok(n)`, then it must be guaranteed that
1730    /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1731    /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1732    /// scenarios:
1733    ///
1734    /// 1. This reader has reached its "end of file" and will likely no longer be able to
1735    ///    produce bytes. Note that this does not mean that the reader will always no
1736    ///    longer be able to produce bytes.
1737    /// 2. The buffer specified was 0 bytes in length.
1738    ///
1739    /// # Examples
1740    ///
1741    /// ```
1742    /// use futures_lite::io::{AsyncReadExt, BufReader};
1743    ///
1744    /// # spin_on::spin_on(async {
1745    /// let input: &[u8] = b"hello";
1746    /// let mut reader = BufReader::new(input);
1747    ///
1748    /// let mut buf = vec![0; 1024];
1749    /// let n = reader.read(&mut buf).await?;
1750    /// # std::io::Result::Ok(()) });
1751    /// ```
1752    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1753    where
1754        Self: Unpin,
1755    {
1756        ReadFuture { reader: self, buf }
1757    }
1758
1759    /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1760    ///
1761    /// Data is copied to fill each buffer in order, with the final buffer possibly being
1762    /// only partially filled. This method must behave same as a single call to
1763    /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
1764    fn read_vectored<'a>(
1765        &'a mut self,
1766        bufs: &'a mut [IoSliceMut<'a>],
1767    ) -> ReadVectoredFuture<'a, Self>
1768    where
1769        Self: Unpin,
1770    {
1771        ReadVectoredFuture { reader: self, bufs }
1772    }
1773
1774    /// Reads the entire contents and appends them to a [`Vec`].
1775    ///
1776    /// On success, returns the total number of bytes read.
1777    ///
1778    /// # Examples
1779    ///
1780    /// ```
1781    /// use futures_lite::io::{AsyncReadExt, Cursor};
1782    ///
1783    /// # spin_on::spin_on(async {
1784    /// let mut reader = Cursor::new(vec![1, 2, 3]);
1785    /// let mut contents = Vec::new();
1786    ///
1787    /// let n = reader.read_to_end(&mut contents).await?;
1788    /// assert_eq!(n, 3);
1789    /// assert_eq!(contents, [1, 2, 3]);
1790    /// # std::io::Result::Ok(()) });
1791    /// ```
1792    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
1793    where
1794        Self: Unpin,
1795    {
1796        let start_len = buf.len();
1797        ReadToEndFuture {
1798            reader: self,
1799            buf,
1800            start_len,
1801        }
1802    }
1803
1804    /// Reads the entire contents and appends them to a [`String`].
1805    ///
1806    /// On success, returns the total number of bytes read.
1807    ///
1808    /// # Examples
1809    ///
1810    /// ```
1811    /// use futures_lite::io::{AsyncReadExt, Cursor};
1812    ///
1813    /// # spin_on::spin_on(async {
1814    /// let mut reader = Cursor::new(&b"hello");
1815    /// let mut contents = String::new();
1816    ///
1817    /// let n = reader.read_to_string(&mut contents).await?;
1818    /// assert_eq!(n, 5);
1819    /// assert_eq!(contents, "hello");
1820    /// # std::io::Result::Ok(()) });
1821    /// ```
1822    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
1823    where
1824        Self: Unpin,
1825    {
1826        ReadToStringFuture {
1827            reader: self,
1828            buf,
1829            bytes: Vec::new(),
1830            start_len: 0,
1831        }
1832    }
1833
1834    /// Reads the exact number of bytes required to fill `buf`.
1835    ///
1836    /// On success, returns the total number of bytes read.
1837    ///
1838    /// # Examples
1839    ///
1840    /// ```
1841    /// use futures_lite::io::{AsyncReadExt, Cursor};
1842    ///
1843    /// # spin_on::spin_on(async {
1844    /// let mut reader = Cursor::new(&b"hello");
1845    /// let mut contents = vec![0; 3];
1846    ///
1847    /// reader.read_exact(&mut contents).await?;
1848    /// assert_eq!(contents, b"hel");
1849    /// # std::io::Result::Ok(()) });
1850    /// ```
1851    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
1852    where
1853        Self: Unpin,
1854    {
1855        ReadExactFuture { reader: self, buf }
1856    }
1857
1858    /// Creates an adapter which will read at most `limit` bytes from it.
1859    ///
1860    /// This method returns a new instance of [`AsyncRead`] which will read at most
1861    /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
1862    ///
1863    /// # Examples
1864    ///
1865    /// ```
1866    /// use futures_lite::io::{AsyncReadExt, Cursor};
1867    ///
1868    /// # spin_on::spin_on(async {
1869    /// let mut reader = Cursor::new(&b"hello");
1870    /// let mut contents = String::new();
1871    ///
1872    /// let n = reader.take(3).read_to_string(&mut contents).await?;
1873    /// assert_eq!(n, 3);
1874    /// assert_eq!(contents, "hel");
1875    /// # std::io::Result::Ok(()) });
1876    /// ```
1877    fn take(self, limit: u64) -> Take<Self>
1878    where
1879        Self: Sized,
1880    {
1881        Take { inner: self, limit }
1882    }
1883
1884    /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
1885    ///
1886    /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
1887    ///
1888    /// ```
1889    /// use futures_lite::io::{AsyncReadExt, Cursor};
1890    /// use futures_lite::stream::StreamExt;
1891    ///
1892    /// # spin_on::spin_on(async {
1893    /// let reader = Cursor::new(&b"hello");
1894    /// let mut bytes = reader.bytes();
1895    ///
1896    /// while let Some(byte) = bytes.next().await {
1897    ///     println!("byte: {}", byte?);
1898    /// }
1899    /// # std::io::Result::Ok(()) });
1900    /// ```
1901    fn bytes(self) -> Bytes<Self>
1902    where
1903        Self: Sized,
1904    {
1905        Bytes { inner: self }
1906    }
1907
1908    /// Creates an adapter which will chain this stream with another.
1909    ///
1910    /// The returned [`AsyncRead`] instance will first read all bytes from this reader
1911    /// until EOF is found, and then continue with `next`.
1912    ///
1913    /// # Examples
1914    ///
1915    /// ```
1916    /// use futures_lite::io::{AsyncReadExt, Cursor};
1917    ///
1918    /// # spin_on::spin_on(async {
1919    /// let r1 = Cursor::new(&b"hello");
1920    /// let r2 = Cursor::new(&b"world");
1921    /// let mut reader = r1.chain(r2);
1922    ///
1923    /// let mut contents = String::new();
1924    /// reader.read_to_string(&mut contents).await?;
1925    /// assert_eq!(contents, "helloworld");
1926    /// # std::io::Result::Ok(()) });
1927    /// ```
1928    fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
1929    where
1930        Self: Sized,
1931    {
1932        Chain {
1933            first: self,
1934            second: next,
1935            done_first: false,
1936        }
1937    }
1938
1939    /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
1940    ///
1941    /// # Examples
1942    ///
1943    /// ```
1944    /// use futures_lite::io::AsyncReadExt;
1945    ///
1946    /// let reader = [1, 2, 3].boxed_reader();
1947    /// ```
1948    #[cfg(feature = "alloc")]
1949    fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
1950    where
1951        Self: Sized + Send + 'a,
1952    {
1953        Box::pin(self)
1954    }
1955}
1956
1957impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
1958
1959/// Future for the [`AsyncReadExt::read()`] method.
1960#[derive(Debug)]
1961#[must_use = "futures do nothing unless you `.await` or poll them"]
1962pub struct ReadFuture<'a, R: Unpin + ?Sized> {
1963    reader: &'a mut R,
1964    buf: &'a mut [u8],
1965}
1966
1967impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
1968
1969impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
1970    type Output = Result<usize>;
1971
1972    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1973        let Self { reader, buf } = &mut *self;
1974        Pin::new(reader).poll_read(cx, buf)
1975    }
1976}
1977
1978/// Future for the [`AsyncReadExt::read_vectored()`] method.
1979#[derive(Debug)]
1980#[must_use = "futures do nothing unless you `.await` or poll them"]
1981pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
1982    reader: &'a mut R,
1983    bufs: &'a mut [IoSliceMut<'a>],
1984}
1985
1986impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
1987
1988impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
1989    type Output = Result<usize>;
1990
1991    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1992        let Self { reader, bufs } = &mut *self;
1993        Pin::new(reader).poll_read_vectored(cx, bufs)
1994    }
1995}
1996
1997/// Future for the [`AsyncReadExt::read_to_end()`] method.
1998#[derive(Debug)]
1999#[must_use = "futures do nothing unless you `.await` or poll them"]
2000pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2001    reader: &'a mut R,
2002    buf: &'a mut Vec<u8>,
2003    start_len: usize,
2004}
2005
2006impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2007
2008impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2009    type Output = Result<usize>;
2010
2011    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2012        let Self {
2013            reader,
2014            buf,
2015            start_len,
2016        } = &mut *self;
2017        read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2018    }
2019}
2020
2021/// Future for the [`AsyncReadExt::read_to_string()`] method.
2022#[derive(Debug)]
2023#[must_use = "futures do nothing unless you `.await` or poll them"]
2024pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2025    reader: &'a mut R,
2026    buf: &'a mut String,
2027    bytes: Vec<u8>,
2028    start_len: usize,
2029}
2030
2031impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2032
2033impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2034    type Output = Result<usize>;
2035
2036    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2037        let Self {
2038            reader,
2039            buf,
2040            bytes,
2041            start_len,
2042        } = &mut *self;
2043        let reader = Pin::new(reader);
2044
2045        let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2046
2047        match String::from_utf8(mem::replace(bytes, Vec::new())) {
2048            Ok(s) => {
2049                debug_assert!(buf.is_empty());
2050                **buf = s;
2051                Poll::Ready(ret)
2052            }
2053            Err(_) => Poll::Ready(ret.and_then(|_| {
2054                Err(Error::new(
2055                    ErrorKind::InvalidData,
2056                    "stream did not contain valid UTF-8",
2057                ))
2058            })),
2059        }
2060    }
2061}
2062
2063// This uses an adaptive system to extend the vector when it fills. We want to
2064// avoid paying to allocate and zero a huge chunk of memory if the reader only
2065// has 4 bytes while still making large reads if the reader does have a ton
2066// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2067// time is 4,500 times (!) slower than this if the reader has a very small
2068// amount of data to return.
2069//
2070// Because we're extending the buffer with uninitialized data for trusted
2071// readers, we need to make sure to truncate that if any of this panics.
2072fn read_to_end_internal<R: AsyncRead + ?Sized>(
2073    mut rd: Pin<&mut R>,
2074    cx: &mut Context<'_>,
2075    buf: &mut Vec<u8>,
2076    start_len: usize,
2077) -> Poll<Result<usize>> {
2078    struct Guard<'a> {
2079        buf: &'a mut Vec<u8>,
2080        len: usize,
2081    }
2082
2083    impl Drop for Guard<'_> {
2084        fn drop(&mut self) {
2085            self.buf.resize(self.len, 0);
2086        }
2087    }
2088
2089    let mut g = Guard {
2090        len: buf.len(),
2091        buf,
2092    };
2093    let ret;
2094    loop {
2095        if g.len == g.buf.len() {
2096            g.buf.reserve(32);
2097            let capacity = g.buf.capacity();
2098            g.buf.resize(capacity, 0);
2099        }
2100
2101        match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2102            Ok(0) => {
2103                ret = Poll::Ready(Ok(g.len - start_len));
2104                break;
2105            }
2106            Ok(n) => g.len += n,
2107            Err(e) => {
2108                ret = Poll::Ready(Err(e));
2109                break;
2110            }
2111        }
2112    }
2113
2114    ret
2115}
2116
2117/// Future for the [`AsyncReadExt::read_exact()`] method.
2118#[derive(Debug)]
2119#[must_use = "futures do nothing unless you `.await` or poll them"]
2120pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2121    reader: &'a mut R,
2122    buf: &'a mut [u8],
2123}
2124
2125impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2126
2127impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2128    type Output = Result<()>;
2129
2130    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2131        let Self { reader, buf } = &mut *self;
2132
2133        while !buf.is_empty() {
2134            let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2135            let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
2136            *buf = rest;
2137
2138            if n == 0 {
2139                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2140            }
2141        }
2142
2143        Poll::Ready(Ok(()))
2144    }
2145}
2146
2147pin_project! {
2148    /// Reader for the [`AsyncReadExt::take()`] method.
2149    #[derive(Debug)]
2150    pub struct Take<R> {
2151        #[pin]
2152        inner: R,
2153        limit: u64,
2154    }
2155}
2156
2157impl<R> Take<R> {
2158    /// Returns the number of bytes before this adapter will return EOF.
2159    ///
2160    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2161    ///
2162    /// # Examples
2163    ///
2164    /// ```
2165    /// use futures_lite::io::{AsyncReadExt, Cursor};
2166    ///
2167    /// let reader = Cursor::new("hello");
2168    ///
2169    /// let reader = reader.take(3);
2170    /// assert_eq!(reader.limit(), 3);
2171    /// ```
2172    pub fn limit(&self) -> u64 {
2173        self.limit
2174    }
2175
2176    /// Puts a limit on the number of bytes.
2177    ///
2178    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2179    ///
2180    /// # Examples
2181    ///
2182    /// ```
2183    /// use futures_lite::io::{AsyncReadExt, Cursor};
2184    ///
2185    /// let reader = Cursor::new("hello");
2186    ///
2187    /// let mut reader = reader.take(10);
2188    /// assert_eq!(reader.limit(), 10);
2189    ///
2190    /// reader.set_limit(3);
2191    /// assert_eq!(reader.limit(), 3);
2192    /// ```
2193    pub fn set_limit(&mut self, limit: u64) {
2194        self.limit = limit;
2195    }
2196
2197    /// Gets a reference to the underlying reader.
2198    ///
2199    /// # Examples
2200    ///
2201    /// ```
2202    /// use futures_lite::io::{AsyncReadExt, Cursor};
2203    ///
2204    /// let reader = Cursor::new("hello");
2205    ///
2206    /// let reader = reader.take(3);
2207    /// let r = reader.get_ref();
2208    /// ```
2209    pub fn get_ref(&self) -> &R {
2210        &self.inner
2211    }
2212
2213    /// Gets a mutable reference to the underlying reader.
2214    ///
2215    /// # Examples
2216    ///
2217    /// ```
2218    /// use futures_lite::io::{AsyncReadExt, Cursor};
2219    ///
2220    /// let reader = Cursor::new("hello");
2221    ///
2222    /// let mut reader = reader.take(3);
2223    /// let r = reader.get_mut();
2224    /// ```
2225    pub fn get_mut(&mut self) -> &mut R {
2226        &mut self.inner
2227    }
2228
2229    /// Unwraps the adapter, returning the underlying reader.
2230    ///
2231    /// # Examples
2232    ///
2233    /// ```
2234    /// use futures_lite::io::{AsyncReadExt, Cursor};
2235    ///
2236    /// let reader = Cursor::new("hello");
2237    ///
2238    /// let reader = reader.take(3);
2239    /// let reader = reader.into_inner();
2240    /// ```
2241    pub fn into_inner(self) -> R {
2242        self.inner
2243    }
2244}
2245
2246impl<R: AsyncRead> AsyncRead for Take<R> {
2247    fn poll_read(
2248        self: Pin<&mut Self>,
2249        cx: &mut Context<'_>,
2250        buf: &mut [u8],
2251    ) -> Poll<Result<usize>> {
2252        let this = self.project();
2253        take_read_internal(this.inner, cx, buf, this.limit)
2254    }
2255}
2256
2257fn take_read_internal<R: AsyncRead + ?Sized>(
2258    mut rd: Pin<&mut R>,
2259    cx: &mut Context<'_>,
2260    buf: &mut [u8],
2261    limit: &mut u64,
2262) -> Poll<Result<usize>> {
2263    // Don't call into inner reader at all at EOF because it may still block
2264    if *limit == 0 {
2265        return Poll::Ready(Ok(0));
2266    }
2267
2268    let max = cmp::min(buf.len() as u64, *limit) as usize;
2269
2270    match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2271        Ok(n) => {
2272            *limit -= n as u64;
2273            Poll::Ready(Ok(n))
2274        }
2275        Err(e) => Poll::Ready(Err(e)),
2276    }
2277}
2278
2279impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2280    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2281        let this = self.project();
2282
2283        if *this.limit == 0 {
2284            return Poll::Ready(Ok(&[]));
2285        }
2286
2287        match ready!(this.inner.poll_fill_buf(cx)) {
2288            Ok(buf) => {
2289                let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2290                Poll::Ready(Ok(&buf[..cap]))
2291            }
2292            Err(e) => Poll::Ready(Err(e)),
2293        }
2294    }
2295
2296    fn consume(self: Pin<&mut Self>, amt: usize) {
2297        let this = self.project();
2298        // Don't let callers reset the limit by passing an overlarge value
2299        let amt = cmp::min(amt as u64, *this.limit) as usize;
2300        *this.limit -= amt as u64;
2301
2302        this.inner.consume(amt);
2303    }
2304}
2305
2306pin_project! {
2307    /// Reader for the [`AsyncReadExt::bytes()`] method.
2308    #[derive(Debug)]
2309    pub struct Bytes<R> {
2310        #[pin]
2311        inner: R,
2312    }
2313}
2314
2315impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2316    type Item = Result<u8>;
2317
2318    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2319        let mut byte = 0;
2320
2321        let rd = Pin::new(&mut self.inner);
2322
2323        match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2324            Ok(0) => Poll::Ready(None),
2325            Ok(..) => Poll::Ready(Some(Ok(byte))),
2326            Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2327            Err(e) => Poll::Ready(Some(Err(e))),
2328        }
2329    }
2330}
2331
2332impl<R: AsyncRead> AsyncRead for Bytes<R> {
2333    fn poll_read(
2334        self: Pin<&mut Self>,
2335        cx: &mut Context<'_>,
2336        buf: &mut [u8],
2337    ) -> Poll<Result<usize>> {
2338        self.project().inner.poll_read(cx, buf)
2339    }
2340
2341    fn poll_read_vectored(
2342        self: Pin<&mut Self>,
2343        cx: &mut Context<'_>,
2344        bufs: &mut [IoSliceMut<'_>],
2345    ) -> Poll<Result<usize>> {
2346        self.project().inner.poll_read_vectored(cx, bufs)
2347    }
2348}
2349
2350pin_project! {
2351    /// Reader for the [`AsyncReadExt::chain()`] method.
2352    pub struct Chain<R1, R2> {
2353        #[pin]
2354        first: R1,
2355        #[pin]
2356        second: R2,
2357        done_first: bool,
2358    }
2359}
2360
2361impl<R1, R2> Chain<R1, R2> {
2362    /// Gets references to the underlying readers.
2363    ///
2364    /// # Examples
2365    ///
2366    /// ```
2367    /// use futures_lite::io::{AsyncReadExt, Cursor};
2368    ///
2369    /// let r1 = Cursor::new(b"hello");
2370    /// let r2 = Cursor::new(b"world");
2371    ///
2372    /// let reader = r1.chain(r2);
2373    /// let (r1, r2) = reader.get_ref();
2374    /// ```
2375    pub fn get_ref(&self) -> (&R1, &R2) {
2376        (&self.first, &self.second)
2377    }
2378
2379    /// Gets mutable references to the underlying readers.
2380    ///
2381    /// # Examples
2382    ///
2383    /// ```
2384    /// use futures_lite::io::{AsyncReadExt, Cursor};
2385    ///
2386    /// let r1 = Cursor::new(b"hello");
2387    /// let r2 = Cursor::new(b"world");
2388    ///
2389    /// let mut reader = r1.chain(r2);
2390    /// let (r1, r2) = reader.get_mut();
2391    /// ```
2392    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2393        (&mut self.first, &mut self.second)
2394    }
2395
2396    /// Unwraps the adapter, returning the underlying readers.
2397    ///
2398    /// # Examples
2399    ///
2400    /// ```
2401    /// use futures_lite::io::{AsyncReadExt, Cursor};
2402    ///
2403    /// let r1 = Cursor::new(b"hello");
2404    /// let r2 = Cursor::new(b"world");
2405    ///
2406    /// let reader = r1.chain(r2);
2407    /// let (r1, r2) = reader.into_inner();
2408    /// ```
2409    pub fn into_inner(self) -> (R1, R2) {
2410        (self.first, self.second)
2411    }
2412}
2413
2414impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2415    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2416        f.debug_struct("Chain")
2417            .field("r1", &self.first)
2418            .field("r2", &self.second)
2419            .finish()
2420    }
2421}
2422
2423impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2424    fn poll_read(
2425        self: Pin<&mut Self>,
2426        cx: &mut Context<'_>,
2427        buf: &mut [u8],
2428    ) -> Poll<Result<usize>> {
2429        let this = self.project();
2430        if !*this.done_first {
2431            match ready!(this.first.poll_read(cx, buf)) {
2432                Ok(0) if !buf.is_empty() => *this.done_first = true,
2433                Ok(n) => return Poll::Ready(Ok(n)),
2434                Err(err) => return Poll::Ready(Err(err)),
2435            }
2436        }
2437
2438        this.second.poll_read(cx, buf)
2439    }
2440
2441    fn poll_read_vectored(
2442        self: Pin<&mut Self>,
2443        cx: &mut Context<'_>,
2444        bufs: &mut [IoSliceMut<'_>],
2445    ) -> Poll<Result<usize>> {
2446        let this = self.project();
2447        if !*this.done_first {
2448            match ready!(this.first.poll_read_vectored(cx, bufs)) {
2449                Ok(0) if !bufs.is_empty() => *this.done_first = true,
2450                Ok(n) => return Poll::Ready(Ok(n)),
2451                Err(err) => return Poll::Ready(Err(err)),
2452            }
2453        }
2454
2455        this.second.poll_read_vectored(cx, bufs)
2456    }
2457}
2458
2459impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2460    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2461        let this = self.project();
2462        if !*this.done_first {
2463            match ready!(this.first.poll_fill_buf(cx)) {
2464                Ok(buf) if buf.is_empty() => {
2465                    *this.done_first = true;
2466                }
2467                Ok(buf) => return Poll::Ready(Ok(buf)),
2468                Err(err) => return Poll::Ready(Err(err)),
2469            }
2470        }
2471
2472        this.second.poll_fill_buf(cx)
2473    }
2474
2475    fn consume(self: Pin<&mut Self>, amt: usize) {
2476        let this = self.project();
2477        if !*this.done_first {
2478            this.first.consume(amt)
2479        } else {
2480            this.second.consume(amt)
2481        }
2482    }
2483}
2484
2485/// Extension trait for [`AsyncSeek`].
2486pub trait AsyncSeekExt: AsyncSeek {
2487    /// Seeks to a new position in a byte stream.
2488    ///
2489    /// Returns the new position in the byte stream.
2490    ///
2491    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2492    ///
2493    /// # Examples
2494    ///
2495    /// ```
2496    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2497    ///
2498    /// # spin_on::spin_on(async {
2499    /// let mut cursor = Cursor::new("hello");
2500    ///
2501    /// // Move the cursor to the end.
2502    /// cursor.seek(SeekFrom::End(0)).await?;
2503    ///
2504    /// // Check the current position.
2505    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2506    /// # std::io::Result::Ok(()) });
2507    /// ```
2508    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2509    where
2510        Self: Unpin,
2511    {
2512        SeekFuture { seeker: self, pos }
2513    }
2514}
2515
2516impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2517
2518/// Future for the [`AsyncSeekExt::seek()`] method.
2519#[derive(Debug)]
2520#[must_use = "futures do nothing unless you `.await` or poll them"]
2521pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2522    seeker: &'a mut S,
2523    pos: SeekFrom,
2524}
2525
2526impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2527
2528impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2529    type Output = Result<u64>;
2530
2531    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2532        let pos = self.pos;
2533        Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2534    }
2535}
2536
2537/// Extension trait for [`AsyncWrite`].
2538pub trait AsyncWriteExt: AsyncWrite {
2539    /// Writes some bytes into the byte stream.
2540    ///
2541    /// Returns the number of bytes written from the start of the buffer.
2542    ///
2543    /// If the return value is `Ok(n)` then it must be guaranteed that
2544    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2545    /// object is no longer able to accept bytes and will likely not be able to in the
2546    /// future as well, or that the provided buffer is empty.
2547    ///
2548    /// # Examples
2549    ///
2550    /// ```
2551    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2552    ///
2553    /// # spin_on::spin_on(async {
2554    /// let mut output = Vec::new();
2555    /// let mut writer = BufWriter::new(&mut output);
2556    ///
2557    /// let n = writer.write(b"hello").await?;
2558    /// # std::io::Result::Ok(()) });
2559    /// ```
2560    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2561    where
2562        Self: Unpin,
2563    {
2564        WriteFuture { writer: self, buf }
2565    }
2566
2567    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2568    ///
2569    /// Data is copied from each buffer in order, with the final buffer possibly being only
2570    /// partially consumed. This method must behave same as a call to
2571    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2572    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2573    where
2574        Self: Unpin,
2575    {
2576        WriteVectoredFuture { writer: self, bufs }
2577    }
2578
2579    /// Writes an entire buffer into the byte stream.
2580    ///
2581    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2582    /// data to be written or an error occurs. It will not return before the entire buffer is
2583    /// successfully written or an error occurs.
2584    ///
2585    /// # Examples
2586    ///
2587    /// ```
2588    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2589    ///
2590    /// # spin_on::spin_on(async {
2591    /// let mut output = Vec::new();
2592    /// let mut writer = BufWriter::new(&mut output);
2593    ///
2594    /// let n = writer.write_all(b"hello").await?;
2595    /// # std::io::Result::Ok(()) });
2596    /// ```
2597    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2598    where
2599        Self: Unpin,
2600    {
2601        WriteAllFuture { writer: self, buf }
2602    }
2603
2604    /// Flushes the stream to ensure that all buffered contents reach their destination.
2605    ///
2606    /// # Examples
2607    ///
2608    /// ```
2609    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2610    ///
2611    /// # spin_on::spin_on(async {
2612    /// let mut output = Vec::new();
2613    /// let mut writer = BufWriter::new(&mut output);
2614    ///
2615    /// writer.write_all(b"hello").await?;
2616    /// writer.flush().await?;
2617    /// # std::io::Result::Ok(()) });
2618    /// ```
2619    fn flush(&mut self) -> FlushFuture<'_, Self>
2620    where
2621        Self: Unpin,
2622    {
2623        FlushFuture { writer: self }
2624    }
2625
2626    /// Closes the writer.
2627    ///
2628    /// # Examples
2629    ///
2630    /// ```
2631    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2632    ///
2633    /// # spin_on::spin_on(async {
2634    /// let mut output = Vec::new();
2635    /// let mut writer = BufWriter::new(&mut output);
2636    ///
2637    /// writer.close().await?;
2638    /// # std::io::Result::Ok(()) });
2639    /// ```
2640    fn close(&mut self) -> CloseFuture<'_, Self>
2641    where
2642        Self: Unpin,
2643    {
2644        CloseFuture { writer: self }
2645    }
2646
2647    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2648    ///
2649    /// # Examples
2650    ///
2651    /// ```
2652    /// use futures_lite::io::AsyncWriteExt;
2653    ///
2654    /// let writer = Vec::<u8>::new().boxed_writer();
2655    /// ```
2656    #[cfg(feature = "alloc")]
2657    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2658    where
2659        Self: Sized + Send + 'a,
2660    {
2661        Box::pin(self)
2662    }
2663}
2664
2665impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2666
2667/// Future for the [`AsyncWriteExt::write()`] method.
2668#[derive(Debug)]
2669#[must_use = "futures do nothing unless you `.await` or poll them"]
2670pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2671    writer: &'a mut W,
2672    buf: &'a [u8],
2673}
2674
2675impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2676
2677impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2678    type Output = Result<usize>;
2679
2680    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2681        let buf = self.buf;
2682        Pin::new(&mut *self.writer).poll_write(cx, buf)
2683    }
2684}
2685
2686/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2687#[derive(Debug)]
2688#[must_use = "futures do nothing unless you `.await` or poll them"]
2689pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2690    writer: &'a mut W,
2691    bufs: &'a [IoSlice<'a>],
2692}
2693
2694impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2695
2696impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2697    type Output = Result<usize>;
2698
2699    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2700        let bufs = self.bufs;
2701        Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2702    }
2703}
2704
2705/// Future for the [`AsyncWriteExt::write_all()`] method.
2706#[derive(Debug)]
2707#[must_use = "futures do nothing unless you `.await` or poll them"]
2708pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2709    writer: &'a mut W,
2710    buf: &'a [u8],
2711}
2712
2713impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2714
2715impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2716    type Output = Result<()>;
2717
2718    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2719        let Self { writer, buf } = &mut *self;
2720
2721        while !buf.is_empty() {
2722            let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2723            let (_, rest) = mem::replace(buf, &[]).split_at(n);
2724            *buf = rest;
2725
2726            if n == 0 {
2727                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2728            }
2729        }
2730
2731        Poll::Ready(Ok(()))
2732    }
2733}
2734
2735/// Future for the [`AsyncWriteExt::flush()`] method.
2736#[derive(Debug)]
2737#[must_use = "futures do nothing unless you `.await` or poll them"]
2738pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2739    writer: &'a mut W,
2740}
2741
2742impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2743
2744impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2745    type Output = Result<()>;
2746
2747    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2748        Pin::new(&mut *self.writer).poll_flush(cx)
2749    }
2750}
2751
2752/// Future for the [`AsyncWriteExt::close()`] method.
2753#[derive(Debug)]
2754#[must_use = "futures do nothing unless you `.await` or poll them"]
2755pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2756    writer: &'a mut W,
2757}
2758
2759impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2760
2761impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2762    type Output = Result<()>;
2763
2764    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2765        Pin::new(&mut *self.writer).poll_close(cx)
2766    }
2767}
2768
2769/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
2770///
2771/// # Examples
2772///
2773/// ```
2774/// use futures_lite::io::AsyncReadExt;
2775///
2776/// let reader = [1, 2, 3].boxed_reader();
2777/// ```
2778#[cfg(feature = "alloc")]
2779pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
2780
2781/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
2782///
2783/// # Examples
2784///
2785/// ```
2786/// use futures_lite::io::AsyncWriteExt;
2787///
2788/// let writer = Vec::<u8>::new().boxed_writer();
2789/// ```
2790#[cfg(feature = "alloc")]
2791pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
2792
2793/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
2794///
2795/// # Examples
2796///
2797/// ```
2798/// use futures_lite::io::{self, Cursor};
2799///
2800/// # spin_on::spin_on(async {
2801/// let stream = Cursor::new(vec![]);
2802/// let (mut reader, mut writer) = io::split(stream);
2803/// # std::io::Result::Ok(()) });
2804/// ```
2805pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
2806where
2807    T: AsyncRead + AsyncWrite + Unpin,
2808{
2809    let inner = Arc::new(Mutex::new(stream));
2810    (ReadHalf(inner.clone()), WriteHalf(inner))
2811}
2812
2813/// The read half returned by [`split()`].
2814#[derive(Debug)]
2815pub struct ReadHalf<T>(Arc<Mutex<T>>);
2816
2817/// The write half returned by [`split()`].
2818#[derive(Debug)]
2819pub struct WriteHalf<T>(Arc<Mutex<T>>);
2820
2821impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
2822    fn poll_read(
2823        self: Pin<&mut Self>,
2824        cx: &mut Context<'_>,
2825        buf: &mut [u8],
2826    ) -> Poll<Result<usize>> {
2827        let mut inner = self.0.lock().unwrap();
2828        Pin::new(&mut *inner).poll_read(cx, buf)
2829    }
2830
2831    fn poll_read_vectored(
2832        self: Pin<&mut Self>,
2833        cx: &mut Context<'_>,
2834        bufs: &mut [IoSliceMut<'_>],
2835    ) -> Poll<Result<usize>> {
2836        let mut inner = self.0.lock().unwrap();
2837        Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
2838    }
2839}
2840
2841impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
2842    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
2843        let mut inner = self.0.lock().unwrap();
2844        Pin::new(&mut *inner).poll_write(cx, buf)
2845    }
2846
2847    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
2848        let mut inner = self.0.lock().unwrap();
2849        Pin::new(&mut *inner).poll_flush(cx)
2850    }
2851
2852    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
2853        let mut inner = self.0.lock().unwrap();
2854        Pin::new(&mut *inner).poll_close(cx)
2855    }
2856}