futures_util/io/
read_until.rs

1use futures_core::future::Future;
2use futures_core::ready;
3use futures_core::task::{Context, Poll};
4use futures_io::AsyncBufRead;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8
/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
///
/// Bundles the borrowed reader and output buffer with a running byte count so
/// that progress made before a `Poll::Pending` is not lost when the future is
/// polled again.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntil<'a, R: ?Sized> {
    /// Source the bytes are pulled from.
    reader: &'a mut R,
    /// Delimiter to stop at; the delimiter itself is copied into `buf`.
    byte: u8,
    /// Destination the bytes are appended to (existing contents are kept).
    buf: &'a mut Vec<u8>,
    /// Number of bytes appended so far by this future; reset to zero once the
    /// total has been handed back to the caller.
    read: usize,
}
18
19impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
20
21impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
22    pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
23        Self { reader, byte, buf, read: 0 }
24    }
25}
26
27pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
28    mut reader: Pin<&mut R>,
29    cx: &mut Context<'_>,
30    byte: u8,
31    buf: &mut Vec<u8>,
32    read: &mut usize,
33) -> Poll<io::Result<usize>> {
34    loop {
35        let (done, used) = {
36            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
37            if let Some(i) = memchr::memchr(byte, available) {
38                buf.extend_from_slice(&available[..=i]);
39                (true, i + 1)
40            } else {
41                buf.extend_from_slice(available);
42                (false, available.len())
43            }
44        };
45        reader.as_mut().consume(used);
46        *read += used;
47        if done || used == 0 {
48            return Poll::Ready(Ok(mem::replace(read, 0)));
49        }
50    }
51}
52
53impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
54    type Output = io::Result<usize>;
55
56    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57        let Self { reader, byte, buf, read } = &mut *self;
58        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
59    }
60}