futures_util/io/
into_sink.rs

1use futures_core::ready;
2use futures_core::task::{Context, Poll};
3use futures_io::AsyncWrite;
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6use std::io;
7use std::pin::Pin;
8
/// An item waiting to be written, paired with a cursor into its bytes.
#[derive(Debug)]
struct Block<Item> {
    // Count of bytes from the start of `bytes` that have already been written; the next write
    // resumes from this index.
    offset: usize,
    // The queued item itself. It is only ever read through `AsRef<[u8]>`.
    bytes: Item,
}
14
pin_project! {
    /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
    ///
    /// Holds at most one item at a time and writes that item's bytes into the wrapped writer.
    #[must_use = "sinks do nothing unless polled"]
    #[derive(Debug)]
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    pub struct IntoSink<W, Item> {
        // The wrapped writer. Marked `#[pin]` so the projection hands out `Pin<&mut W>`, which is
        // what the writer's poll methods take as their receiver.
        #[pin]
        writer: W,
        // An outstanding block for us to push into the underlying writer, along with an offset of how
        // far into this block we have written already. `None` means nothing is queued.
        buffer: Option<Block<Item>>,
    }
}
28
29impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
30    pub(super) fn new(writer: W) -> Self {
31        Self { writer, buffer: None }
32    }
33
34    /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
35    /// flush the writer after it succeeds in pushing the block into it.
36    fn poll_flush_buffer(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39    ) -> Poll<Result<(), io::Error>> {
40        let mut this = self.project();
41
42        if let Some(buffer) = this.buffer {
43            loop {
44                let bytes = buffer.bytes.as_ref();
45                let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
46                buffer.offset += written;
47                if buffer.offset == bytes.len() {
48                    break;
49                }
50            }
51        }
52        *this.buffer = None;
53        Poll::Ready(Ok(()))
54    }
55}
56
57impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
58    type Error = io::Error;
59
60    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        ready!(self.poll_flush_buffer(cx))?;
62        Poll::Ready(Ok(()))
63    }
64
65    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
66        debug_assert!(self.buffer.is_none());
67        *self.project().buffer = Some(Block { offset: 0, bytes: item });
68        Ok(())
69    }
70
71    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72        ready!(self.as_mut().poll_flush_buffer(cx))?;
73        ready!(self.project().writer.poll_flush(cx))?;
74        Poll::Ready(Ok(()))
75    }
76
77    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78        ready!(self.as_mut().poll_flush_buffer(cx))?;
79        ready!(self.project().writer.poll_close(cx))?;
80        Poll::Ready(Ok(()))
81    }
82}