futures_util/stream/try_stream/
try_chunks.rs

1use crate::stream::{Fuse, IntoStream, StreamExt};
2
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::{fmt, mem};
6use futures_core::ready;
7use futures_core::stream::{FusedStream, Stream, TryStream};
8use futures_core::task::{Context, Poll};
9#[cfg(feature = "sink")]
10use futures_sink::Sink;
11use pin_project_lite::pin_project;
12
13pin_project! {
14    /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
15    #[derive(Debug)]
16    #[must_use = "streams do nothing unless polled"]
17    pub struct TryChunks<St: TryStream> {
18        #[pin]
19        stream: Fuse<IntoStream<St>>,
20        items: Vec<St::Ok>,
21        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
22    }
23}
24
25impl<St: TryStream> TryChunks<St> {
26    pub(super) fn new(stream: St, capacity: usize) -> Self {
27        assert!(capacity > 0);
28
29        Self {
30            stream: IntoStream::new(stream).fuse(),
31            items: Vec::with_capacity(capacity),
32            cap: capacity,
33        }
34    }
35
36    fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
37        let cap = self.cap;
38        mem::replace(self.project().items, Vec::with_capacity(cap))
39    }
40
41    delegate_access_inner!(stream, St, (. .));
42}
43
44type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
45
46impl<St: TryStream> Stream for TryChunks<St> {
47    type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
48
49    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let mut this = self.as_mut().project();
51        loop {
52            match ready!(this.stream.as_mut().try_poll_next(cx)) {
53                // Push the item into the buffer and check whether it is full.
54                // If so, replace our buffer with a new and empty one and return
55                // the full one.
56                Some(item) => match item {
57                    Ok(item) => {
58                        this.items.push(item);
59                        if this.items.len() >= *this.cap {
60                            return Poll::Ready(Some(Ok(self.take())));
61                        }
62                    }
63                    Err(e) => {
64                        return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
65                    }
66                },
67
68                // Since the underlying stream ran out of values, return what we
69                // have buffered, if we have anything.
70                None => {
71                    let last = if this.items.is_empty() {
72                        None
73                    } else {
74                        let full_buf = mem::take(this.items);
75                        Some(full_buf)
76                    };
77
78                    return Poll::Ready(last.map(Ok));
79                }
80            }
81        }
82    }
83
84    fn size_hint(&self) -> (usize, Option<usize>) {
85        let chunk_len = usize::from(!self.items.is_empty());
86        let (lower, upper) = self.stream.size_hint();
87        let lower = (lower / self.cap).saturating_add(chunk_len);
88        let upper = match upper {
89            Some(x) => x.checked_add(chunk_len),
90            None => None,
91        };
92        (lower, upper)
93    }
94}
95
96impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
97    fn is_terminated(&self) -> bool {
98        self.stream.is_terminated() && self.items.is_empty()
99    }
100}
101
// `TryChunks` is a `Sink` whenever the stream it wraps is one; the
// implementation is produced by `delegate_sink!` for the `stream` field.
#[cfg(feature = "sink")]
impl<S: TryStream + Sink<Item>, Item> Sink<Item> for TryChunks<S> {
    // Spelled out in full because `S` also has a `TryStream::Error`.
    type Error = <S as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
112
/// Error returned when the inner stream fails while a chunk is being collected.
///
/// Field `0` holds the items that were collected before the error occurred,
/// and field `1` holds the inner stream's error.
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);

/// Debug-formats only the inner error, so `T` does not have to be `Debug`.
impl<T, E> fmt::Debug for TryChunksError<T, E>
where
    E: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(_, error) = self;
        fmt::Debug::fmt(error, f)
    }
}

/// Displays only the inner error; the collected items are not shown.
impl<T, E> fmt::Display for TryChunksError<T, E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(_, error) = self;
        fmt::Display::fmt(error, f)
    }
}

#[cfg(feature = "std")]
impl<T, E> std::error::Error for TryChunksError<T, E> where E: fmt::Debug + fmt::Display {}