futures_io/
lib.rs

1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4//! `AsyncBufRead` traits, the asynchronous analogs to
5//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6//! that these traits integrate with the asynchronous task system.
7//!
8//! All items of this library are only available when the `std` feature of this
9//! library is activated, and it is activated by default.
10
11#![cfg_attr(not(feature = "std"), no_std)]
12#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)]
13// It cannot be included in the published code because this lint has false positives in the minimum required version.
14#![cfg_attr(test, warn(single_use_lifetimes))]
15#![doc(test(
16    no_crate_inject,
17    attr(
18        deny(warnings, rust_2018_idioms, single_use_lifetimes),
19        allow(dead_code, unused_assignments, unused_variables)
20    )
21))]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23
24#[cfg(feature = "std")]
25mod if_std {
26    use std::io;
27    use std::ops::DerefMut;
28    use std::pin::Pin;
29    use std::task::{Context, Poll};
30
31    // Re-export some types from `std::io` so that users don't have to deal
32    // with conflicts when `use`ing `futures::io` and `std::io`.
33    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
34    #[doc(no_inline)]
35    pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
36
37    /// Read bytes asynchronously.
38    ///
39    /// This trait is analogous to the `std::io::Read` trait, but integrates
40    /// with the asynchronous task system. In particular, the `poll_read`
41    /// method, unlike `Read::read`, will automatically queue the current task
42    /// for wakeup and return if data is not yet available, rather than blocking
43    /// the calling thread.
44    pub trait AsyncRead {
45        /// Attempt to read from the `AsyncRead` into `buf`.
46        ///
47        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
48        ///
49        /// If no data is available for reading, the method returns
50        /// `Poll::Pending` and arranges for the current task (via
51        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
52        /// readable or is closed.
53        ///
54        /// # Implementation
55        ///
56        /// This function may not return errors of kind `WouldBlock` or
57        /// `Interrupted`.  Implementations must convert `WouldBlock` into
58        /// `Poll::Pending` and either internally retry or convert
59        /// `Interrupted` into another error kind.
60        fn poll_read(
61            self: Pin<&mut Self>,
62            cx: &mut Context<'_>,
63            buf: &mut [u8],
64        ) -> Poll<Result<usize>>;
65
66        /// Attempt to read from the `AsyncRead` into `bufs` using vectored
67        /// IO operations.
68        ///
69        /// This method is similar to `poll_read`, but allows data to be read
70        /// into multiple buffers using a single operation.
71        ///
72        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
73        ///
74        /// If no data is available for reading, the method returns
75        /// `Poll::Pending` and arranges for the current task (via
76        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
77        /// readable or is closed.
78        /// By default, this method delegates to using `poll_read` on the first
79        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
80        /// support vectored IO should override this method.
81        ///
82        /// # Implementation
83        ///
84        /// This function may not return errors of kind `WouldBlock` or
85        /// `Interrupted`.  Implementations must convert `WouldBlock` into
86        /// `Poll::Pending` and either internally retry or convert
87        /// `Interrupted` into another error kind.
88        fn poll_read_vectored(
89            self: Pin<&mut Self>,
90            cx: &mut Context<'_>,
91            bufs: &mut [IoSliceMut<'_>],
92        ) -> Poll<Result<usize>> {
93            for b in bufs {
94                if !b.is_empty() {
95                    return self.poll_read(cx, b);
96                }
97            }
98
99            self.poll_read(cx, &mut [])
100        }
101    }
102
103    /// Write bytes asynchronously.
104    ///
105    /// This trait is analogous to the `std::io::Write` trait, but integrates
106    /// with the asynchronous task system. In particular, the `poll_write`
107    /// method, unlike `Write::write`, will automatically queue the current task
108    /// for wakeup and return if the writer cannot take more data, rather than blocking
109    /// the calling thread.
110    pub trait AsyncWrite {
111        /// Attempt to write bytes from `buf` into the object.
112        ///
113        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
114        ///
115        /// If the object is not ready for writing, the method returns
116        /// `Poll::Pending` and arranges for the current task (via
117        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
118        /// writable or is closed.
119        ///
120        /// # Implementation
121        ///
122        /// This function may not return errors of kind `WouldBlock` or
123        /// `Interrupted`.  Implementations must convert `WouldBlock` into
124        /// `Poll::Pending` and either internally retry or convert
125        /// `Interrupted` into another error kind.
126        ///
127        /// `poll_write` must try to make progress by flushing the underlying object if
128        /// that is the only way the underlying object can become writable again.
129        fn poll_write(
130            self: Pin<&mut Self>,
131            cx: &mut Context<'_>,
132            buf: &[u8],
133        ) -> Poll<Result<usize>>;
134
135        /// Attempt to write bytes from `bufs` into the object using vectored
136        /// IO operations.
137        ///
138        /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
139        /// using a single operation.
140        ///
141        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
142        ///
143        /// If the object is not ready for writing, the method returns
144        /// `Poll::Pending` and arranges for the current task (via
145        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
146        /// writable or is closed.
147        ///
148        /// By default, this method delegates to using `poll_write` on the first
149        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
150        /// support vectored IO should override this method.
151        ///
152        /// # Implementation
153        ///
154        /// This function may not return errors of kind `WouldBlock` or
155        /// `Interrupted`.  Implementations must convert `WouldBlock` into
156        /// `Poll::Pending` and either internally retry or convert
157        /// `Interrupted` into another error kind.
158        fn poll_write_vectored(
159            self: Pin<&mut Self>,
160            cx: &mut Context<'_>,
161            bufs: &[IoSlice<'_>],
162        ) -> Poll<Result<usize>> {
163            for b in bufs {
164                if !b.is_empty() {
165                    return self.poll_write(cx, b);
166                }
167            }
168
169            self.poll_write(cx, &[])
170        }
171
172        /// Attempt to flush the object, ensuring that any buffered data reach
173        /// their destination.
174        ///
175        /// On success, returns `Poll::Ready(Ok(()))`.
176        ///
177        /// If flushing cannot immediately complete, this method returns
178        /// `Poll::Pending` and arranges for the current task (via
179        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
180        /// progress towards flushing.
181        ///
182        /// # Implementation
183        ///
184        /// This function may not return errors of kind `WouldBlock` or
185        /// `Interrupted`.  Implementations must convert `WouldBlock` into
186        /// `Poll::Pending` and either internally retry or convert
187        /// `Interrupted` into another error kind.
188        ///
189        /// It only makes sense to do anything here if you actually buffer data.
190        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
191
192        /// Attempt to close the object.
193        ///
194        /// On success, returns `Poll::Ready(Ok(()))`.
195        ///
196        /// If closing cannot immediately complete, this function returns
197        /// `Poll::Pending` and arranges for the current task (via
198        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
199        /// progress towards closing.
200        ///
201        /// # Implementation
202        ///
203        /// This function may not return errors of kind `WouldBlock` or
204        /// `Interrupted`.  Implementations must convert `WouldBlock` into
205        /// `Poll::Pending` and either internally retry or convert
206        /// `Interrupted` into another error kind.
207        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
208    }
209
210    /// Seek bytes asynchronously.
211    ///
212    /// This trait is analogous to the `std::io::Seek` trait, but integrates
213    /// with the asynchronous task system. In particular, the `poll_seek`
214    /// method, unlike `Seek::seek`, will automatically queue the current task
215    /// for wakeup and return if data is not yet available, rather than blocking
216    /// the calling thread.
217    pub trait AsyncSeek {
218        /// Attempt to seek to an offset, in bytes, in a stream.
219        ///
220        /// A seek beyond the end of a stream is allowed, but behavior is defined
221        /// by the implementation.
222        ///
223        /// If the seek operation completed successfully,
224        /// this method returns the new position from the start of the stream.
225        /// That position can be used later with [`SeekFrom::Start`].
226        ///
227        /// # Errors
228        ///
229        /// Seeking to a negative offset is considered an error.
230        ///
231        /// # Implementation
232        ///
233        /// This function may not return errors of kind `WouldBlock` or
234        /// `Interrupted`.  Implementations must convert `WouldBlock` into
235        /// `Poll::Pending` and either internally retry or convert
236        /// `Interrupted` into another error kind.
237        fn poll_seek(
238            self: Pin<&mut Self>,
239            cx: &mut Context<'_>,
240            pos: SeekFrom,
241        ) -> Poll<Result<u64>>;
242    }
243
244    /// Read bytes asynchronously.
245    ///
246    /// This trait is analogous to the `std::io::BufRead` trait, but integrates
247    /// with the asynchronous task system. In particular, the `poll_fill_buf`
248    /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
249    /// for wakeup and return if data is not yet available, rather than blocking
250    /// the calling thread.
251    pub trait AsyncBufRead: AsyncRead {
252        /// Attempt to return the contents of the internal buffer, filling it with more data
253        /// from the inner reader if it is empty.
254        ///
255        /// On success, returns `Poll::Ready(Ok(buf))`.
256        ///
257        /// If no data is available for reading, the method returns
258        /// `Poll::Pending` and arranges for the current task (via
259        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
260        /// readable or is closed.
261        ///
262        /// This function is a lower-level call. It needs to be paired with the
263        /// [`consume`] method to function properly. When calling this
264        /// method, none of the contents will be "read" in the sense that later
265        /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
266        /// be called with the number of bytes that are consumed from this buffer to
267        /// ensure that the bytes are never returned twice.
268        ///
269        /// [`poll_read`]: AsyncRead::poll_read
270        /// [`consume`]: AsyncBufRead::consume
271        ///
272        /// An empty buffer returned indicates that the stream has reached EOF.
273        ///
274        /// # Implementation
275        ///
276        /// This function may not return errors of kind `WouldBlock` or
277        /// `Interrupted`.  Implementations must convert `WouldBlock` into
278        /// `Poll::Pending` and either internally retry or convert
279        /// `Interrupted` into another error kind.
280        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
281
282        /// Tells this buffer that `amt` bytes have been consumed from the buffer,
283        /// so they should no longer be returned in calls to [`poll_read`].
284        ///
285        /// This function is a lower-level call. It needs to be paired with the
286        /// [`poll_fill_buf`] method to function properly. This function does
287        /// not perform any I/O, it simply informs this object that some amount of
288        /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
289        /// no longer be returned. As such, this function may do odd things if
290        /// [`poll_fill_buf`] isn't called before calling it.
291        ///
292        /// The `amt` must be `<=` the number of bytes in the buffer returned by
293        /// [`poll_fill_buf`].
294        ///
295        /// [`poll_read`]: AsyncRead::poll_read
296        /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
297        fn consume(self: Pin<&mut Self>, amt: usize);
298    }
299
300    macro_rules! deref_async_read {
301        () => {
302            fn poll_read(
303                mut self: Pin<&mut Self>,
304                cx: &mut Context<'_>,
305                buf: &mut [u8],
306            ) -> Poll<Result<usize>> {
307                Pin::new(&mut **self).poll_read(cx, buf)
308            }
309
310            fn poll_read_vectored(
311                mut self: Pin<&mut Self>,
312                cx: &mut Context<'_>,
313                bufs: &mut [IoSliceMut<'_>],
314            ) -> Poll<Result<usize>> {
315                Pin::new(&mut **self).poll_read_vectored(cx, bufs)
316            }
317        };
318    }
319
320    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
321        deref_async_read!();
322    }
323
324    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
325        deref_async_read!();
326    }
327
328    impl<P> AsyncRead for Pin<P>
329    where
330        P: DerefMut + Unpin,
331        P::Target: AsyncRead,
332    {
333        fn poll_read(
334            self: Pin<&mut Self>,
335            cx: &mut Context<'_>,
336            buf: &mut [u8],
337        ) -> Poll<Result<usize>> {
338            self.get_mut().as_mut().poll_read(cx, buf)
339        }
340
341        fn poll_read_vectored(
342            self: Pin<&mut Self>,
343            cx: &mut Context<'_>,
344            bufs: &mut [IoSliceMut<'_>],
345        ) -> Poll<Result<usize>> {
346            self.get_mut().as_mut().poll_read_vectored(cx, bufs)
347        }
348    }
349
350    macro_rules! delegate_async_read_to_stdio {
351        () => {
352            fn poll_read(
353                mut self: Pin<&mut Self>,
354                _: &mut Context<'_>,
355                buf: &mut [u8],
356            ) -> Poll<Result<usize>> {
357                Poll::Ready(io::Read::read(&mut *self, buf))
358            }
359
360            fn poll_read_vectored(
361                mut self: Pin<&mut Self>,
362                _: &mut Context<'_>,
363                bufs: &mut [IoSliceMut<'_>],
364            ) -> Poll<Result<usize>> {
365                Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
366            }
367        };
368    }
369
370    impl AsyncRead for &[u8] {
371        delegate_async_read_to_stdio!();
372    }
373
374    macro_rules! deref_async_write {
375        () => {
376            fn poll_write(
377                mut self: Pin<&mut Self>,
378                cx: &mut Context<'_>,
379                buf: &[u8],
380            ) -> Poll<Result<usize>> {
381                Pin::new(&mut **self).poll_write(cx, buf)
382            }
383
384            fn poll_write_vectored(
385                mut self: Pin<&mut Self>,
386                cx: &mut Context<'_>,
387                bufs: &[IoSlice<'_>],
388            ) -> Poll<Result<usize>> {
389                Pin::new(&mut **self).poll_write_vectored(cx, bufs)
390            }
391
392            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
393                Pin::new(&mut **self).poll_flush(cx)
394            }
395
396            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
397                Pin::new(&mut **self).poll_close(cx)
398            }
399        };
400    }
401
402    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
403        deref_async_write!();
404    }
405
406    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
407        deref_async_write!();
408    }
409
410    impl<P> AsyncWrite for Pin<P>
411    where
412        P: DerefMut + Unpin,
413        P::Target: AsyncWrite,
414    {
415        fn poll_write(
416            self: Pin<&mut Self>,
417            cx: &mut Context<'_>,
418            buf: &[u8],
419        ) -> Poll<Result<usize>> {
420            self.get_mut().as_mut().poll_write(cx, buf)
421        }
422
423        fn poll_write_vectored(
424            self: Pin<&mut Self>,
425            cx: &mut Context<'_>,
426            bufs: &[IoSlice<'_>],
427        ) -> Poll<Result<usize>> {
428            self.get_mut().as_mut().poll_write_vectored(cx, bufs)
429        }
430
431        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
432            self.get_mut().as_mut().poll_flush(cx)
433        }
434
435        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
436            self.get_mut().as_mut().poll_close(cx)
437        }
438    }
439
440    macro_rules! delegate_async_write_to_stdio {
441        () => {
442            fn poll_write(
443                mut self: Pin<&mut Self>,
444                _: &mut Context<'_>,
445                buf: &[u8],
446            ) -> Poll<Result<usize>> {
447                Poll::Ready(io::Write::write(&mut *self, buf))
448            }
449
450            fn poll_write_vectored(
451                mut self: Pin<&mut Self>,
452                _: &mut Context<'_>,
453                bufs: &[IoSlice<'_>],
454            ) -> Poll<Result<usize>> {
455                Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
456            }
457
458            fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
459                Poll::Ready(io::Write::flush(&mut *self))
460            }
461
462            fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
463                self.poll_flush(cx)
464            }
465        };
466    }
467
468    impl AsyncWrite for Vec<u8> {
469        delegate_async_write_to_stdio!();
470    }
471
472    macro_rules! deref_async_seek {
473        () => {
474            fn poll_seek(
475                mut self: Pin<&mut Self>,
476                cx: &mut Context<'_>,
477                pos: SeekFrom,
478            ) -> Poll<Result<u64>> {
479                Pin::new(&mut **self).poll_seek(cx, pos)
480            }
481        };
482    }
483
484    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
485        deref_async_seek!();
486    }
487
488    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
489        deref_async_seek!();
490    }
491
492    impl<P> AsyncSeek for Pin<P>
493    where
494        P: DerefMut + Unpin,
495        P::Target: AsyncSeek,
496    {
497        fn poll_seek(
498            self: Pin<&mut Self>,
499            cx: &mut Context<'_>,
500            pos: SeekFrom,
501        ) -> Poll<Result<u64>> {
502            self.get_mut().as_mut().poll_seek(cx, pos)
503        }
504    }
505
506    macro_rules! deref_async_buf_read {
507        () => {
508            fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
509                Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
510            }
511
512            fn consume(mut self: Pin<&mut Self>, amt: usize) {
513                Pin::new(&mut **self).consume(amt)
514            }
515        };
516    }
517
518    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
519        deref_async_buf_read!();
520    }
521
522    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
523        deref_async_buf_read!();
524    }
525
526    impl<P> AsyncBufRead for Pin<P>
527    where
528        P: DerefMut + Unpin,
529        P::Target: AsyncBufRead,
530    {
531        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
532            self.get_mut().as_mut().poll_fill_buf(cx)
533        }
534
535        fn consume(self: Pin<&mut Self>, amt: usize) {
536            self.get_mut().as_mut().consume(amt)
537        }
538    }
539
540    macro_rules! delegate_async_buf_read_to_stdio {
541        () => {
542            fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
543                Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
544            }
545
546            fn consume(self: Pin<&mut Self>, amt: usize) {
547                io::BufRead::consume(self.get_mut(), amt)
548            }
549        };
550    }
551
552    impl AsyncBufRead for &[u8] {
553        delegate_async_buf_read_to_stdio!();
554    }
555}
556
557#[cfg(feature = "std")]
558pub use self::if_std::*;