async_stream/
lib.rs

1#![warn(
2    missing_debug_implementations,
3    missing_docs,
4    rust_2018_idioms,
5    unreachable_pub
6)]
7#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
8
9//! Asynchronous stream of elements.
10//!
11//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
12//! define asynchronous streams of elements. These are implemented using `async`
13//! & `await` notation. This crate works without unstable features.
14//!
15//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
16//! trait. The `Item` associated type is the type of the values yielded from the
17//! stream. The `try_stream!` also returns an anonymous type implementing the
18//! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
19//! `try_stream!` macro supports using `?` notation as part of the
20//! implementation.
21//!
22//! # Usage
23//!
24//! A basic stream yielding numbers. Values are yielded using the `yield`
25//! keyword. The stream block must return `()`.
26//!
27//! ```rust
28//! use async_stream::stream;
29//!
30//! use futures_util::pin_mut;
31//! use futures_util::stream::StreamExt;
32//!
33//! #[tokio::main]
34//! async fn main() {
35//!     let s = stream! {
36//!         for i in 0..3 {
37//!             yield i;
38//!         }
39//!     };
40//!
41//!     pin_mut!(s); // needed for iteration
42//!
43//!     while let Some(value) = s.next().await {
44//!         println!("got {}", value);
45//!     }
46//! }
47//! ```
48//!
49//! Streams may be returned by using `impl Stream<Item = T>`:
50//!
51//! ```rust
52//! use async_stream::stream;
53//!
54//! use futures_core::stream::Stream;
55//! use futures_util::pin_mut;
56//! use futures_util::stream::StreamExt;
57//!
58//! fn zero_to_three() -> impl Stream<Item = u32> {
59//!     stream! {
60//!         for i in 0..3 {
61//!             yield i;
62//!         }
63//!     }
64//! }
65//!
66//! #[tokio::main]
67//! async fn main() {
68//!     let s = zero_to_three();
69//!     pin_mut!(s); // needed for iteration
70//!
71//!     while let Some(value) = s.next().await {
72//!         println!("got {}", value);
73//!     }
74//! }
75//! ```
76//!
77//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
78//! syntax to assist with this:
79//!
80//! ```rust
81//! use async_stream::stream;
82//!
83//! use futures_core::stream::Stream;
84//! use futures_util::pin_mut;
85//! use futures_util::stream::StreamExt;
86//!
87//! fn zero_to_three() -> impl Stream<Item = u32> {
88//!     stream! {
89//!         for i in 0..3 {
90//!             yield i;
91//!         }
92//!     }
93//! }
94//!
95//! fn double<S: Stream<Item = u32>>(input: S)
96//!     -> impl Stream<Item = u32>
97//! {
98//!     stream! {
99//!         for await value in input {
100//!             yield value * 2;
101//!         }
102//!     }
103//! }
104//!
105//! #[tokio::main]
106//! async fn main() {
107//!     let s = double(zero_to_three());
108//!     pin_mut!(s); // needed for iteration
109//!
110//!     while let Some(value) = s.next().await {
111//!         println!("got {}", value);
112//!     }
113//! }
114//! ```
115//!
116//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
117//! of the returned stream is `Result` with `Ok` being the value yielded and
118//! `Err` the error type returned by `?`.
119//!
120//! ```rust
121//! use tokio::net::{TcpListener, TcpStream};
122//!
123//! use async_stream::try_stream;
124//! use futures_core::stream::Stream;
125//!
126//! use std::io;
127//! use std::net::SocketAddr;
128//!
129//! fn bind_and_accept(addr: SocketAddr)
130//!     -> impl Stream<Item = io::Result<TcpStream>>
131//! {
132//!     try_stream! {
133//!         let mut listener = TcpListener::bind(addr).await?;
134//!
135//!         loop {
136//!             let (stream, addr) = listener.accept().await?;
137//!             println!("received on {:?}", addr);
138//!             yield stream;
139//!         }
140//!     }
141//! }
142//! ```
143//!
144//! # Implementation
145//!
146//! The `stream!` and `try_stream!` macros are implemented using proc macros.
147//! The macro searches the syntax tree for instances of `yield $expr` and
148//! transforms them into `sender.send($expr).await`.
149//!
150//! The stream uses a lightweight sender to send values from the stream
151//! implementation to the caller. When entering the stream, an `Option<T>` is
152//! stored on the stack. A pointer to the cell is stored in a thread local and
153//! `poll` is called on the async block. When `poll` returns.
154//! `sender.send(value)` stores the value that cell and yields back to the
155//! caller.
156//!
157//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
158
159mod async_stream;
160mod next;
161mod yielder;
162
163/// Asynchronous stream
164///
165/// See [crate](index.html) documentation for more details.
166///
167/// # Examples
168///
169/// ```
170/// use async_stream::stream;
171///
172/// use futures_util::pin_mut;
173/// use futures_util::stream::StreamExt;
174///
175/// #[tokio::main]
176/// async fn main() {
177///     let s = stream! {
178///         for i in 0..3 {
179///             yield i;
180///         }
181///     };
182///
183///     pin_mut!(s); // needed for iteration
184///
185///     while let Some(value) = s.next().await {
186///         println!("got {}", value);
187///     }
188/// }
189/// ```
190#[macro_export]
191macro_rules! stream {
192    ($($tt:tt)*) => {
193        $crate::__private::stream_inner!(($crate) $($tt)*)
194    }
195}
196
197/// Asynchronous fallible stream
198///
199/// See [crate](index.html) documentation for more details.
200///
201/// # Examples
202///
203/// ```
204/// use tokio::net::{TcpListener, TcpStream};
205///
206/// use async_stream::try_stream;
207/// use futures_core::stream::Stream;
208///
209/// use std::io;
210/// use std::net::SocketAddr;
211///
212/// fn bind_and_accept(addr: SocketAddr)
213///     -> impl Stream<Item = io::Result<TcpStream>>
214/// {
215///     try_stream! {
216///         let mut listener = TcpListener::bind(addr).await?;
217///
218///         loop {
219///             let (stream, addr) = listener.accept().await?;
220///             println!("received on {:?}", addr);
221///             yield stream;
222///         }
223///     }
224/// }
225/// ```
226#[macro_export]
227macro_rules! try_stream {
228    ($($tt:tt)*) => {
229        $crate::__private::try_stream_inner!(($crate) $($tt)*)
230    }
231}
232
233// Not public API.
234#[doc(hidden)]
235pub mod __private {
236    pub use crate::async_stream::AsyncStream;
237    pub use crate::next::next;
238    pub use async_stream_impl::{stream_inner, try_stream_inner};
239    pub mod yielder {
240        pub use crate::yielder::pair;
241    }
242}