async_stream/
async_stream.rs

1use crate::yielder::Receiver;
2
3use futures_core::{FusedStream, Stream};
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project! {
10    #[doc(hidden)]
11    #[derive(Debug)]
12    pub struct AsyncStream<T, U> {
13        rx: Receiver<T>,
14        done: bool,
15        #[pin]
16        generator: U,
17    }
18}
19
20impl<T, U> AsyncStream<T, U> {
21    #[doc(hidden)]
22    pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23        AsyncStream {
24            rx,
25            done: false,
26            generator,
27        }
28    }
29}
30
31impl<T, U> FusedStream for AsyncStream<T, U>
32where
33    U: Future<Output = ()>,
34{
35    fn is_terminated(&self) -> bool {
36        self.done
37    }
38}
39
40impl<T, U> Stream for AsyncStream<T, U>
41where
42    U: Future<Output = ()>,
43{
44    type Item = T;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47        let me = self.project();
48
49        if *me.done {
50            return Poll::Ready(None);
51        }
52
53        let mut dst = None;
54        let res = {
55            let _enter = me.rx.enter(&mut dst);
56            me.generator.poll(cx)
57        };
58
59        *me.done = res.is_ready();
60
61        if dst.is_some() {
62            return Poll::Ready(dst.take());
63        }
64
65        if *me.done {
66            Poll::Ready(None)
67        } else {
68            Poll::Pending
69        }
70    }
71
72    fn size_hint(&self) -> (usize, Option<usize>) {
73        if self.done {
74            (0, Some(0))
75        } else {
76            (0, None)
77        }
78    }
79}