futures_lite/io.rs
1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::cmp;
24use std::fmt;
25use std::future::Future;
26use std::io::{IoSlice, IoSliceMut};
27use std::mem;
28use std::pin::Pin;
29use std::sync::{Arc, Mutex};
30use std::task::{Context, Poll};
31
32use futures_core::stream::Stream;
33use pin_project_lite::pin_project;
34
35use crate::future;
36use crate::ready;
37
38const DEFAULT_BUF_SIZE: usize = 8 * 1024;
39
40/// Copies the entire contents of a reader into a writer.
41///
42/// This function will read data from `reader` and write it into `writer` in a streaming fashion
43/// until `reader` returns EOF.
44///
45/// On success, returns the total number of bytes copied.
46///
47/// # Examples
48///
49/// ```
50/// use futures_lite::io::{self, BufReader, BufWriter};
51///
52/// # spin_on::spin_on(async {
53/// let input: &[u8] = b"hello";
54/// let reader = BufReader::new(input);
55///
56/// let mut output = Vec::new();
57/// let writer = BufWriter::new(&mut output);
58///
59/// io::copy(reader, writer).await?;
60/// # std::io::Result::Ok(()) });
61/// ```
62pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
63where
64 R: AsyncRead + Unpin,
65 W: AsyncWrite + Unpin,
66{
67 pin_project! {
68 struct CopyFuture<R, W> {
69 #[pin]
70 reader: R,
71 #[pin]
72 writer: W,
73 amt: u64,
74 }
75 }
76
77 impl<R, W> Future for CopyFuture<R, W>
78 where
79 R: AsyncBufRead,
80 W: AsyncWrite + Unpin,
81 {
82 type Output = Result<u64>;
83
84 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85 let mut this = self.project();
86 loop {
87 let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
88 if buffer.is_empty() {
89 ready!(this.writer.as_mut().poll_flush(cx))?;
90 return Poll::Ready(Ok(*this.amt));
91 }
92
93 let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
94 if i == 0 {
95 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
96 }
97 *this.amt += i as u64;
98 this.reader.as_mut().consume(i);
99 }
100 }
101 }
102
103 let future = CopyFuture {
104 reader: BufReader::new(reader),
105 writer,
106 amt: 0,
107 };
108 future.await
109}
110
111/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
112///
113/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
114/// This is usually the case for in-memory buffered I/O.
115///
116/// # Examples
117///
118/// ```
119/// use futures_lite::io::{AssertAsync, AsyncReadExt};
120///
121/// let reader: &[u8] = b"hello";
122///
123/// # spin_on::spin_on(async {
124/// let mut async_reader = AssertAsync::new(reader);
125/// let mut contents = String::new();
126///
127/// // This line works in async manner - note that there is await:
128/// async_reader.read_to_string(&mut contents).await?;
129/// # std::io::Result::Ok(()) });
130/// ```
131#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
132pub struct AssertAsync<T>(T);
133
134impl<T> Unpin for AssertAsync<T> {}
135
136impl<T> AssertAsync<T> {
137 /// Wraps an I/O handle implementing [`std::io`] traits.
138 ///
139 /// # Examples
140 ///
141 /// ```
142 /// use futures_lite::io::AssertAsync;
143 ///
144 /// let reader: &[u8] = b"hello";
145 ///
146 /// let async_reader = AssertAsync::new(reader);
147 /// ```
148 pub fn new(io: T) -> Self {
149 AssertAsync(io)
150 }
151
152 /// Gets a reference to the inner I/O handle.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use futures_lite::io::AssertAsync;
158 ///
159 /// let reader: &[u8] = b"hello";
160 ///
161 /// let async_reader = AssertAsync::new(reader);
162 /// let r = async_reader.get_ref();
163 /// ```
164 pub fn get_ref(&self) -> &T {
165 &self.0
166 }
167
168 /// Gets a mutable reference to the inner I/O handle.
169 ///
170 /// # Examples
171 ///
172 /// ```
173 /// use futures_lite::io::AssertAsync;
174 ///
175 /// let reader: &[u8] = b"hello";
176 ///
177 /// let mut async_reader = AssertAsync::new(reader);
178 /// let r = async_reader.get_mut();
179 /// ```
180 pub fn get_mut(&mut self) -> &mut T {
181 &mut self.0
182 }
183
184 /// Extracts the inner I/O handle.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use futures_lite::io::AssertAsync;
190 ///
191 /// let reader: &[u8] = b"hello";
192 ///
193 /// let async_reader = AssertAsync::new(reader);
194 /// let inner = async_reader.into_inner();
195 /// ```
196 pub fn into_inner(self) -> T {
197 self.0
198 }
199}
200
201impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
202 fn poll_read(
203 mut self: Pin<&mut Self>,
204 _: &mut Context<'_>,
205 buf: &mut [u8],
206 ) -> Poll<Result<usize>> {
207 loop {
208 match self.0.read(buf) {
209 Err(err) if err.kind() == ErrorKind::Interrupted => {}
210 res => return Poll::Ready(res),
211 }
212 }
213 }
214
215 fn poll_read_vectored(
216 mut self: Pin<&mut Self>,
217 _: &mut Context<'_>,
218 bufs: &mut [IoSliceMut<'_>],
219 ) -> Poll<Result<usize>> {
220 loop {
221 match self.0.read_vectored(bufs) {
222 Err(err) if err.kind() == ErrorKind::Interrupted => {}
223 res => return Poll::Ready(res),
224 }
225 }
226 }
227}
228
229impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
230 fn poll_write(
231 mut self: Pin<&mut Self>,
232 _: &mut Context<'_>,
233 buf: &[u8],
234 ) -> Poll<Result<usize>> {
235 loop {
236 match self.0.write(buf) {
237 Err(err) if err.kind() == ErrorKind::Interrupted => {}
238 res => return Poll::Ready(res),
239 }
240 }
241 }
242
243 fn poll_write_vectored(
244 mut self: Pin<&mut Self>,
245 _: &mut Context<'_>,
246 bufs: &[IoSlice<'_>],
247 ) -> Poll<Result<usize>> {
248 loop {
249 match self.0.write_vectored(bufs) {
250 Err(err) if err.kind() == ErrorKind::Interrupted => {}
251 res => return Poll::Ready(res),
252 }
253 }
254 }
255
256 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
257 loop {
258 match self.0.flush() {
259 Err(err) if err.kind() == ErrorKind::Interrupted => {}
260 res => return Poll::Ready(res),
261 }
262 }
263 }
264
265 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
266 self.poll_flush(cx)
267 }
268}
269
270impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
271 fn poll_seek(
272 mut self: Pin<&mut Self>,
273 _: &mut Context<'_>,
274 pos: SeekFrom,
275 ) -> Poll<Result<u64>> {
276 loop {
277 match self.0.seek(pos) {
278 Err(err) if err.kind() == ErrorKind::Interrupted => {}
279 res => return Poll::Ready(res),
280 }
281 }
282 }
283}
284
285/// Blocks on all async I/O operations and implements [`std::io`] traits.
286///
287/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
288/// manually all the time becomes too tedious, use this type for more convenient blocking on async
289/// I/O operations.
290///
291/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
292/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
293/// [`AsyncSeek`], respectively.
294///
295/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
296/// dropping the [`BlockOn`] handle or some buffered data might get lost.
297///
298/// # Examples
299///
300/// ```
301/// use futures_lite::io::BlockOn;
302/// use futures_lite::pin;
303/// use std::io::Read;
304///
305/// let reader: &[u8] = b"hello";
306/// pin!(reader);
307///
308/// let mut blocking_reader = BlockOn::new(reader);
309/// let mut contents = String::new();
310///
311/// // This line blocks - note that there is no await:
312/// blocking_reader.read_to_string(&mut contents)?;
313/// # std::io::Result::Ok(())
314/// ```
315#[derive(Debug)]
316pub struct BlockOn<T>(T);
317
318impl<T> BlockOn<T> {
319 /// Wraps an async I/O handle into a blocking interface.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use futures_lite::io::BlockOn;
325 /// use futures_lite::pin;
326 ///
327 /// let reader: &[u8] = b"hello";
328 /// pin!(reader);
329 ///
330 /// let blocking_reader = BlockOn::new(reader);
331 /// ```
332 pub fn new(io: T) -> BlockOn<T> {
333 BlockOn(io)
334 }
335
336 /// Gets a reference to the async I/O handle.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use futures_lite::io::BlockOn;
342 /// use futures_lite::pin;
343 ///
344 /// let reader: &[u8] = b"hello";
345 /// pin!(reader);
346 ///
347 /// let blocking_reader = BlockOn::new(reader);
348 /// let r = blocking_reader.get_ref();
349 /// ```
350 pub fn get_ref(&self) -> &T {
351 &self.0
352 }
353
354 /// Gets a mutable reference to the async I/O handle.
355 ///
356 /// # Examples
357 ///
358 /// ```
359 /// use futures_lite::io::BlockOn;
360 /// use futures_lite::pin;
361 ///
362 /// let reader: &[u8] = b"hello";
363 /// pin!(reader);
364 ///
365 /// let mut blocking_reader = BlockOn::new(reader);
366 /// let r = blocking_reader.get_mut();
367 /// ```
368 pub fn get_mut(&mut self) -> &mut T {
369 &mut self.0
370 }
371
372 /// Extracts the inner async I/O handle.
373 ///
374 /// # Examples
375 ///
376 /// ```
377 /// use futures_lite::io::BlockOn;
378 /// use futures_lite::pin;
379 ///
380 /// let reader: &[u8] = b"hello";
381 /// pin!(reader);
382 ///
383 /// let blocking_reader = BlockOn::new(reader);
384 /// let inner = blocking_reader.into_inner();
385 /// ```
386 pub fn into_inner(self) -> T {
387 self.0
388 }
389}
390
391impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
392 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
393 future::block_on(self.0.read(buf))
394 }
395}
396
397impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
398 fn write(&mut self, buf: &[u8]) -> Result<usize> {
399 future::block_on(self.0.write(buf))
400 }
401
402 fn flush(&mut self) -> Result<()> {
403 future::block_on(self.0.flush())
404 }
405}
406
407impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
408 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
409 future::block_on(self.0.seek(pos))
410 }
411}
412
413pin_project! {
414 /// Adds buffering to a reader.
415 ///
416 /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
417 /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
418 /// maintains an in-memory buffer of the incoming byte stream.
419 ///
420 /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
421 /// the same file or networking socket. It does not help when reading very large amounts at
422 /// once, or reading just once or a few times. It also provides no advantage when reading from
423 /// a source that is already in memory, like a `Vec<u8>`.
424 ///
425 /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
426 /// multiple instances of [`BufReader`] on the same reader can cause data loss.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
432 ///
433 /// # spin_on::spin_on(async {
434 /// let input: &[u8] = b"hello";
435 /// let mut reader = BufReader::new(input);
436 ///
437 /// let mut line = String::new();
438 /// reader.read_line(&mut line).await?;
439 /// # std::io::Result::Ok(()) });
440 /// ```
441 pub struct BufReader<R> {
442 #[pin]
443 inner: R,
444 buf: Box<[u8]>,
445 pos: usize,
446 cap: usize,
447 }
448}
449
450impl<R: AsyncRead> BufReader<R> {
451 /// Creates a buffered reader with the default buffer capacity.
452 ///
453 /// The default capacity is currently 8 KB, but that may change in the future.
454 ///
455 /// # Examples
456 ///
457 /// ```
458 /// use futures_lite::io::BufReader;
459 ///
460 /// let input: &[u8] = b"hello";
461 /// let reader = BufReader::new(input);
462 /// ```
463 pub fn new(inner: R) -> BufReader<R> {
464 BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
465 }
466
467 /// Creates a buffered reader with the specified capacity.
468 ///
469 /// # Examples
470 ///
471 /// ```
472 /// use futures_lite::io::BufReader;
473 ///
474 /// let input: &[u8] = b"hello";
475 /// let reader = BufReader::with_capacity(1024, input);
476 /// ```
477 pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
478 BufReader {
479 inner,
480 buf: vec![0; capacity].into_boxed_slice(),
481 pos: 0,
482 cap: 0,
483 }
484 }
485}
486
487impl<R> BufReader<R> {
488 /// Gets a reference to the underlying reader.
489 ///
490 /// It is not advisable to directly read from the underlying reader.
491 ///
492 /// # Examples
493 ///
494 /// ```
495 /// use futures_lite::io::BufReader;
496 ///
497 /// let input: &[u8] = b"hello";
498 /// let reader = BufReader::new(input);
499 ///
500 /// let r = reader.get_ref();
501 /// ```
502 pub fn get_ref(&self) -> &R {
503 &self.inner
504 }
505
506 /// Gets a mutable reference to the underlying reader.
507 ///
508 /// It is not advisable to directly read from the underlying reader.
509 ///
510 /// # Examples
511 ///
512 /// ```
513 /// use futures_lite::io::BufReader;
514 ///
515 /// let input: &[u8] = b"hello";
516 /// let mut reader = BufReader::new(input);
517 ///
518 /// let r = reader.get_mut();
519 /// ```
520 pub fn get_mut(&mut self) -> &mut R {
521 &mut self.inner
522 }
523
524 /// Gets a pinned mutable reference to the underlying reader.
525 ///
526 /// It is not advisable to directly read from the underlying reader.
527 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
528 self.project().inner
529 }
530
531 /// Returns a reference to the internal buffer.
532 ///
533 /// This method will not attempt to fill the buffer if it is empty.
534 ///
535 /// # Examples
536 ///
537 /// ```
538 /// use futures_lite::io::BufReader;
539 ///
540 /// let input: &[u8] = b"hello";
541 /// let reader = BufReader::new(input);
542 ///
543 /// // The internal buffer is empty until the first read request.
544 /// assert_eq!(reader.buffer(), &[]);
545 /// ```
546 pub fn buffer(&self) -> &[u8] {
547 &self.buf[self.pos..self.cap]
548 }
549
550 /// Unwraps the buffered reader, returning the underlying reader.
551 ///
552 /// Note that any leftover data in the internal buffer will be lost.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// use futures_lite::io::BufReader;
558 ///
559 /// let input: &[u8] = b"hello";
560 /// let reader = BufReader::new(input);
561 ///
562 /// assert_eq!(reader.into_inner(), input);
563 /// ```
564 pub fn into_inner(self) -> R {
565 self.inner
566 }
567
568 /// Invalidates all data in the internal buffer.
569 #[inline]
570 fn discard_buffer(self: Pin<&mut Self>) {
571 let this = self.project();
572 *this.pos = 0;
573 *this.cap = 0;
574 }
575}
576
577impl<R: AsyncRead> AsyncRead for BufReader<R> {
578 fn poll_read(
579 mut self: Pin<&mut Self>,
580 cx: &mut Context<'_>,
581 buf: &mut [u8],
582 ) -> Poll<Result<usize>> {
583 // If we don't have any buffered data and we're doing a massive read
584 // (larger than our internal buffer), bypass our internal buffer
585 // entirely.
586 if self.pos == self.cap && buf.len() >= self.buf.len() {
587 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
588 self.discard_buffer();
589 return Poll::Ready(res);
590 }
591 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
592 let nread = std::io::Read::read(&mut rem, buf)?;
593 self.consume(nread);
594 Poll::Ready(Ok(nread))
595 }
596
597 fn poll_read_vectored(
598 mut self: Pin<&mut Self>,
599 cx: &mut Context<'_>,
600 bufs: &mut [IoSliceMut<'_>],
601 ) -> Poll<Result<usize>> {
602 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
603 if self.pos == self.cap && total_len >= self.buf.len() {
604 let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
605 self.discard_buffer();
606 return Poll::Ready(res);
607 }
608 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
609 let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
610 self.consume(nread);
611 Poll::Ready(Ok(nread))
612 }
613}
614
615impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
616 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
617 let mut this = self.project();
618
619 // If we've reached the end of our internal buffer then we need to fetch
620 // some more data from the underlying reader.
621 // Branch using `>=` instead of the more correct `==`
622 // to tell the compiler that the pos..cap slice is always valid.
623 if *this.pos >= *this.cap {
624 debug_assert!(*this.pos == *this.cap);
625 *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
626 *this.pos = 0;
627 }
628 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
629 }
630
631 fn consume(self: Pin<&mut Self>, amt: usize) {
632 let this = self.project();
633 *this.pos = cmp::min(*this.pos + amt, *this.cap);
634 }
635}
636
637impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
638 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
639 f.debug_struct("BufReader")
640 .field("reader", &self.inner)
641 .field(
642 "buffer",
643 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
644 )
645 .finish()
646 }
647}
648
649impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
650 /// Seeks to an offset, in bytes, in the underlying reader.
651 ///
652 /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
653 /// reader would be at if the [`BufReader`] had no internal buffer.
654 ///
655 /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
656 /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
657 /// immediately after a seek yields the underlying reader at the same position.
658 ///
659 /// See [`AsyncSeek`] for more details.
660 ///
661 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
662 /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
663 /// the second seek returns `Err`, the underlying reader will be left at the same position it
664 /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
665 fn poll_seek(
666 mut self: Pin<&mut Self>,
667 cx: &mut Context<'_>,
668 pos: SeekFrom,
669 ) -> Poll<Result<u64>> {
670 let result: u64;
671 if let SeekFrom::Current(n) = pos {
672 let remainder = (self.cap - self.pos) as i64;
673 // it should be safe to assume that remainder fits within an i64 as the alternative
674 // means we managed to allocate 8 exbibytes and that's absurd.
675 // But it's not out of the realm of possibility for some weird underlying reader to
676 // support seeking by i64::min_value() so we need to handle underflow when subtracting
677 // remainder.
678 if let Some(offset) = n.checked_sub(remainder) {
679 result = ready!(self
680 .as_mut()
681 .get_pin_mut()
682 .poll_seek(cx, SeekFrom::Current(offset)))?;
683 } else {
684 // seek backwards by our remainder, and then by the offset
685 ready!(self
686 .as_mut()
687 .get_pin_mut()
688 .poll_seek(cx, SeekFrom::Current(-remainder)))?;
689 self.as_mut().discard_buffer();
690 result = ready!(self
691 .as_mut()
692 .get_pin_mut()
693 .poll_seek(cx, SeekFrom::Current(n)))?;
694 }
695 } else {
696 // Seeking with Start/End doesn't care about our buffer length.
697 result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
698 }
699 self.discard_buffer();
700 Poll::Ready(Ok(result))
701 }
702}
703
704pin_project! {
705 /// Adds buffering to a writer.
706 ///
707 /// It can be excessively inefficient to work directly with something that implements
708 /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
709 /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
710 /// writes it to the underlying writer in large, infrequent batches.
711 ///
712 /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
713 /// the same file or networking socket. It does not help when writing very large amounts at
714 /// once, or writing just once or a few times. It also provides no advantage when writing to a
715 /// destination that is in memory, like a `Vec<u8>`.
716 ///
717 /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
718 /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
719 /// dropping the [`BufWriter`].
720 ///
721 /// # Examples
722 ///
723 /// ```
724 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
725 ///
726 /// # spin_on::spin_on(async {
727 /// let mut output = Vec::new();
728 /// let mut writer = BufWriter::new(&mut output);
729 ///
730 /// writer.write_all(b"hello").await?;
731 /// writer.flush().await?;
732 /// # std::io::Result::Ok(()) });
733 /// ```
734 pub struct BufWriter<W> {
735 #[pin]
736 inner: W,
737 buf: Vec<u8>,
738 written: usize,
739 }
740}
741
742impl<W: AsyncWrite> BufWriter<W> {
743 /// Creates a buffered writer with the default buffer capacity.
744 ///
745 /// The default capacity is currently 8 KB, but that may change in the future.
746 ///
747 /// # Examples
748 ///
749 /// ```
750 /// use futures_lite::io::BufWriter;
751 ///
752 /// let mut output = Vec::new();
753 /// let writer = BufWriter::new(&mut output);
754 /// ```
755 pub fn new(inner: W) -> BufWriter<W> {
756 BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
757 }
758
759 /// Creates a buffered writer with the specified buffer capacity.
760 ///
761 /// # Examples
762 ///
763 /// ```
764 /// use futures_lite::io::BufWriter;
765 ///
766 /// let mut output = Vec::new();
767 /// let writer = BufWriter::with_capacity(100, &mut output);
768 /// ```
769 pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
770 BufWriter {
771 inner,
772 buf: Vec::with_capacity(capacity),
773 written: 0,
774 }
775 }
776
777 /// Gets a reference to the underlying writer.
778 ///
779 /// # Examples
780 ///
781 /// ```
782 /// use futures_lite::io::BufWriter;
783 ///
784 /// let mut output = Vec::new();
785 /// let writer = BufWriter::new(&mut output);
786 ///
787 /// let r = writer.get_ref();
788 /// ```
789 pub fn get_ref(&self) -> &W {
790 &self.inner
791 }
792
793 /// Gets a mutable reference to the underlying writer.
794 ///
795 /// It is not advisable to directly write to the underlying writer.
796 ///
797 /// # Examples
798 ///
799 /// ```
800 /// use futures_lite::io::BufWriter;
801 ///
802 /// let mut output = Vec::new();
803 /// let mut writer = BufWriter::new(&mut output);
804 ///
805 /// let r = writer.get_mut();
806 /// ```
807 pub fn get_mut(&mut self) -> &mut W {
808 &mut self.inner
809 }
810
811 /// Gets a pinned mutable reference to the underlying writer.
812 ///
813 /// It is not not advisable to directly write to the underlying writer.
814 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
815 self.project().inner
816 }
817
818 /// Unwraps the buffered writer, returning the underlying writer.
819 ///
820 /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
821 /// that data, flush the buffered writer before unwrapping it.
822 ///
823 /// # Examples
824 ///
825 /// ```
826 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
827 ///
828 /// # spin_on::spin_on(async {
829 /// let mut output = vec![1, 2, 3];
830 /// let mut writer = BufWriter::new(&mut output);
831 ///
832 /// writer.write_all(&[4]).await?;
833 /// writer.flush().await?;
834 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
835 /// # std::io::Result::Ok(()) });
836 /// ```
837 pub fn into_inner(self) -> W {
838 self.inner
839 }
840
841 /// Returns a reference to the internal buffer.
842 ///
843 /// # Examples
844 ///
845 /// ```
846 /// use futures_lite::io::BufWriter;
847 ///
848 /// let mut output = Vec::new();
849 /// let writer = BufWriter::new(&mut output);
850 ///
851 /// // The internal buffer is empty until the first write request.
852 /// assert_eq!(writer.buffer(), &[]);
853 /// ```
854 pub fn buffer(&self) -> &[u8] {
855 &self.buf
856 }
857
858 /// Flush the buffer.
859 fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
860 let mut this = self.project();
861 let len = this.buf.len();
862 let mut ret = Ok(());
863
864 while *this.written < len {
865 match this
866 .inner
867 .as_mut()
868 .poll_write(cx, &this.buf[*this.written..])
869 {
870 Poll::Ready(Ok(0)) => {
871 ret = Err(Error::new(
872 ErrorKind::WriteZero,
873 "Failed to write buffered data",
874 ));
875 break;
876 }
877 Poll::Ready(Ok(n)) => *this.written += n,
878 Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
879 Poll::Ready(Err(e)) => {
880 ret = Err(e);
881 break;
882 }
883 Poll::Pending => return Poll::Pending,
884 }
885 }
886
887 if *this.written > 0 {
888 this.buf.drain(..*this.written);
889 }
890 *this.written = 0;
891
892 Poll::Ready(ret)
893 }
894}
895
896impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
897 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898 f.debug_struct("BufWriter")
899 .field("writer", &self.inner)
900 .field("buf", &self.buf)
901 .finish()
902 }
903}
904
905impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
906 fn poll_write(
907 mut self: Pin<&mut Self>,
908 cx: &mut Context<'_>,
909 buf: &[u8],
910 ) -> Poll<Result<usize>> {
911 if self.buf.len() + buf.len() > self.buf.capacity() {
912 ready!(self.as_mut().poll_flush_buf(cx))?;
913 }
914 if buf.len() >= self.buf.capacity() {
915 self.get_pin_mut().poll_write(cx, buf)
916 } else {
917 Pin::new(&mut *self.project().buf).poll_write(cx, buf)
918 }
919 }
920
921 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
922 ready!(self.as_mut().poll_flush_buf(cx))?;
923 self.get_pin_mut().poll_flush(cx)
924 }
925
926 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
927 ready!(self.as_mut().poll_flush_buf(cx))?;
928 self.get_pin_mut().poll_close(cx)
929 }
930}
931
932impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
933 /// Seek to the offset, in bytes, in the underlying writer.
934 ///
935 /// Seeking always writes out the internal buffer before seeking.
936 fn poll_seek(
937 mut self: Pin<&mut Self>,
938 cx: &mut Context<'_>,
939 pos: SeekFrom,
940 ) -> Poll<Result<u64>> {
941 ready!(self.as_mut().poll_flush_buf(cx))?;
942 self.get_pin_mut().poll_seek(cx, pos)
943 }
944}
945
946/// Gives an in-memory buffer a cursor for reading and writing.
947///
948/// # Examples
949///
950/// ```
951/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
952///
953/// # spin_on::spin_on(async {
954/// let mut bytes = b"hello".to_vec();
955/// let mut cursor = Cursor::new(&mut bytes);
956///
957/// // Overwrite 'h' with 'H'.
958/// cursor.write_all(b"H").await?;
959///
960/// // Move the cursor one byte forward.
961/// cursor.seek(SeekFrom::Current(1)).await?;
962///
963/// // Read a byte.
964/// let mut byte = [0];
965/// cursor.read_exact(&mut byte).await?;
966/// assert_eq!(&byte, b"l");
967///
968/// // Check the final buffer.
969/// assert_eq!(bytes, b"Hello");
970/// # std::io::Result::Ok(()) });
971/// ```
972#[derive(Clone, Debug, Default)]
973pub struct Cursor<T> {
974 inner: std::io::Cursor<T>,
975}
976
977impl<T> Cursor<T> {
978 /// Creates a cursor for an in-memory buffer.
979 ///
980 /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
981 /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
982 /// the buffer using [`set_position()`][Cursor::set_position()`] or
983 /// [`seek()`][`AsyncSeekExt::seek()`].
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use futures_lite::io::Cursor;
989 ///
990 /// let cursor = Cursor::new(Vec::<u8>::new());
991 /// ```
992 pub fn new(inner: T) -> Cursor<T> {
993 Cursor {
994 inner: std::io::Cursor::new(inner),
995 }
996 }
997
998 /// Gets a reference to the underlying buffer.
999 ///
1000 /// # Examples
1001 ///
1002 /// ```
1003 /// use futures_lite::io::Cursor;
1004 ///
1005 /// let cursor = Cursor::new(Vec::<u8>::new());
1006 /// let r = cursor.get_ref();
1007 /// ```
1008 pub fn get_ref(&self) -> &T {
1009 self.inner.get_ref()
1010 }
1011
1012 /// Gets a mutable reference to the underlying buffer.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use futures_lite::io::Cursor;
1018 ///
1019 /// let mut cursor = Cursor::new(Vec::<u8>::new());
1020 /// let r = cursor.get_mut();
1021 /// ```
1022 pub fn get_mut(&mut self) -> &mut T {
1023 self.inner.get_mut()
1024 }
1025
1026 /// Unwraps the cursor, returning the underlying buffer.
1027 ///
1028 /// # Examples
1029 ///
1030 /// ```
1031 /// use futures_lite::io::Cursor;
1032 ///
1033 /// let cursor = Cursor::new(vec![1, 2, 3]);
1034 /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
1035 /// ```
1036 pub fn into_inner(self) -> T {
1037 self.inner.into_inner()
1038 }
1039
1040 /// Returns the current position of this cursor.
1041 ///
1042 /// # Examples
1043 ///
1044 /// ```
1045 /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
1046 ///
1047 /// # spin_on::spin_on(async {
1048 /// let mut cursor = Cursor::new(b"hello");
1049 /// assert_eq!(cursor.position(), 0);
1050 ///
1051 /// cursor.seek(SeekFrom::Start(2)).await?;
1052 /// assert_eq!(cursor.position(), 2);
1053 /// # std::io::Result::Ok(()) });
1054 /// ```
1055 pub fn position(&self) -> u64 {
1056 self.inner.position()
1057 }
1058
1059 /// Sets the position of this cursor.
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use futures_lite::io::Cursor;
1065 ///
1066 /// let mut cursor = Cursor::new(b"hello");
1067 /// assert_eq!(cursor.position(), 0);
1068 ///
1069 /// cursor.set_position(2);
1070 /// assert_eq!(cursor.position(), 2);
1071 /// ```
1072 pub fn set_position(&mut self, pos: u64) {
1073 self.inner.set_position(pos)
1074 }
1075}
1076
1077impl<T> AsyncSeek for Cursor<T>
1078where
1079 T: AsRef<[u8]> + Unpin,
1080{
1081 fn poll_seek(
1082 mut self: Pin<&mut Self>,
1083 _: &mut Context<'_>,
1084 pos: SeekFrom,
1085 ) -> Poll<Result<u64>> {
1086 Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1087 }
1088}
1089
1090impl<T> AsyncRead for Cursor<T>
1091where
1092 T: AsRef<[u8]> + Unpin,
1093{
1094 fn poll_read(
1095 mut self: Pin<&mut Self>,
1096 _cx: &mut Context<'_>,
1097 buf: &mut [u8],
1098 ) -> Poll<Result<usize>> {
1099 Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1100 }
1101
1102 fn poll_read_vectored(
1103 mut self: Pin<&mut Self>,
1104 _: &mut Context<'_>,
1105 bufs: &mut [IoSliceMut<'_>],
1106 ) -> Poll<Result<usize>> {
1107 Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1108 }
1109}
1110
1111impl<T> AsyncBufRead for Cursor<T>
1112where
1113 T: AsRef<[u8]> + Unpin,
1114{
1115 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1116 Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1117 }
1118
1119 fn consume(mut self: Pin<&mut Self>, amt: usize) {
1120 std::io::BufRead::consume(&mut self.inner, amt)
1121 }
1122}
1123
1124impl AsyncWrite for Cursor<&mut [u8]> {
1125 fn poll_write(
1126 mut self: Pin<&mut Self>,
1127 _: &mut Context<'_>,
1128 buf: &[u8],
1129 ) -> Poll<Result<usize>> {
1130 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1131 }
1132
1133 fn poll_write_vectored(
1134 mut self: Pin<&mut Self>,
1135 _: &mut Context<'_>,
1136 bufs: &[IoSlice<'_>],
1137 ) -> Poll<Result<usize>> {
1138 Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1139 }
1140
1141 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1142 Poll::Ready(std::io::Write::flush(&mut self.inner))
1143 }
1144
1145 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1146 self.poll_flush(cx)
1147 }
1148}
1149
1150impl AsyncWrite for Cursor<&mut Vec<u8>> {
1151 fn poll_write(
1152 mut self: Pin<&mut Self>,
1153 _: &mut Context<'_>,
1154 buf: &[u8],
1155 ) -> Poll<Result<usize>> {
1156 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1157 }
1158
1159 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160 self.poll_flush(cx)
1161 }
1162
1163 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1164 Poll::Ready(std::io::Write::flush(&mut self.inner))
1165 }
1166}
1167
1168impl AsyncWrite for Cursor<Vec<u8>> {
1169 fn poll_write(
1170 mut self: Pin<&mut Self>,
1171 _: &mut Context<'_>,
1172 buf: &[u8],
1173 ) -> Poll<Result<usize>> {
1174 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1175 }
1176
1177 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1178 self.poll_flush(cx)
1179 }
1180
1181 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1182 Poll::Ready(std::io::Write::flush(&mut self.inner))
1183 }
1184}
1185
1186/// Creates an empty reader.
1187///
1188/// # Examples
1189///
1190/// ```
1191/// use futures_lite::io::{self, AsyncReadExt};
1192///
1193/// # spin_on::spin_on(async {
1194/// let mut reader = io::empty();
1195///
1196/// let mut contents = Vec::new();
1197/// reader.read_to_end(&mut contents).await?;
1198/// assert!(contents.is_empty());
1199/// # std::io::Result::Ok(()) });
1200/// ```
1201pub fn empty() -> Empty {
1202 Empty { _private: () }
1203}
1204
1205/// Reader for the [`empty()`] function.
1206pub struct Empty {
1207 _private: (),
1208}
1209
1210impl fmt::Debug for Empty {
1211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212 f.pad("Empty { .. }")
1213 }
1214}
1215
1216impl AsyncRead for Empty {
1217 #[inline]
1218 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
1219 Poll::Ready(Ok(0))
1220 }
1221}
1222
1223impl AsyncBufRead for Empty {
1224 #[inline]
1225 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
1226 Poll::Ready(Ok(&[]))
1227 }
1228
1229 #[inline]
1230 fn consume(self: Pin<&mut Self>, _: usize) {}
1231}
1232
1233/// Creates an infinite reader that reads the same byte repeatedly.
1234///
1235/// # Examples
1236///
1237/// ```
1238/// use futures_lite::io::{self, AsyncReadExt};
1239///
1240/// # spin_on::spin_on(async {
1241/// let mut reader = io::repeat(b'a');
1242///
1243/// let mut contents = vec![0; 5];
1244/// reader.read_exact(&mut contents).await?;
1245/// assert_eq!(contents, b"aaaaa");
1246/// # std::io::Result::Ok(()) });
1247/// ```
1248pub fn repeat(byte: u8) -> Repeat {
1249 Repeat { byte }
1250}
1251
1252/// Reader for the [`repeat()`] function.
1253#[derive(Debug)]
1254pub struct Repeat {
1255 byte: u8,
1256}
1257
1258impl AsyncRead for Repeat {
1259 #[inline]
1260 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1261 for b in &mut *buf {
1262 *b = self.byte;
1263 }
1264 Poll::Ready(Ok(buf.len()))
1265 }
1266}
1267
1268/// Creates a writer that consumes and drops all data.
1269///
1270/// # Examples
1271///
1272/// ```
1273/// use futures_lite::io::{self, AsyncWriteExt};
1274///
1275/// # spin_on::spin_on(async {
1276/// let mut writer = io::sink();
1277/// writer.write_all(b"hello").await?;
1278/// # std::io::Result::Ok(()) });
1279/// ```
1280pub fn sink() -> Sink {
1281 Sink { _private: () }
1282}
1283
1284/// Writer for the [`sink()`] function.
1285#[derive(Debug)]
1286pub struct Sink {
1287 _private: (),
1288}
1289
1290impl AsyncWrite for Sink {
1291 #[inline]
1292 fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
1293 Poll::Ready(Ok(buf.len()))
1294 }
1295
1296 #[inline]
1297 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1298 Poll::Ready(Ok(()))
1299 }
1300
1301 #[inline]
1302 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1303 Poll::Ready(Ok(()))
1304 }
1305}
1306
1307/// Extension trait for [`AsyncBufRead`].
1308pub trait AsyncBufReadExt: AsyncBufRead {
1309 /// Returns the contents of the internal buffer, filling it with more data if empty.
1310 ///
1311 /// If the stream has reached EOF, an empty buffer will be returned.
1312 ///
1313 /// # Examples
1314 ///
1315 /// ```
1316 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1317 /// use std::pin::Pin;
1318 ///
1319 /// # spin_on::spin_on(async {
1320 /// let input: &[u8] = b"hello world";
1321 /// let mut reader = BufReader::with_capacity(5, input);
1322 ///
1323 /// assert_eq!(reader.fill_buf().await?, b"hello");
1324 /// reader.consume(2);
1325 /// assert_eq!(reader.fill_buf().await?, b"llo");
1326 /// reader.consume(3);
1327 /// assert_eq!(reader.fill_buf().await?, b" worl");
1328 /// # std::io::Result::Ok(()) });
1329 /// ```
1330 fn fill_buf(&mut self) -> FillBuf<'_, Self>
1331 where
1332 Self: Unpin,
1333 {
1334 FillBuf { reader: Some(self) }
1335 }
1336
1337 /// Consumes `amt` buffered bytes.
1338 ///
1339 /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1340 /// internal buffer.
1341 ///
1342 /// The `amt` must be <= the number of bytes in the buffer returned by
1343 /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1344 ///
1345 /// # Examples
1346 ///
1347 /// ```
1348 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1349 /// use std::pin::Pin;
1350 ///
1351 /// # spin_on::spin_on(async {
1352 /// let input: &[u8] = b"hello";
1353 /// let mut reader = BufReader::with_capacity(4, input);
1354 ///
1355 /// assert_eq!(reader.fill_buf().await?, b"hell");
1356 /// reader.consume(2);
1357 /// assert_eq!(reader.fill_buf().await?, b"ll");
1358 /// # std::io::Result::Ok(()) });
1359 /// ```
1360 fn consume(&mut self, amt: usize)
1361 where
1362 Self: Unpin,
1363 {
1364 AsyncBufRead::consume(Pin::new(self), amt);
1365 }
1366
1367 /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1368 ///
1369 /// This method will read bytes from the underlying stream until the delimiter or EOF is
1370 /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1371 ///
1372 /// If successful, returns the total number of bytes read.
1373 ///
1374 /// # Examples
1375 ///
1376 /// ```
1377 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1378 ///
1379 /// # spin_on::spin_on(async {
1380 /// let input: &[u8] = b"hello";
1381 /// let mut reader = BufReader::new(input);
1382 ///
1383 /// let mut buf = Vec::new();
1384 /// let n = reader.read_until(b'\n', &mut buf).await?;
1385 /// # std::io::Result::Ok(()) });
1386 /// ```
1387 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
1388 where
1389 Self: Unpin,
1390 {
1391 ReadUntilFuture {
1392 reader: self,
1393 byte,
1394 buf,
1395 read: 0,
1396 }
1397 }
1398
1399 /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1400 ///
1401 /// This method will read bytes from the underlying stream until the newline delimiter (the
1402 /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1403 /// will be appended to `buf`.
1404 ///
1405 /// If successful, returns the total number of bytes read.
1406 ///
1407 /// # Examples
1408 ///
1409 /// ```
1410 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1411 ///
1412 /// # spin_on::spin_on(async {
1413 /// let input: &[u8] = b"hello";
1414 /// let mut reader = BufReader::new(input);
1415 ///
1416 /// let mut line = String::new();
1417 /// let n = reader.read_line(&mut line).await?;
1418 /// # std::io::Result::Ok(()) });
1419 /// ```
1420 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
1421 where
1422 Self: Unpin,
1423 {
1424 ReadLineFuture {
1425 reader: self,
1426 buf,
1427 bytes: Vec::new(),
1428 read: 0,
1429 }
1430 }
1431
1432 /// Returns a stream over the lines of this byte stream.
1433 ///
1434 /// The stream returned from this method yields items of type
1435 /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1436 /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1437 /// at the end.
1438 ///
1439 /// # Examples
1440 ///
1441 /// ```
1442 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1443 /// use futures_lite::stream::StreamExt;
1444 ///
1445 /// # spin_on::spin_on(async {
1446 /// let input: &[u8] = b"hello\nworld\n";
1447 /// let mut reader = BufReader::new(input);
1448 /// let mut lines = reader.lines();
1449 ///
1450 /// let mut line = String::new();
1451 /// while let Some(line) = lines.next().await {
1452 /// println!("{}", line?);
1453 /// }
1454 /// # std::io::Result::Ok(()) });
1455 /// ```
1456 fn lines(self) -> Lines<Self>
1457 where
1458 Self: Unpin + Sized,
1459 {
1460 Lines {
1461 reader: self,
1462 buf: String::new(),
1463 bytes: Vec::new(),
1464 read: 0,
1465 }
1466 }
1467
1468 /// Returns a stream over the contents of this reader split on the specified `byte`.
1469 ///
1470 /// The stream returned from this method yields items of type
1471 /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1472 /// Each vector returned will *not* have the delimiter byte at the end.
1473 ///
1474 /// # Examples
1475 ///
1476 /// ```
1477 /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1478 /// use futures_lite::stream::StreamExt;
1479 ///
1480 /// # spin_on::spin_on(async {
1481 /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1482 /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1483 ///
1484 /// assert_eq!(items[0], b"lorem");
1485 /// assert_eq!(items[1], b"ipsum");
1486 /// assert_eq!(items[2], b"dolor");
1487 /// # std::io::Result::Ok(()) });
1488 /// ```
1489 fn split(self, byte: u8) -> Split<Self>
1490 where
1491 Self: Sized,
1492 {
1493 Split {
1494 reader: self,
1495 buf: Vec::new(),
1496 delim: byte,
1497 read: 0,
1498 }
1499 }
1500}
1501
1502impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1503
1504/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1505#[derive(Debug)]
1506#[must_use = "futures do nothing unless you `.await` or poll them"]
1507pub struct FillBuf<'a, R: ?Sized> {
1508 reader: Option<&'a mut R>,
1509}
1510
1511impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1512
1513impl<'a, R> Future for FillBuf<'a, R>
1514where
1515 R: AsyncBufRead + Unpin + ?Sized,
1516{
1517 type Output = Result<&'a [u8]>;
1518
1519 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1520 let this = &mut *self;
1521 let reader = this
1522 .reader
1523 .take()
1524 .expect("polled `FillBuf` after completion");
1525
1526 match Pin::new(&mut *reader).poll_fill_buf(cx) {
1527 Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
1528 Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
1529 poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1530 },
1531 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1532 Poll::Pending => {
1533 this.reader = Some(reader);
1534 Poll::Pending
1535 }
1536 }
1537 }
1538}
1539
1540/// Future for the [`AsyncBufReadExt::read_until()`] method.
1541#[derive(Debug)]
1542#[must_use = "futures do nothing unless you `.await` or poll them"]
1543pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1544 reader: &'a mut R,
1545 byte: u8,
1546 buf: &'a mut Vec<u8>,
1547 read: usize,
1548}
1549
1550impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1551
1552impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1553 type Output = Result<usize>;
1554
1555 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1556 let Self {
1557 reader,
1558 byte,
1559 buf,
1560 read,
1561 } = &mut *self;
1562 read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1563 }
1564}
1565
1566fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1567 mut reader: Pin<&mut R>,
1568 cx: &mut Context<'_>,
1569 byte: u8,
1570 buf: &mut Vec<u8>,
1571 read: &mut usize,
1572) -> Poll<Result<usize>> {
1573 loop {
1574 let (done, used) = {
1575 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1576
1577 if let Some(i) = memchr::memchr(byte, available) {
1578 buf.extend_from_slice(&available[..=i]);
1579 (true, i + 1)
1580 } else {
1581 buf.extend_from_slice(available);
1582 (false, available.len())
1583 }
1584 };
1585
1586 reader.as_mut().consume(used);
1587 *read += used;
1588
1589 if done || used == 0 {
1590 return Poll::Ready(Ok(mem::replace(read, 0)));
1591 }
1592 }
1593}
1594
1595/// Future for the [`AsyncBufReadExt::read_line()`] method.
1596#[derive(Debug)]
1597#[must_use = "futures do nothing unless you `.await` or poll them"]
1598pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1599 reader: &'a mut R,
1600 buf: &'a mut String,
1601 bytes: Vec<u8>,
1602 read: usize,
1603}
1604
1605impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1606
1607impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1608 type Output = Result<usize>;
1609
1610 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1611 let Self {
1612 reader,
1613 buf,
1614 bytes,
1615 read,
1616 } = &mut *self;
1617 read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1618 }
1619}
1620
1621pin_project! {
1622 /// Stream for the [`AsyncBufReadExt::lines()`] method.
1623 #[derive(Debug)]
1624 #[must_use = "streams do nothing unless polled"]
1625 pub struct Lines<R> {
1626 #[pin]
1627 reader: R,
1628 buf: String,
1629 bytes: Vec<u8>,
1630 read: usize,
1631 }
1632}
1633
1634impl<R: AsyncBufRead> Stream for Lines<R> {
1635 type Item = Result<String>;
1636
1637 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1638 let this = self.project();
1639
1640 let n = ready!(read_line_internal(
1641 this.reader,
1642 cx,
1643 this.buf,
1644 this.bytes,
1645 this.read
1646 ))?;
1647 if n == 0 && this.buf.is_empty() {
1648 return Poll::Ready(None);
1649 }
1650
1651 if this.buf.ends_with('\n') {
1652 this.buf.pop();
1653 if this.buf.ends_with('\r') {
1654 this.buf.pop();
1655 }
1656 }
1657 Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
1658 }
1659}
1660
1661fn read_line_internal<R: AsyncBufRead + ?Sized>(
1662 reader: Pin<&mut R>,
1663 cx: &mut Context<'_>,
1664 buf: &mut String,
1665 bytes: &mut Vec<u8>,
1666 read: &mut usize,
1667) -> Poll<Result<usize>> {
1668 let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1669
1670 match String::from_utf8(mem::replace(bytes, Vec::new())) {
1671 Ok(s) => {
1672 debug_assert!(buf.is_empty());
1673 debug_assert_eq!(*read, 0);
1674 *buf = s;
1675 Poll::Ready(ret)
1676 }
1677 Err(_) => Poll::Ready(ret.and_then(|_| {
1678 Err(Error::new(
1679 ErrorKind::InvalidData,
1680 "stream did not contain valid UTF-8",
1681 ))
1682 })),
1683 }
1684}
1685
1686pin_project! {
1687 /// Stream for the [`AsyncBufReadExt::split()`] method.
1688 #[derive(Debug)]
1689 #[must_use = "streams do nothing unless polled"]
1690 pub struct Split<R> {
1691 #[pin]
1692 reader: R,
1693 buf: Vec<u8>,
1694 read: usize,
1695 delim: u8,
1696 }
1697}
1698
1699impl<R: AsyncBufRead> Stream for Split<R> {
1700 type Item = Result<Vec<u8>>;
1701
1702 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1703 let this = self.project();
1704
1705 let n = ready!(read_until_internal(
1706 this.reader,
1707 cx,
1708 *this.delim,
1709 this.buf,
1710 this.read
1711 ))?;
1712 if n == 0 && this.buf.is_empty() {
1713 return Poll::Ready(None);
1714 }
1715
1716 if this.buf[this.buf.len() - 1] == *this.delim {
1717 this.buf.pop();
1718 }
1719 Poll::Ready(Some(Ok(mem::replace(this.buf, vec![]))))
1720 }
1721}
1722
1723/// Extension trait for [`AsyncRead`].
1724pub trait AsyncReadExt: AsyncRead {
1725 /// Reads some bytes from the byte stream.
1726 ///
1727 /// On success, returns the total number of bytes read.
1728 ///
1729 /// If the return value is `Ok(n)`, then it must be guaranteed that
1730 /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1731 /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1732 /// scenarios:
1733 ///
1734 /// 1. This reader has reached its "end of file" and will likely no longer be able to
1735 /// produce bytes. Note that this does not mean that the reader will always no
1736 /// longer be able to produce bytes.
1737 /// 2. The buffer specified was 0 bytes in length.
1738 ///
1739 /// # Examples
1740 ///
1741 /// ```
1742 /// use futures_lite::io::{AsyncReadExt, BufReader};
1743 ///
1744 /// # spin_on::spin_on(async {
1745 /// let input: &[u8] = b"hello";
1746 /// let mut reader = BufReader::new(input);
1747 ///
1748 /// let mut buf = vec![0; 1024];
1749 /// let n = reader.read(&mut buf).await?;
1750 /// # std::io::Result::Ok(()) });
1751 /// ```
1752 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1753 where
1754 Self: Unpin,
1755 {
1756 ReadFuture { reader: self, buf }
1757 }
1758
1759 /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1760 ///
1761 /// Data is copied to fill each buffer in order, with the final buffer possibly being
1762 /// only partially filled. This method must behave same as a single call to
1763 /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
1764 fn read_vectored<'a>(
1765 &'a mut self,
1766 bufs: &'a mut [IoSliceMut<'a>],
1767 ) -> ReadVectoredFuture<'a, Self>
1768 where
1769 Self: Unpin,
1770 {
1771 ReadVectoredFuture { reader: self, bufs }
1772 }
1773
1774 /// Reads the entire contents and appends them to a [`Vec`].
1775 ///
1776 /// On success, returns the total number of bytes read.
1777 ///
1778 /// # Examples
1779 ///
1780 /// ```
1781 /// use futures_lite::io::{AsyncReadExt, Cursor};
1782 ///
1783 /// # spin_on::spin_on(async {
1784 /// let mut reader = Cursor::new(vec![1, 2, 3]);
1785 /// let mut contents = Vec::new();
1786 ///
1787 /// let n = reader.read_to_end(&mut contents).await?;
1788 /// assert_eq!(n, 3);
1789 /// assert_eq!(contents, [1, 2, 3]);
1790 /// # std::io::Result::Ok(()) });
1791 /// ```
1792 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
1793 where
1794 Self: Unpin,
1795 {
1796 let start_len = buf.len();
1797 ReadToEndFuture {
1798 reader: self,
1799 buf,
1800 start_len,
1801 }
1802 }
1803
1804 /// Reads the entire contents and appends them to a [`String`].
1805 ///
1806 /// On success, returns the total number of bytes read.
1807 ///
1808 /// # Examples
1809 ///
1810 /// ```
1811 /// use futures_lite::io::{AsyncReadExt, Cursor};
1812 ///
1813 /// # spin_on::spin_on(async {
1814 /// let mut reader = Cursor::new(&b"hello");
1815 /// let mut contents = String::new();
1816 ///
1817 /// let n = reader.read_to_string(&mut contents).await?;
1818 /// assert_eq!(n, 5);
1819 /// assert_eq!(contents, "hello");
1820 /// # std::io::Result::Ok(()) });
1821 /// ```
1822 fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
1823 where
1824 Self: Unpin,
1825 {
1826 ReadToStringFuture {
1827 reader: self,
1828 buf,
1829 bytes: Vec::new(),
1830 start_len: 0,
1831 }
1832 }
1833
1834 /// Reads the exact number of bytes required to fill `buf`.
1835 ///
1836 /// On success, returns the total number of bytes read.
1837 ///
1838 /// # Examples
1839 ///
1840 /// ```
1841 /// use futures_lite::io::{AsyncReadExt, Cursor};
1842 ///
1843 /// # spin_on::spin_on(async {
1844 /// let mut reader = Cursor::new(&b"hello");
1845 /// let mut contents = vec![0; 3];
1846 ///
1847 /// reader.read_exact(&mut contents).await?;
1848 /// assert_eq!(contents, b"hel");
1849 /// # std::io::Result::Ok(()) });
1850 /// ```
1851 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
1852 where
1853 Self: Unpin,
1854 {
1855 ReadExactFuture { reader: self, buf }
1856 }
1857
1858 /// Creates an adapter which will read at most `limit` bytes from it.
1859 ///
1860 /// This method returns a new instance of [`AsyncRead`] which will read at most
1861 /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
1862 ///
1863 /// # Examples
1864 ///
1865 /// ```
1866 /// use futures_lite::io::{AsyncReadExt, Cursor};
1867 ///
1868 /// # spin_on::spin_on(async {
1869 /// let mut reader = Cursor::new(&b"hello");
1870 /// let mut contents = String::new();
1871 ///
1872 /// let n = reader.take(3).read_to_string(&mut contents).await?;
1873 /// assert_eq!(n, 3);
1874 /// assert_eq!(contents, "hel");
1875 /// # std::io::Result::Ok(()) });
1876 /// ```
1877 fn take(self, limit: u64) -> Take<Self>
1878 where
1879 Self: Sized,
1880 {
1881 Take { inner: self, limit }
1882 }
1883
1884 /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
1885 ///
1886 /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
1887 ///
1888 /// ```
1889 /// use futures_lite::io::{AsyncReadExt, Cursor};
1890 /// use futures_lite::stream::StreamExt;
1891 ///
1892 /// # spin_on::spin_on(async {
1893 /// let reader = Cursor::new(&b"hello");
1894 /// let mut bytes = reader.bytes();
1895 ///
1896 /// while let Some(byte) = bytes.next().await {
1897 /// println!("byte: {}", byte?);
1898 /// }
1899 /// # std::io::Result::Ok(()) });
1900 /// ```
1901 fn bytes(self) -> Bytes<Self>
1902 where
1903 Self: Sized,
1904 {
1905 Bytes { inner: self }
1906 }
1907
1908 /// Creates an adapter which will chain this stream with another.
1909 ///
1910 /// The returned [`AsyncRead`] instance will first read all bytes from this reader
1911 /// until EOF is found, and then continue with `next`.
1912 ///
1913 /// # Examples
1914 ///
1915 /// ```
1916 /// use futures_lite::io::{AsyncReadExt, Cursor};
1917 ///
1918 /// # spin_on::spin_on(async {
1919 /// let r1 = Cursor::new(&b"hello");
1920 /// let r2 = Cursor::new(&b"world");
1921 /// let mut reader = r1.chain(r2);
1922 ///
1923 /// let mut contents = String::new();
1924 /// reader.read_to_string(&mut contents).await?;
1925 /// assert_eq!(contents, "helloworld");
1926 /// # std::io::Result::Ok(()) });
1927 /// ```
1928 fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
1929 where
1930 Self: Sized,
1931 {
1932 Chain {
1933 first: self,
1934 second: next,
1935 done_first: false,
1936 }
1937 }
1938
1939 /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
1940 ///
1941 /// # Examples
1942 ///
1943 /// ```
1944 /// use futures_lite::io::AsyncReadExt;
1945 ///
1946 /// let reader = [1, 2, 3].boxed_reader();
1947 /// ```
1948 #[cfg(feature = "alloc")]
1949 fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
1950 where
1951 Self: Sized + Send + 'a,
1952 {
1953 Box::pin(self)
1954 }
1955}
1956
1957impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
1958
1959/// Future for the [`AsyncReadExt::read()`] method.
1960#[derive(Debug)]
1961#[must_use = "futures do nothing unless you `.await` or poll them"]
1962pub struct ReadFuture<'a, R: Unpin + ?Sized> {
1963 reader: &'a mut R,
1964 buf: &'a mut [u8],
1965}
1966
1967impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
1968
1969impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
1970 type Output = Result<usize>;
1971
1972 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1973 let Self { reader, buf } = &mut *self;
1974 Pin::new(reader).poll_read(cx, buf)
1975 }
1976}
1977
1978/// Future for the [`AsyncReadExt::read_vectored()`] method.
1979#[derive(Debug)]
1980#[must_use = "futures do nothing unless you `.await` or poll them"]
1981pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
1982 reader: &'a mut R,
1983 bufs: &'a mut [IoSliceMut<'a>],
1984}
1985
1986impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
1987
1988impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
1989 type Output = Result<usize>;
1990
1991 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1992 let Self { reader, bufs } = &mut *self;
1993 Pin::new(reader).poll_read_vectored(cx, bufs)
1994 }
1995}
1996
1997/// Future for the [`AsyncReadExt::read_to_end()`] method.
1998#[derive(Debug)]
1999#[must_use = "futures do nothing unless you `.await` or poll them"]
2000pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2001 reader: &'a mut R,
2002 buf: &'a mut Vec<u8>,
2003 start_len: usize,
2004}
2005
2006impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2007
2008impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2009 type Output = Result<usize>;
2010
2011 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2012 let Self {
2013 reader,
2014 buf,
2015 start_len,
2016 } = &mut *self;
2017 read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2018 }
2019}
2020
2021/// Future for the [`AsyncReadExt::read_to_string()`] method.
2022#[derive(Debug)]
2023#[must_use = "futures do nothing unless you `.await` or poll them"]
2024pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2025 reader: &'a mut R,
2026 buf: &'a mut String,
2027 bytes: Vec<u8>,
2028 start_len: usize,
2029}
2030
2031impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2032
2033impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2034 type Output = Result<usize>;
2035
2036 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2037 let Self {
2038 reader,
2039 buf,
2040 bytes,
2041 start_len,
2042 } = &mut *self;
2043 let reader = Pin::new(reader);
2044
2045 let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2046
2047 match String::from_utf8(mem::replace(bytes, Vec::new())) {
2048 Ok(s) => {
2049 debug_assert!(buf.is_empty());
2050 **buf = s;
2051 Poll::Ready(ret)
2052 }
2053 Err(_) => Poll::Ready(ret.and_then(|_| {
2054 Err(Error::new(
2055 ErrorKind::InvalidData,
2056 "stream did not contain valid UTF-8",
2057 ))
2058 })),
2059 }
2060 }
2061}
2062
2063// This uses an adaptive system to extend the vector when it fills. We want to
2064// avoid paying to allocate and zero a huge chunk of memory if the reader only
2065// has 4 bytes while still making large reads if the reader does have a ton
2066// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2067// time is 4,500 times (!) slower than this if the reader has a very small
2068// amount of data to return.
2069//
2070// Because we're extending the buffer with uninitialized data for trusted
2071// readers, we need to make sure to truncate that if any of this panics.
2072fn read_to_end_internal<R: AsyncRead + ?Sized>(
2073 mut rd: Pin<&mut R>,
2074 cx: &mut Context<'_>,
2075 buf: &mut Vec<u8>,
2076 start_len: usize,
2077) -> Poll<Result<usize>> {
2078 struct Guard<'a> {
2079 buf: &'a mut Vec<u8>,
2080 len: usize,
2081 }
2082
2083 impl Drop for Guard<'_> {
2084 fn drop(&mut self) {
2085 self.buf.resize(self.len, 0);
2086 }
2087 }
2088
2089 let mut g = Guard {
2090 len: buf.len(),
2091 buf,
2092 };
2093 let ret;
2094 loop {
2095 if g.len == g.buf.len() {
2096 g.buf.reserve(32);
2097 let capacity = g.buf.capacity();
2098 g.buf.resize(capacity, 0);
2099 }
2100
2101 match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2102 Ok(0) => {
2103 ret = Poll::Ready(Ok(g.len - start_len));
2104 break;
2105 }
2106 Ok(n) => g.len += n,
2107 Err(e) => {
2108 ret = Poll::Ready(Err(e));
2109 break;
2110 }
2111 }
2112 }
2113
2114 ret
2115}
2116
2117/// Future for the [`AsyncReadExt::read_exact()`] method.
2118#[derive(Debug)]
2119#[must_use = "futures do nothing unless you `.await` or poll them"]
2120pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2121 reader: &'a mut R,
2122 buf: &'a mut [u8],
2123}
2124
2125impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2126
2127impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2128 type Output = Result<()>;
2129
2130 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2131 let Self { reader, buf } = &mut *self;
2132
2133 while !buf.is_empty() {
2134 let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2135 let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
2136 *buf = rest;
2137
2138 if n == 0 {
2139 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2140 }
2141 }
2142
2143 Poll::Ready(Ok(()))
2144 }
2145}
2146
2147pin_project! {
2148 /// Reader for the [`AsyncReadExt::take()`] method.
2149 #[derive(Debug)]
2150 pub struct Take<R> {
2151 #[pin]
2152 inner: R,
2153 limit: u64,
2154 }
2155}
2156
2157impl<R> Take<R> {
2158 /// Returns the number of bytes before this adapter will return EOF.
2159 ///
2160 /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2161 ///
2162 /// # Examples
2163 ///
2164 /// ```
2165 /// use futures_lite::io::{AsyncReadExt, Cursor};
2166 ///
2167 /// let reader = Cursor::new("hello");
2168 ///
2169 /// let reader = reader.take(3);
2170 /// assert_eq!(reader.limit(), 3);
2171 /// ```
2172 pub fn limit(&self) -> u64 {
2173 self.limit
2174 }
2175
2176 /// Puts a limit on the number of bytes.
2177 ///
2178 /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2179 ///
2180 /// # Examples
2181 ///
2182 /// ```
2183 /// use futures_lite::io::{AsyncReadExt, Cursor};
2184 ///
2185 /// let reader = Cursor::new("hello");
2186 ///
2187 /// let mut reader = reader.take(10);
2188 /// assert_eq!(reader.limit(), 10);
2189 ///
2190 /// reader.set_limit(3);
2191 /// assert_eq!(reader.limit(), 3);
2192 /// ```
2193 pub fn set_limit(&mut self, limit: u64) {
2194 self.limit = limit;
2195 }
2196
2197 /// Gets a reference to the underlying reader.
2198 ///
2199 /// # Examples
2200 ///
2201 /// ```
2202 /// use futures_lite::io::{AsyncReadExt, Cursor};
2203 ///
2204 /// let reader = Cursor::new("hello");
2205 ///
2206 /// let reader = reader.take(3);
2207 /// let r = reader.get_ref();
2208 /// ```
2209 pub fn get_ref(&self) -> &R {
2210 &self.inner
2211 }
2212
2213 /// Gets a mutable reference to the underlying reader.
2214 ///
2215 /// # Examples
2216 ///
2217 /// ```
2218 /// use futures_lite::io::{AsyncReadExt, Cursor};
2219 ///
2220 /// let reader = Cursor::new("hello");
2221 ///
2222 /// let mut reader = reader.take(3);
2223 /// let r = reader.get_mut();
2224 /// ```
2225 pub fn get_mut(&mut self) -> &mut R {
2226 &mut self.inner
2227 }
2228
2229 /// Unwraps the adapter, returning the underlying reader.
2230 ///
2231 /// # Examples
2232 ///
2233 /// ```
2234 /// use futures_lite::io::{AsyncReadExt, Cursor};
2235 ///
2236 /// let reader = Cursor::new("hello");
2237 ///
2238 /// let reader = reader.take(3);
2239 /// let reader = reader.into_inner();
2240 /// ```
2241 pub fn into_inner(self) -> R {
2242 self.inner
2243 }
2244}
2245
2246impl<R: AsyncRead> AsyncRead for Take<R> {
2247 fn poll_read(
2248 self: Pin<&mut Self>,
2249 cx: &mut Context<'_>,
2250 buf: &mut [u8],
2251 ) -> Poll<Result<usize>> {
2252 let this = self.project();
2253 take_read_internal(this.inner, cx, buf, this.limit)
2254 }
2255}
2256
2257fn take_read_internal<R: AsyncRead + ?Sized>(
2258 mut rd: Pin<&mut R>,
2259 cx: &mut Context<'_>,
2260 buf: &mut [u8],
2261 limit: &mut u64,
2262) -> Poll<Result<usize>> {
2263 // Don't call into inner reader at all at EOF because it may still block
2264 if *limit == 0 {
2265 return Poll::Ready(Ok(0));
2266 }
2267
2268 let max = cmp::min(buf.len() as u64, *limit) as usize;
2269
2270 match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2271 Ok(n) => {
2272 *limit -= n as u64;
2273 Poll::Ready(Ok(n))
2274 }
2275 Err(e) => Poll::Ready(Err(e)),
2276 }
2277}
2278
2279impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2280 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2281 let this = self.project();
2282
2283 if *this.limit == 0 {
2284 return Poll::Ready(Ok(&[]));
2285 }
2286
2287 match ready!(this.inner.poll_fill_buf(cx)) {
2288 Ok(buf) => {
2289 let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2290 Poll::Ready(Ok(&buf[..cap]))
2291 }
2292 Err(e) => Poll::Ready(Err(e)),
2293 }
2294 }
2295
2296 fn consume(self: Pin<&mut Self>, amt: usize) {
2297 let this = self.project();
2298 // Don't let callers reset the limit by passing an overlarge value
2299 let amt = cmp::min(amt as u64, *this.limit) as usize;
2300 *this.limit -= amt as u64;
2301
2302 this.inner.consume(amt);
2303 }
2304}
2305
2306pin_project! {
2307 /// Reader for the [`AsyncReadExt::bytes()`] method.
2308 #[derive(Debug)]
2309 pub struct Bytes<R> {
2310 #[pin]
2311 inner: R,
2312 }
2313}
2314
2315impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2316 type Item = Result<u8>;
2317
2318 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2319 let mut byte = 0;
2320
2321 let rd = Pin::new(&mut self.inner);
2322
2323 match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2324 Ok(0) => Poll::Ready(None),
2325 Ok(..) => Poll::Ready(Some(Ok(byte))),
2326 Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2327 Err(e) => Poll::Ready(Some(Err(e))),
2328 }
2329 }
2330}
2331
2332impl<R: AsyncRead> AsyncRead for Bytes<R> {
2333 fn poll_read(
2334 self: Pin<&mut Self>,
2335 cx: &mut Context<'_>,
2336 buf: &mut [u8],
2337 ) -> Poll<Result<usize>> {
2338 self.project().inner.poll_read(cx, buf)
2339 }
2340
2341 fn poll_read_vectored(
2342 self: Pin<&mut Self>,
2343 cx: &mut Context<'_>,
2344 bufs: &mut [IoSliceMut<'_>],
2345 ) -> Poll<Result<usize>> {
2346 self.project().inner.poll_read_vectored(cx, bufs)
2347 }
2348}
2349
2350pin_project! {
2351 /// Reader for the [`AsyncReadExt::chain()`] method.
2352 pub struct Chain<R1, R2> {
2353 #[pin]
2354 first: R1,
2355 #[pin]
2356 second: R2,
2357 done_first: bool,
2358 }
2359}
2360
2361impl<R1, R2> Chain<R1, R2> {
2362 /// Gets references to the underlying readers.
2363 ///
2364 /// # Examples
2365 ///
2366 /// ```
2367 /// use futures_lite::io::{AsyncReadExt, Cursor};
2368 ///
2369 /// let r1 = Cursor::new(b"hello");
2370 /// let r2 = Cursor::new(b"world");
2371 ///
2372 /// let reader = r1.chain(r2);
2373 /// let (r1, r2) = reader.get_ref();
2374 /// ```
2375 pub fn get_ref(&self) -> (&R1, &R2) {
2376 (&self.first, &self.second)
2377 }
2378
2379 /// Gets mutable references to the underlying readers.
2380 ///
2381 /// # Examples
2382 ///
2383 /// ```
2384 /// use futures_lite::io::{AsyncReadExt, Cursor};
2385 ///
2386 /// let r1 = Cursor::new(b"hello");
2387 /// let r2 = Cursor::new(b"world");
2388 ///
2389 /// let mut reader = r1.chain(r2);
2390 /// let (r1, r2) = reader.get_mut();
2391 /// ```
2392 pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2393 (&mut self.first, &mut self.second)
2394 }
2395
2396 /// Unwraps the adapter, returning the underlying readers.
2397 ///
2398 /// # Examples
2399 ///
2400 /// ```
2401 /// use futures_lite::io::{AsyncReadExt, Cursor};
2402 ///
2403 /// let r1 = Cursor::new(b"hello");
2404 /// let r2 = Cursor::new(b"world");
2405 ///
2406 /// let reader = r1.chain(r2);
2407 /// let (r1, r2) = reader.into_inner();
2408 /// ```
2409 pub fn into_inner(self) -> (R1, R2) {
2410 (self.first, self.second)
2411 }
2412}
2413
2414impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2415 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2416 f.debug_struct("Chain")
2417 .field("r1", &self.first)
2418 .field("r2", &self.second)
2419 .finish()
2420 }
2421}
2422
2423impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2424 fn poll_read(
2425 self: Pin<&mut Self>,
2426 cx: &mut Context<'_>,
2427 buf: &mut [u8],
2428 ) -> Poll<Result<usize>> {
2429 let this = self.project();
2430 if !*this.done_first {
2431 match ready!(this.first.poll_read(cx, buf)) {
2432 Ok(0) if !buf.is_empty() => *this.done_first = true,
2433 Ok(n) => return Poll::Ready(Ok(n)),
2434 Err(err) => return Poll::Ready(Err(err)),
2435 }
2436 }
2437
2438 this.second.poll_read(cx, buf)
2439 }
2440
2441 fn poll_read_vectored(
2442 self: Pin<&mut Self>,
2443 cx: &mut Context<'_>,
2444 bufs: &mut [IoSliceMut<'_>],
2445 ) -> Poll<Result<usize>> {
2446 let this = self.project();
2447 if !*this.done_first {
2448 match ready!(this.first.poll_read_vectored(cx, bufs)) {
2449 Ok(0) if !bufs.is_empty() => *this.done_first = true,
2450 Ok(n) => return Poll::Ready(Ok(n)),
2451 Err(err) => return Poll::Ready(Err(err)),
2452 }
2453 }
2454
2455 this.second.poll_read_vectored(cx, bufs)
2456 }
2457}
2458
2459impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2460 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2461 let this = self.project();
2462 if !*this.done_first {
2463 match ready!(this.first.poll_fill_buf(cx)) {
2464 Ok(buf) if buf.is_empty() => {
2465 *this.done_first = true;
2466 }
2467 Ok(buf) => return Poll::Ready(Ok(buf)),
2468 Err(err) => return Poll::Ready(Err(err)),
2469 }
2470 }
2471
2472 this.second.poll_fill_buf(cx)
2473 }
2474
2475 fn consume(self: Pin<&mut Self>, amt: usize) {
2476 let this = self.project();
2477 if !*this.done_first {
2478 this.first.consume(amt)
2479 } else {
2480 this.second.consume(amt)
2481 }
2482 }
2483}
2484
2485/// Extension trait for [`AsyncSeek`].
2486pub trait AsyncSeekExt: AsyncSeek {
2487 /// Seeks to a new position in a byte stream.
2488 ///
2489 /// Returns the new position in the byte stream.
2490 ///
2491 /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2492 ///
2493 /// # Examples
2494 ///
2495 /// ```
2496 /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2497 ///
2498 /// # spin_on::spin_on(async {
2499 /// let mut cursor = Cursor::new("hello");
2500 ///
2501 /// // Move the cursor to the end.
2502 /// cursor.seek(SeekFrom::End(0)).await?;
2503 ///
2504 /// // Check the current position.
2505 /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2506 /// # std::io::Result::Ok(()) });
2507 /// ```
2508 fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2509 where
2510 Self: Unpin,
2511 {
2512 SeekFuture { seeker: self, pos }
2513 }
2514}
2515
2516impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2517
2518/// Future for the [`AsyncSeekExt::seek()`] method.
2519#[derive(Debug)]
2520#[must_use = "futures do nothing unless you `.await` or poll them"]
2521pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2522 seeker: &'a mut S,
2523 pos: SeekFrom,
2524}
2525
2526impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2527
2528impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2529 type Output = Result<u64>;
2530
2531 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2532 let pos = self.pos;
2533 Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2534 }
2535}
2536
2537/// Extension trait for [`AsyncWrite`].
2538pub trait AsyncWriteExt: AsyncWrite {
2539 /// Writes some bytes into the byte stream.
2540 ///
2541 /// Returns the number of bytes written from the start of the buffer.
2542 ///
2543 /// If the return value is `Ok(n)` then it must be guaranteed that
2544 /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2545 /// object is no longer able to accept bytes and will likely not be able to in the
2546 /// future as well, or that the provided buffer is empty.
2547 ///
2548 /// # Examples
2549 ///
2550 /// ```
2551 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2552 ///
2553 /// # spin_on::spin_on(async {
2554 /// let mut output = Vec::new();
2555 /// let mut writer = BufWriter::new(&mut output);
2556 ///
2557 /// let n = writer.write(b"hello").await?;
2558 /// # std::io::Result::Ok(()) });
2559 /// ```
2560 fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2561 where
2562 Self: Unpin,
2563 {
2564 WriteFuture { writer: self, buf }
2565 }
2566
2567 /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2568 ///
2569 /// Data is copied from each buffer in order, with the final buffer possibly being only
2570 /// partially consumed. This method must behave same as a call to
2571 /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2572 fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2573 where
2574 Self: Unpin,
2575 {
2576 WriteVectoredFuture { writer: self, bufs }
2577 }
2578
2579 /// Writes an entire buffer into the byte stream.
2580 ///
2581 /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2582 /// data to be written or an error occurs. It will not return before the entire buffer is
2583 /// successfully written or an error occurs.
2584 ///
2585 /// # Examples
2586 ///
2587 /// ```
2588 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2589 ///
2590 /// # spin_on::spin_on(async {
2591 /// let mut output = Vec::new();
2592 /// let mut writer = BufWriter::new(&mut output);
2593 ///
2594 /// let n = writer.write_all(b"hello").await?;
2595 /// # std::io::Result::Ok(()) });
2596 /// ```
2597 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2598 where
2599 Self: Unpin,
2600 {
2601 WriteAllFuture { writer: self, buf }
2602 }
2603
2604 /// Flushes the stream to ensure that all buffered contents reach their destination.
2605 ///
2606 /// # Examples
2607 ///
2608 /// ```
2609 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2610 ///
2611 /// # spin_on::spin_on(async {
2612 /// let mut output = Vec::new();
2613 /// let mut writer = BufWriter::new(&mut output);
2614 ///
2615 /// writer.write_all(b"hello").await?;
2616 /// writer.flush().await?;
2617 /// # std::io::Result::Ok(()) });
2618 /// ```
2619 fn flush(&mut self) -> FlushFuture<'_, Self>
2620 where
2621 Self: Unpin,
2622 {
2623 FlushFuture { writer: self }
2624 }
2625
2626 /// Closes the writer.
2627 ///
2628 /// # Examples
2629 ///
2630 /// ```
2631 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2632 ///
2633 /// # spin_on::spin_on(async {
2634 /// let mut output = Vec::new();
2635 /// let mut writer = BufWriter::new(&mut output);
2636 ///
2637 /// writer.close().await?;
2638 /// # std::io::Result::Ok(()) });
2639 /// ```
2640 fn close(&mut self) -> CloseFuture<'_, Self>
2641 where
2642 Self: Unpin,
2643 {
2644 CloseFuture { writer: self }
2645 }
2646
2647 /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2648 ///
2649 /// # Examples
2650 ///
2651 /// ```
2652 /// use futures_lite::io::AsyncWriteExt;
2653 ///
2654 /// let writer = Vec::<u8>::new().boxed_writer();
2655 /// ```
2656 #[cfg(feature = "alloc")]
2657 fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2658 where
2659 Self: Sized + Send + 'a,
2660 {
2661 Box::pin(self)
2662 }
2663}
2664
2665impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2666
2667/// Future for the [`AsyncWriteExt::write()`] method.
2668#[derive(Debug)]
2669#[must_use = "futures do nothing unless you `.await` or poll them"]
2670pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2671 writer: &'a mut W,
2672 buf: &'a [u8],
2673}
2674
2675impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2676
2677impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2678 type Output = Result<usize>;
2679
2680 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2681 let buf = self.buf;
2682 Pin::new(&mut *self.writer).poll_write(cx, buf)
2683 }
2684}
2685
2686/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2687#[derive(Debug)]
2688#[must_use = "futures do nothing unless you `.await` or poll them"]
2689pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2690 writer: &'a mut W,
2691 bufs: &'a [IoSlice<'a>],
2692}
2693
2694impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2695
2696impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2697 type Output = Result<usize>;
2698
2699 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2700 let bufs = self.bufs;
2701 Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2702 }
2703}
2704
2705/// Future for the [`AsyncWriteExt::write_all()`] method.
2706#[derive(Debug)]
2707#[must_use = "futures do nothing unless you `.await` or poll them"]
2708pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2709 writer: &'a mut W,
2710 buf: &'a [u8],
2711}
2712
2713impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2714
2715impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2716 type Output = Result<()>;
2717
2718 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2719 let Self { writer, buf } = &mut *self;
2720
2721 while !buf.is_empty() {
2722 let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2723 let (_, rest) = mem::replace(buf, &[]).split_at(n);
2724 *buf = rest;
2725
2726 if n == 0 {
2727 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2728 }
2729 }
2730
2731 Poll::Ready(Ok(()))
2732 }
2733}
2734
2735/// Future for the [`AsyncWriteExt::flush()`] method.
2736#[derive(Debug)]
2737#[must_use = "futures do nothing unless you `.await` or poll them"]
2738pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2739 writer: &'a mut W,
2740}
2741
2742impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2743
2744impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2745 type Output = Result<()>;
2746
2747 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2748 Pin::new(&mut *self.writer).poll_flush(cx)
2749 }
2750}
2751
2752/// Future for the [`AsyncWriteExt::close()`] method.
2753#[derive(Debug)]
2754#[must_use = "futures do nothing unless you `.await` or poll them"]
2755pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2756 writer: &'a mut W,
2757}
2758
2759impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2760
2761impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2762 type Output = Result<()>;
2763
2764 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2765 Pin::new(&mut *self.writer).poll_close(cx)
2766 }
2767}
2768
2769/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
2770///
2771/// # Examples
2772///
2773/// ```
2774/// use futures_lite::io::AsyncReadExt;
2775///
2776/// let reader = [1, 2, 3].boxed_reader();
2777/// ```
2778#[cfg(feature = "alloc")]
2779pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
2780
2781/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
2782///
2783/// # Examples
2784///
2785/// ```
2786/// use futures_lite::io::AsyncWriteExt;
2787///
2788/// let writer = Vec::<u8>::new().boxed_writer();
2789/// ```
2790#[cfg(feature = "alloc")]
2791pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
2792
2793/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
2794///
2795/// # Examples
2796///
2797/// ```
2798/// use futures_lite::io::{self, Cursor};
2799///
2800/// # spin_on::spin_on(async {
2801/// let stream = Cursor::new(vec![]);
2802/// let (mut reader, mut writer) = io::split(stream);
2803/// # std::io::Result::Ok(()) });
2804/// ```
2805pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
2806where
2807 T: AsyncRead + AsyncWrite + Unpin,
2808{
2809 let inner = Arc::new(Mutex::new(stream));
2810 (ReadHalf(inner.clone()), WriteHalf(inner))
2811}
2812
2813/// The read half returned by [`split()`].
2814#[derive(Debug)]
2815pub struct ReadHalf<T>(Arc<Mutex<T>>);
2816
2817/// The write half returned by [`split()`].
2818#[derive(Debug)]
2819pub struct WriteHalf<T>(Arc<Mutex<T>>);
2820
2821impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
2822 fn poll_read(
2823 self: Pin<&mut Self>,
2824 cx: &mut Context<'_>,
2825 buf: &mut [u8],
2826 ) -> Poll<Result<usize>> {
2827 let mut inner = self.0.lock().unwrap();
2828 Pin::new(&mut *inner).poll_read(cx, buf)
2829 }
2830
2831 fn poll_read_vectored(
2832 self: Pin<&mut Self>,
2833 cx: &mut Context<'_>,
2834 bufs: &mut [IoSliceMut<'_>],
2835 ) -> Poll<Result<usize>> {
2836 let mut inner = self.0.lock().unwrap();
2837 Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
2838 }
2839}
2840
2841impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
2842 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
2843 let mut inner = self.0.lock().unwrap();
2844 Pin::new(&mut *inner).poll_write(cx, buf)
2845 }
2846
2847 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
2848 let mut inner = self.0.lock().unwrap();
2849 Pin::new(&mut *inner).poll_flush(cx)
2850 }
2851
2852 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
2853 let mut inner = self.0.lock().unwrap();
2854 Pin::new(&mut *inner).poll_close(cx)
2855 }
2856}