futures_lite/
stream.rs

1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
38/// Converts a stream into a blocking iterator.
39///
40/// # Examples
41///
42/// ```
43/// use futures_lite::{pin, stream};
44///
45/// let stream = stream::once(7);
46/// pin!(stream);
47///
48/// let mut iter = stream::block_on(stream);
49/// assert_eq!(iter.next(), Some(7));
50/// assert_eq!(iter.next(), None);
51/// ```
#[cfg(feature = "std")]
pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
    // Nothing is polled here: the wrapper only stores the stream and drives it later, one item
    // per `Iterator::next()` call.
    BlockOn(stream)
}
56
/// Iterator for the [`block_on()`] function.
///
/// Each call to [`Iterator::next()`] runs the wrapped stream's `next()` future to completion
/// through `crate::future::block_on` and returns its output.
#[derive(Debug)]
pub struct BlockOn<S>(S);

#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Drive the `next()` future of the inner stream to completion.
        crate::future::block_on(self.0.next())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // `Stream::size_hint()` follows the same contract as `Iterator::size_hint()`, so forward
        // it instead of falling back to the uninformative default of `(0, None)`.
        Stream::size_hint(&self.0)
    }
}
69
70/// Creates an empty stream.
71///
72/// # Examples
73///
74/// ```
75/// use futures_lite::stream::{self, StreamExt};
76///
77/// # spin_on::spin_on(async {
78/// let mut s = stream::empty::<i32>();
79/// assert_eq!(s.next().await, None);
80/// # })
81/// ```
82pub fn empty<T>() -> Empty<T> {
83    Empty {
84        _marker: PhantomData,
85    }
86}
87
88/// Stream for the [`empty()`] function.
89#[derive(Clone, Debug)]
90#[must_use = "streams do nothing unless polled"]
91pub struct Empty<T> {
92    _marker: PhantomData<T>,
93}
94
95impl<T> Unpin for Empty<T> {}
96
97impl<T> Stream for Empty<T> {
98    type Item = T;
99
100    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        Poll::Ready(None)
102    }
103
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        (0, Some(0))
106    }
107}
108
109/// Creates a stream from an iterator.
110///
111/// # Examples
112///
113/// ```
114/// use futures_lite::stream::{self, StreamExt};
115///
116/// # spin_on::spin_on(async {
117/// let mut s = stream::iter(vec![1, 2]);
118///
119/// assert_eq!(s.next().await, Some(1));
120/// assert_eq!(s.next().await, Some(2));
121/// assert_eq!(s.next().await, None);
122/// # })
123/// ```
124pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
125    Iter {
126        iter: iter.into_iter(),
127    }
128}
129
130/// Stream for the [`iter()`] function.
131#[derive(Clone, Debug)]
132#[must_use = "streams do nothing unless polled"]
133pub struct Iter<I> {
134    iter: I,
135}
136
137impl<I> Unpin for Iter<I> {}
138
139impl<I: Iterator> Stream for Iter<I> {
140    type Item = I::Item;
141
142    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143        Poll::Ready(self.iter.next())
144    }
145
146    fn size_hint(&self) -> (usize, Option<usize>) {
147        self.iter.size_hint()
148    }
149}
150
151/// Creates a stream that yields a single item.
152///
153/// # Examples
154///
155/// ```
156/// use futures_lite::stream::{self, StreamExt};
157///
158/// # spin_on::spin_on(async {
159/// let mut s = stream::once(7);
160///
161/// assert_eq!(s.next().await, Some(7));
162/// assert_eq!(s.next().await, None);
163/// # })
164/// ```
165pub fn once<T>(t: T) -> Once<T> {
166    Once { value: Some(t) }
167}
168
169pin_project! {
170    /// Stream for the [`once()`] function.
171    #[derive(Clone, Debug)]
172    #[must_use = "streams do nothing unless polled"]
173    pub struct Once<T> {
174        value: Option<T>,
175    }
176}
177
178impl<T> Stream for Once<T> {
179    type Item = T;
180
181    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
182        Poll::Ready(self.project().value.take())
183    }
184
185    fn size_hint(&self) -> (usize, Option<usize>) {
186        if self.value.is_some() {
187            (1, Some(1))
188        } else {
189            (0, Some(0))
190        }
191    }
192}
193
194/// Creates a stream that is always pending.
195///
196/// # Examples
197///
198/// ```no_run
199/// use futures_lite::stream::{self, StreamExt};
200///
201/// # spin_on::spin_on(async {
202/// let mut s = stream::pending::<i32>();
203/// s.next().await;
204/// unreachable!();
205/// # })
206/// ```
207pub fn pending<T>() -> Pending<T> {
208    Pending {
209        _marker: PhantomData,
210    }
211}
212
213/// Stream for the [`pending()`] function.
214#[derive(Clone, Debug)]
215#[must_use = "streams do nothing unless polled"]
216pub struct Pending<T> {
217    _marker: PhantomData<T>,
218}
219
220impl<T> Unpin for Pending<T> {}
221
222impl<T> Stream for Pending<T> {
223    type Item = T;
224
225    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
226        Poll::Pending
227    }
228
229    fn size_hint(&self) -> (usize, Option<usize>) {
230        (0, Some(0))
231    }
232}
233
234/// Creates a stream from a function returning [`Poll`].
235///
236/// # Examples
237///
238/// ```
239/// use futures_lite::stream::{self, StreamExt};
240/// use std::task::{Context, Poll};
241///
242/// # spin_on::spin_on(async {
243/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
244///     Poll::Ready(Some(7))
245/// }
246///
247/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
248/// # })
249/// ```
250pub fn poll_fn<T, F>(f: F) -> PollFn<F>
251where
252    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
253{
254    PollFn { f }
255}
256
257/// Stream for the [`poll_fn()`] function.
258#[derive(Clone)]
259#[must_use = "streams do nothing unless polled"]
260pub struct PollFn<F> {
261    f: F,
262}
263
264impl<F> Unpin for PollFn<F> {}
265
266impl<F> fmt::Debug for PollFn<F> {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        f.debug_struct("PollFn").finish()
269    }
270}
271
272impl<T, F> Stream for PollFn<F>
273where
274    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
275{
276    type Item = T;
277
278    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
279        (&mut self.f)(cx)
280    }
281}
282
283/// Creates an infinite stream that yields the same item repeatedly.
284///
285/// # Examples
286///
287/// ```
288/// use futures_lite::stream::{self, StreamExt};
289///
290/// # spin_on::spin_on(async {
291/// let mut s = stream::repeat(7);
292///
293/// assert_eq!(s.next().await, Some(7));
294/// assert_eq!(s.next().await, Some(7));
295/// # })
296/// ```
297pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
298    Repeat { item }
299}
300
301/// Stream for the [`repeat()`] function.
302#[derive(Clone, Debug)]
303#[must_use = "streams do nothing unless polled"]
304pub struct Repeat<T> {
305    item: T,
306}
307
308impl<T> Unpin for Repeat<T> {}
309
310impl<T: Clone> Stream for Repeat<T> {
311    type Item = T;
312
313    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314        Poll::Ready(Some(self.item.clone()))
315    }
316
317    fn size_hint(&self) -> (usize, Option<usize>) {
318        (usize::max_value(), None)
319    }
320}
321
322/// Creates an infinite stream from a closure that generates items.
323///
324/// # Examples
325///
326/// ```
327/// use futures_lite::stream::{self, StreamExt};
328///
329/// # spin_on::spin_on(async {
330/// let mut s = stream::repeat_with(|| 7);
331///
332/// assert_eq!(s.next().await, Some(7));
333/// assert_eq!(s.next().await, Some(7));
334/// # })
335/// ```
336pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
337where
338    F: FnMut() -> T,
339{
340    RepeatWith { f: repeater }
341}
342
343/// Stream for the [`repeat_with()`] function.
344#[derive(Clone, Debug)]
345#[must_use = "streams do nothing unless polled"]
346pub struct RepeatWith<F> {
347    f: F,
348}
349
350impl<F> Unpin for RepeatWith<F> {}
351
352impl<T, F> Stream for RepeatWith<F>
353where
354    F: FnMut() -> T,
355{
356    type Item = T;
357
358    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359        let item = (&mut self.f)();
360        Poll::Ready(Some(item))
361    }
362
363    fn size_hint(&self) -> (usize, Option<usize>) {
364        (usize::max_value(), None)
365    }
366}
367
368/// Creates a stream from a seed value and an async closure operating on it.
369///
370/// # Examples
371///
372/// ```
373/// use futures_lite::stream::{self, StreamExt};
374///
375/// # spin_on::spin_on(async {
376/// let s = stream::unfold(0, |mut n| async move {
377///     if n < 2 {
378///         let m = n + 1;
379///         Some((n, m))
380///     } else {
381///         None
382///     }
383/// });
384///
385/// let v: Vec<i32> = s.collect().await;
386/// assert_eq!(v, [0, 1]);
387/// # })
388/// ```
389pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
390where
391    F: FnMut(T) -> Fut,
392    Fut: Future<Output = Option<(Item, T)>>,
393{
394    Unfold {
395        f,
396        state: Some(seed),
397        fut: None,
398    }
399}
400
401pin_project! {
402    /// Stream for the [`unfold()`] function.
403    #[derive(Clone)]
404    #[must_use = "streams do nothing unless polled"]
405    pub struct Unfold<T, F, Fut> {
406        f: F,
407        state: Option<T>,
408        #[pin]
409        fut: Option<Fut>,
410    }
411}
412
413impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
414where
415    T: fmt::Debug,
416    Fut: fmt::Debug,
417{
418    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419        f.debug_struct("Unfold")
420            .field("state", &self.state)
421            .field("fut", &self.fut)
422            .finish()
423    }
424}
425
426impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
427where
428    F: FnMut(T) -> Fut,
429    Fut: Future<Output = Option<(Item, T)>>,
430{
431    type Item = Item;
432
433    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
434        let mut this = self.project();
435
436        if let Some(state) = this.state.take() {
437            this.fut.set(Some((this.f)(state)));
438        }
439
440        let step = ready!(this
441            .fut
442            .as_mut()
443            .as_pin_mut()
444            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
445            .poll(cx));
446        this.fut.set(None);
447
448        if let Some((item, next_state)) = step {
449            *this.state = Some(next_state);
450            Poll::Ready(Some(item))
451        } else {
452            Poll::Ready(None)
453        }
454    }
455}
456
457/// Creates a stream from a seed value and a fallible async closure operating on it.
458///
459/// # Examples
460///
461/// ```
462/// use futures_lite::stream::{self, StreamExt};
463///
464/// # spin_on::spin_on(async {
465/// let s = stream::try_unfold(0, |mut n| async move {
466///     if n < 2 {
467///         let m = n + 1;
468///         Ok(Some((n, m)))
469///     } else {
470///         std::io::Result::Ok(None)
471///     }
472/// });
473///
474/// let v: Vec<i32> = s.try_collect().await?;
475/// assert_eq!(v, [0, 1]);
476/// # std::io::Result::Ok(()) });
477/// ```
478pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
479where
480    F: FnMut(T) -> Fut,
481    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
482{
483    TryUnfold {
484        f,
485        state: Some(init),
486        fut: None,
487    }
488}
489
490pin_project! {
491    /// Stream for the [`try_unfold()`] function.
492    #[derive(Clone)]
493    #[must_use = "streams do nothing unless polled"]
494    pub struct TryUnfold<T, F, Fut> {
495        f: F,
496        state: Option<T>,
497        #[pin]
498        fut: Option<Fut>,
499    }
500}
501
502impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
503where
504    T: fmt::Debug,
505    Fut: fmt::Debug,
506{
507    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508        f.debug_struct("TryUnfold")
509            .field("state", &self.state)
510            .field("fut", &self.fut)
511            .finish()
512    }
513}
514
515impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
516where
517    F: FnMut(T) -> Fut,
518    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
519{
520    type Item = Result<Item, E>;
521
522    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
523        let mut this = self.project();
524
525        if let Some(state) = this.state.take() {
526            this.fut.set(Some((this.f)(state)));
527        }
528
529        match this.fut.as_mut().as_pin_mut() {
530            None => {
531                // The future previously errored
532                Poll::Ready(None)
533            }
534            Some(future) => {
535                let step = ready!(future.poll(cx));
536                this.fut.set(None);
537
538                match step {
539                    Ok(Some((item, next_state))) => {
540                        *this.state = Some(next_state);
541                        Poll::Ready(Some(Ok(item)))
542                    }
543                    Ok(None) => Poll::Ready(None),
544                    Err(e) => Poll::Ready(Some(Err(e))),
545                }
546            }
547        }
548    }
549}
550
551/// Extension trait for [`Stream`].
552pub trait StreamExt: Stream {
553    /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
554    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
555    where
556        Self: Unpin,
557    {
558        Stream::poll_next(Pin::new(self), cx)
559    }
560
561    /// Retrieves the next item in the stream.
562    ///
    /// Returns [`None`] when iteration is finished. Individual stream implementations may or may
    /// not resume yielding items after that.
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// use futures_lite::stream::{self, StreamExt};
570    ///
571    /// # spin_on::spin_on(async {
572    /// let mut s = stream::iter(1..=3);
573    ///
574    /// assert_eq!(s.next().await, Some(1));
575    /// assert_eq!(s.next().await, Some(2));
576    /// assert_eq!(s.next().await, Some(3));
577    /// assert_eq!(s.next().await, None);
578    /// # });
579    /// ```
580    fn next(&mut self) -> NextFuture<'_, Self>
581    where
582        Self: Unpin,
583    {
584        NextFuture { stream: self }
585    }
586
587    /// Retrieves the next item in the stream.
588    ///
589    /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
590    /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
591    ///
592    /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
593    ///
594    /// # Examples
595    ///
596    /// ```
597    /// use futures_lite::stream::{self, StreamExt};
598    ///
599    /// # spin_on::spin_on(async {
600    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
601    ///
602    /// assert_eq!(s.try_next().await, Ok(Some(1)));
603    /// assert_eq!(s.try_next().await, Ok(Some(2)));
604    /// assert_eq!(s.try_next().await, Err("error"));
605    /// assert_eq!(s.try_next().await, Ok(None));
606    /// # });
607    /// ```
608    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
609    where
610        Self: Stream<Item = Result<T, E>> + Unpin,
611    {
612        TryNextFuture { stream: self }
613    }
614
615    /// Counts the number of items in the stream.
616    ///
617    /// # Examples
618    ///
619    /// ```
620    /// use futures_lite::stream::{self, StreamExt};
621    ///
622    /// # spin_on::spin_on(async {
623    /// let s1 = stream::iter(vec![0]);
624    /// let s2 = stream::iter(vec![1, 2, 3]);
625    ///
626    /// assert_eq!(s1.count().await, 1);
627    /// assert_eq!(s2.count().await, 3);
628    /// # });
629    /// ```
630    fn count(self) -> CountFuture<Self>
631    where
632        Self: Sized,
633    {
634        CountFuture {
635            stream: self,
636            count: 0,
637        }
638    }
639
640    /// Maps items of the stream to new values using a closure.
641    ///
642    /// # Examples
643    ///
644    /// ```
645    /// use futures_lite::stream::{self, StreamExt};
646    ///
647    /// # spin_on::spin_on(async {
648    /// let s = stream::iter(vec![1, 2, 3]);
649    /// let mut s = s.map(|x| 2 * x);
650    ///
651    /// assert_eq!(s.next().await, Some(2));
652    /// assert_eq!(s.next().await, Some(4));
653    /// assert_eq!(s.next().await, Some(6));
654    /// assert_eq!(s.next().await, None);
655    /// # });
656    /// ```
657    fn map<T, F>(self, f: F) -> Map<Self, F>
658    where
659        Self: Sized,
660        F: FnMut(Self::Item) -> T,
661    {
662        Map { stream: self, f }
663    }
664
665    /// Maps items to streams and then concatenates them.
666    ///
667    /// # Examples
668    ///
669    /// ```
670    /// use futures_lite::stream::{self, StreamExt};
671    ///
672    /// # spin_on::spin_on(async {
673    /// let words = stream::iter(vec!["one", "two"]);
674    ///
675    /// let s: String = words
676    ///     .flat_map(|s| stream::iter(s.chars()))
677    ///     .collect()
678    ///     .await;
679    ///
680    /// assert_eq!(s, "onetwo");
681    /// # });
682    /// ```
683    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
684    where
685        Self: Sized,
686        U: Stream,
687        F: FnMut(Self::Item) -> U,
688    {
689        FlatMap {
690            stream: self.map(f),
691            inner_stream: None,
692        }
693    }
694
695    /// Concatenates inner streams.
696    ///
697    /// # Examples
698    ///
699    /// ```
700    /// use futures_lite::stream::{self, StreamExt};
701    ///
702    /// # spin_on::spin_on(async {
703    /// let s1 = stream::iter(vec![1, 2, 3]);
704    /// let s2 = stream::iter(vec![4, 5]);
705    ///
706    /// let s = stream::iter(vec![s1, s2]);
707    /// let v: Vec<_> = s.flatten().collect().await;
708    /// assert_eq!(v, [1, 2, 3, 4, 5]);
709    /// # });
710    /// ```
711    fn flatten(self) -> Flatten<Self>
712    where
713        Self: Sized,
714        Self::Item: Stream,
715    {
716        Flatten {
717            stream: self,
718            inner_stream: None,
719        }
720    }
721
722    /// Maps items of the stream to new values using an async closure.
723    ///
724    /// # Examples
725    ///
726    /// ```
727    /// use futures_lite::pin;
728    /// use futures_lite::stream::{self, StreamExt};
729    ///
730    /// # spin_on::spin_on(async {
731    /// let s = stream::iter(vec![1, 2, 3]);
732    /// let mut s = s.then(|x| async move { 2 * x });
733    ///
734    /// pin!(s);
735    /// assert_eq!(s.next().await, Some(2));
736    /// assert_eq!(s.next().await, Some(4));
737    /// assert_eq!(s.next().await, Some(6));
738    /// assert_eq!(s.next().await, None);
739    /// # });
740    /// ```
741    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
742    where
743        Self: Sized,
744        F: FnMut(Self::Item) -> Fut,
745        Fut: Future,
746    {
747        Then {
748            stream: self,
749            future: None,
750            f,
751        }
752    }
753
754    /// Keeps items of the stream for which `predicate` returns `true`.
755    ///
756    /// # Examples
757    ///
758    /// ```
759    /// use futures_lite::stream::{self, StreamExt};
760    ///
761    /// # spin_on::spin_on(async {
762    /// let s = stream::iter(vec![1, 2, 3, 4]);
763    /// let mut s = s.filter(|i| i % 2 == 0);
764    ///
765    /// assert_eq!(s.next().await, Some(2));
766    /// assert_eq!(s.next().await, Some(4));
767    /// assert_eq!(s.next().await, None);
768    /// # });
769    /// ```
770    fn filter<P>(self, predicate: P) -> Filter<Self, P>
771    where
772        Self: Sized,
773        P: FnMut(&Self::Item) -> bool,
774    {
775        Filter {
776            stream: self,
777            predicate,
778        }
779    }
780
781    /// Filters and maps items of the stream using a closure.
782    ///
783    /// # Examples
784    ///
785    /// ```
786    /// use futures_lite::stream::{self, StreamExt};
787    ///
788    /// # spin_on::spin_on(async {
789    /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
790    /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
791    ///
792    /// assert_eq!(s.next().await, Some(1));
793    /// assert_eq!(s.next().await, Some(3));
794    /// assert_eq!(s.next().await, Some(5));
795    /// assert_eq!(s.next().await, None);
796    /// # });
797    /// ```
798    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
799    where
800        Self: Sized,
801        F: FnMut(Self::Item) -> Option<T>,
802    {
803        FilterMap { stream: self, f }
804    }
805
806    /// Takes only the first `n` items of the stream.
807    ///
808    /// # Examples
809    ///
810    /// ```
811    /// use futures_lite::stream::{self, StreamExt};
812    ///
813    /// # spin_on::spin_on(async {
814    /// let mut s = stream::repeat(7).take(2);
815    ///
816    /// assert_eq!(s.next().await, Some(7));
817    /// assert_eq!(s.next().await, Some(7));
818    /// assert_eq!(s.next().await, None);
819    /// # });
820    /// ```
821    fn take(self, n: usize) -> Take<Self>
822    where
823        Self: Sized,
824    {
825        Take { stream: self, n }
826    }
827
828    /// Takes items while `predicate` returns `true`.
829    ///
830    /// # Examples
831    ///
832    /// ```
833    /// use futures_lite::stream::{self, StreamExt};
834    ///
835    /// # spin_on::spin_on(async {
836    /// let s = stream::iter(vec![1, 2, 3, 4]);
837    /// let mut s = s.take_while(|x| *x < 3);
838    ///
839    /// assert_eq!(s.next().await, Some(1));
840    /// assert_eq!(s.next().await, Some(2));
841    /// assert_eq!(s.next().await, None);
842    /// # });
843    /// ```
844    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
845    where
846        Self: Sized,
847        P: FnMut(&Self::Item) -> bool,
848    {
849        TakeWhile {
850            stream: self,
851            predicate,
852        }
853    }
854
855    /// Skips the first `n` items of the stream.
856    ///
857    /// # Examples
858    ///
859    /// ```
860    /// use futures_lite::stream::{self, StreamExt};
861    ///
862    /// # spin_on::spin_on(async {
863    /// let s = stream::iter(vec![1, 2, 3]);
864    /// let mut s = s.skip(2);
865    ///
866    /// assert_eq!(s.next().await, Some(3));
867    /// assert_eq!(s.next().await, None);
868    /// # });
869    /// ```
870    fn skip(self, n: usize) -> Skip<Self>
871    where
872        Self: Sized,
873    {
874        Skip { stream: self, n }
875    }
876
877    /// Skips items while `predicate` returns `true`.
878    ///
879    /// # Examples
880    ///
881    /// ```
882    /// use futures_lite::stream::{self, StreamExt};
883    ///
884    /// # spin_on::spin_on(async {
885    /// let s = stream::iter(vec![-1i32, 0, 1]);
886    /// let mut s = s.skip_while(|x| x.is_negative());
887    ///
888    /// assert_eq!(s.next().await, Some(0));
889    /// assert_eq!(s.next().await, Some(1));
890    /// assert_eq!(s.next().await, None);
891    /// # });
892    /// ```
893    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
894    where
895        Self: Sized,
896        P: FnMut(&Self::Item) -> bool,
897    {
898        SkipWhile {
899            stream: self,
900            predicate: Some(predicate),
901        }
902    }
903
904    /// Yields every `step`th item.
905    ///
906    /// # Panics
907    ///
908    /// This method will panic if the `step` is 0.
909    ///
910    /// # Examples
911    ///
912    /// ```
913    /// use futures_lite::stream::{self, StreamExt};
914    ///
915    /// # spin_on::spin_on(async {
916    /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
917    /// let mut s = s.step_by(2);
918    ///
919    /// assert_eq!(s.next().await, Some(0));
920    /// assert_eq!(s.next().await, Some(2));
921    /// assert_eq!(s.next().await, Some(4));
922    /// assert_eq!(s.next().await, None);
923    /// # });
924    /// ```
925    fn step_by(self, step: usize) -> StepBy<Self>
926    where
927        Self: Sized,
928    {
929        assert!(step > 0, "`step` must be greater than zero");
930        StepBy {
931            stream: self,
932            step,
933            i: 0,
934        }
935    }
936
937    /// Appends another stream to the end of this one.
938    ///
939    /// # Examples
940    ///
941    /// ```
942    /// use futures_lite::stream::{self, StreamExt};
943    ///
944    /// # spin_on::spin_on(async {
945    /// let s1 = stream::iter(vec![1, 2]);
946    /// let s2 = stream::iter(vec![7, 8]);
947    /// let mut s = s1.chain(s2);
948    ///
949    /// assert_eq!(s.next().await, Some(1));
950    /// assert_eq!(s.next().await, Some(2));
951    /// assert_eq!(s.next().await, Some(7));
952    /// assert_eq!(s.next().await, Some(8));
953    /// assert_eq!(s.next().await, None);
954    /// # });
955    /// ```
956    fn chain<U>(self, other: U) -> Chain<Self, U>
957    where
958        Self: Sized,
959        U: Stream<Item = Self::Item> + Sized,
960    {
961        Chain {
962            first: self.fuse(),
963            second: other.fuse(),
964        }
965    }
966
967    /// Clones all items.
968    ///
969    /// # Examples
970    ///
971    /// ```
972    /// use futures_lite::stream::{self, StreamExt};
973    ///
974    /// # spin_on::spin_on(async {
975    /// let s = stream::iter(vec![&1, &2]);
976    /// let mut s = s.cloned();
977    ///
978    /// assert_eq!(s.next().await, Some(1));
979    /// assert_eq!(s.next().await, Some(2));
980    /// assert_eq!(s.next().await, None);
981    /// # });
982    /// ```
983    fn cloned<'a, T>(self) -> Cloned<Self>
984    where
985        Self: Stream<Item = &'a T> + Sized,
986        T: Clone + 'a,
987    {
988        Cloned { stream: self }
989    }
990
991    /// Copies all items.
992    ///
993    /// # Examples
994    ///
995    /// ```
996    /// use futures_lite::stream::{self, StreamExt};
997    ///
998    /// # spin_on::spin_on(async {
999    /// let s = stream::iter(vec![&1, &2]);
1000    /// let mut s = s.copied();
1001    ///
1002    /// assert_eq!(s.next().await, Some(1));
1003    /// assert_eq!(s.next().await, Some(2));
1004    /// assert_eq!(s.next().await, None);
1005    /// # });
1006    /// ```
1007    fn copied<'a, T>(self) -> Copied<Self>
1008    where
1009        Self: Stream<Item = &'a T> + Sized,
1010        T: Copy + 'a,
1011    {
1012        Copied { stream: self }
1013    }
1014
1015    /// Collects all items in the stream into a collection.
1016    ///
1017    /// # Examples
1018    ///
1019    /// ```
1020    /// use futures_lite::stream::{self, StreamExt};
1021    ///
1022    /// # spin_on::spin_on(async {
1023    /// let mut s = stream::iter(1..=3);
1024    ///
1025    /// let items: Vec<_> = s.collect().await;
1026    /// assert_eq!(items, [1, 2, 3]);
1027    /// # });
1028    /// ```
1029    fn collect<C>(self) -> CollectFuture<Self, C>
1030    where
1031        Self: Sized,
1032        C: Default + Extend<Self::Item>,
1033    {
1034        CollectFuture {
1035            stream: self,
1036            collection: Default::default(),
1037        }
1038    }
1039
    /// Collects all items in the fallible stream into a collection.
    ///
    /// # Examples
    ///
1042    /// ```
1043    /// use futures_lite::stream::{self, StreamExt};
1044    ///
1045    /// # spin_on::spin_on(async {
1046    /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1047    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1048    /// assert_eq!(res, Err(2));
1049    ///
1050    /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1051    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1052    /// assert_eq!(res, Ok(vec![1, 2, 3]));
1053    /// # })
1054    /// ```
1055    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1056    where
1057        Self: Stream<Item = Result<T, E>> + Sized,
1058        C: Default + Extend<T>,
1059    {
1060        TryCollectFuture {
1061            stream: self,
1062            items: Default::default(),
1063        }
1064    }
1065
1066    /// Partitions items into those for which `predicate` is `true` and those for which it is
1067    /// `false`, and then collects them into two collections.
1068    ///
1069    /// # Examples
1070    ///
1071    /// ```
1072    /// use futures_lite::stream::{self, StreamExt};
1073    ///
1074    /// # spin_on::spin_on(async {
1075    /// let s = stream::iter(vec![1, 2, 3]);
1076    /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1077    ///
1078    /// assert_eq!(even, &[2]);
1079    /// assert_eq!(odd, &[1, 3]);
1080    /// # })
1081    /// ```
1082    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1083    where
1084        Self: Sized,
1085        B: Default + Extend<Self::Item>,
1086        P: FnMut(&Self::Item) -> bool,
1087    {
1088        PartitionFuture {
1089            stream: self,
1090            predicate,
1091            res: Some(Default::default()),
1092        }
1093    }
1094
1095    /// Accumulates a computation over the stream.
1096    ///
1097    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1098    /// the accumulator and each item in the stream. The final accumulator value is returned.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```
1103    /// use futures_lite::stream::{self, StreamExt};
1104    ///
1105    /// # spin_on::spin_on(async {
1106    /// let s = stream::iter(vec![1, 2, 3]);
1107    /// let sum = s.fold(0, |acc, x| acc + x).await;
1108    ///
1109    /// assert_eq!(sum, 6);
1110    /// # })
1111    /// ```
1112    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1113    where
1114        Self: Sized,
1115        F: FnMut(T, Self::Item) -> T,
1116    {
1117        FoldFuture {
1118            stream: self,
1119            f,
1120            acc: Some(init),
1121        }
1122    }
1123
1124    /// Accumulates a fallible computation over the stream.
1125    ///
1126    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1127    /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1128    /// error if `f` failed the computation.
1129    ///
1130    /// # Examples
1131    ///
1132    /// ```
1133    /// use futures_lite::stream::{self, StreamExt};
1134    ///
1135    /// # spin_on::spin_on(async {
1136    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1137    ///
1138    /// let sum = s.try_fold(0, |acc, v| {
1139    ///     if (acc + v) % 2 == 1 {
1140    ///         Ok(acc + v)
1141    ///     } else {
1142    ///         Err("fail")
1143    ///     }
1144    /// })
1145    /// .await;
1146    ///
1147    /// assert_eq!(sum, Err("fail"));
1148    /// # })
1149    /// ```
1150    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1151    where
1152        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1153        F: FnMut(B, T) -> Result<B, E>,
1154    {
1155        TryFoldFuture {
1156            stream: self,
1157            f,
1158            acc: Some(init),
1159        }
1160    }
1161
1162    /// Maps items of the stream to new values using a state value and a closure.
1163    ///
    /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1165    /// state and each item in the stream. The stream stops when `f` returns `None`.
1166    ///
1167    /// # Examples
1168    ///
1169    /// ```
1170    /// use futures_lite::stream::{self, StreamExt};
1171    ///
1172    /// # spin_on::spin_on(async {
1173    /// let s = stream::iter(vec![1, 2, 3]);
1174    /// let mut s = s.scan(1, |state, x| {
1175    ///     *state = *state * x;
1176    ///     Some(-*state)
1177    /// });
1178    ///
1179    /// assert_eq!(s.next().await, Some(-1));
1180    /// assert_eq!(s.next().await, Some(-2));
1181    /// assert_eq!(s.next().await, Some(-6));
1182    /// assert_eq!(s.next().await, None);
1183    /// # })
1184    /// ```
1185    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1186    where
1187        Self: Sized,
1188        F: FnMut(&mut St, Self::Item) -> Option<B>,
1189    {
1190        Scan {
1191            stream: self,
1192            state_f: (initial_state, f),
1193        }
1194    }
1195
1196    /// Fuses the stream so that it stops yielding items after the first [`None`].
1197    ///
1198    /// # Examples
1199    ///
1200    /// ```
1201    /// use futures_lite::stream::{self, StreamExt};
1202    ///
1203    /// # spin_on::spin_on(async {
1204    /// let mut s = stream::once(1).fuse();
1205    ///
1206    /// assert_eq!(s.next().await, Some(1));
1207    /// assert_eq!(s.next().await, None);
1208    /// assert_eq!(s.next().await, None);
1209    /// # })
1210    /// ```
1211    fn fuse(self) -> Fuse<Self>
1212    where
1213        Self: Sized,
1214    {
1215        Fuse {
1216            stream: self,
1217            done: false,
1218        }
1219    }
1220
1221    /// Repeats the stream from beginning to end, forever.
1222    ///
1223    /// # Examples
1224    ///
1225    /// ```
1226    /// use futures_lite::stream::{self, StreamExt};
1227    ///
1228    /// # spin_on::spin_on(async {
1229    /// let mut s = stream::iter(vec![1, 2]).cycle();
1230    ///
1231    /// assert_eq!(s.next().await, Some(1));
1232    /// assert_eq!(s.next().await, Some(2));
1233    /// assert_eq!(s.next().await, Some(1));
1234    /// assert_eq!(s.next().await, Some(2));
1235    /// # });
1236    /// ```
1237    fn cycle(self) -> Cycle<Self>
1238    where
1239        Self: Clone + Sized,
1240    {
1241        Cycle {
1242            orig: self.clone(),
1243            stream: self,
1244        }
1245    }
1246
1247    /// Enumerates items, mapping them to `(index, item)`.
1248    ///
1249    /// # Examples
1250    ///
1251    /// ```
1252    /// use futures_lite::stream::{self, StreamExt};
1253    ///
1254    /// # spin_on::spin_on(async {
1255    /// let s = stream::iter(vec!['a', 'b', 'c']);
1256    /// let mut s = s.enumerate();
1257    ///
1258    /// assert_eq!(s.next().await, Some((0, 'a')));
1259    /// assert_eq!(s.next().await, Some((1, 'b')));
1260    /// assert_eq!(s.next().await, Some((2, 'c')));
1261    /// assert_eq!(s.next().await, None);
1262    /// # });
1263    /// ```
1264    fn enumerate(self) -> Enumerate<Self>
1265    where
1266        Self: Sized,
1267    {
1268        Enumerate { stream: self, i: 0 }
1269    }
1270
1271    /// Calls a closure on each item and passes it on.
1272    ///
1273    /// # Examples
1274    ///
1275    /// ```
1276    /// use futures_lite::stream::{self, StreamExt};
1277    ///
1278    /// # spin_on::spin_on(async {
1279    /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1280    ///
1281    /// let sum = s
1282    ///    .inspect(|x| println!("about to filter {}", x))
1283    ///    .filter(|x| x % 2 == 0)
1284    ///    .inspect(|x| println!("made it through filter: {}", x))
1285    ///    .fold(0, |sum, i| sum + i)
1286    ///    .await;
1287    /// # });
1288    /// ```
1289    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1290    where
1291        Self: Sized,
1292        F: FnMut(&Self::Item),
1293    {
1294        Inspect { stream: self, f }
1295    }
1296
1297    /// Gets the `n`th item of the stream.
1298    ///
1299    /// In the end, `n+1` items of the stream will be consumed.
1300    ///
1301    /// # Examples
1302    ///
1303    /// ```
1304    /// use futures_lite::stream::{self, StreamExt};
1305    ///
1306    /// # spin_on::spin_on(async {
1307    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1308    ///
1309    /// assert_eq!(s.nth(2).await, Some(2));
1310    /// assert_eq!(s.nth(2).await, Some(5));
1311    /// assert_eq!(s.nth(2).await, None);
1312    /// # });
1313    /// ```
1314    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1315    where
1316        Self: Unpin,
1317    {
1318        NthFuture { stream: self, n }
1319    }
1320
1321    /// Returns the last item in the stream.
1322    ///
1323    /// # Examples
1324    ///
1325    /// ```
1326    /// use futures_lite::stream::{self, StreamExt};
1327    ///
1328    /// # spin_on::spin_on(async {
1329    /// let s = stream::iter(vec![1, 2, 3, 4]);
1330    /// assert_eq!(s.last().await, Some(4));
1331    ///
1332    /// let s = stream::empty::<i32>();
1333    /// assert_eq!(s.last().await, None);
1334    /// # });
1335    /// ```
1336    fn last(self) -> LastFuture<Self>
1337    where
1338        Self: Sized,
1339    {
1340        LastFuture {
1341            stream: self,
1342            last: None,
1343        }
1344    }
1345
1346    /// Finds the first item of the stream for which `predicate` returns `true`.
1347    ///
1348    /// # Examples
1349    ///
1350    /// ```
1351    /// use futures_lite::stream::{self, StreamExt};
1352    ///
1353    /// # spin_on::spin_on(async {
1354    /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1355    ///
1356    /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1357    /// assert_eq!(s.next().await, Some(13));
1358    /// # });
1359    /// ```
1360    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1361    where
1362        Self: Unpin,
1363        P: FnMut(&Self::Item) -> bool,
1364    {
1365        FindFuture {
1366            stream: self,
1367            predicate,
1368        }
1369    }
1370
1371    /// Applies a closure to items in the stream and returns the first [`Some`] result.
1372    ///
1373    /// # Examples
1374    ///
1375    /// ```
1376    /// use futures_lite::stream::{self, StreamExt};
1377    ///
1378    /// # spin_on::spin_on(async {
1379    /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1380    /// let number = s.find_map(|s| s.parse().ok()).await;
1381    ///
1382    /// assert_eq!(number, Some(2));
1383    /// # });
1384    /// ```
1385    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1386    where
1387        Self: Unpin,
1388        F: FnMut(Self::Item) -> Option<B>,
1389    {
1390        FindMapFuture { stream: self, f }
1391    }
1392
1393    /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1394    ///
1395    /// # Examples
1396    ///
1397    /// ```
1398    /// use futures_lite::stream::{self, StreamExt};
1399    ///
1400    /// # spin_on::spin_on(async {
1401    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1402    ///
1403    /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1404    /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1405    /// assert_eq!(s.position(|x| x == 9).await, None);
1406    /// # });
1407    /// ```
1408    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1409    where
1410        Self: Unpin,
1411        P: FnMut(Self::Item) -> bool,
1412    {
1413        PositionFuture {
1414            stream: self,
1415            predicate,
1416            index: 0,
1417        }
1418    }
1419
1420    /// Tests if `predicate` returns `true` for all items in the stream.
1421    ///
1422    /// The result is `true` for an empty stream.
1423    ///
1424    /// # Examples
1425    ///
1426    /// ```
1427    /// use futures_lite::stream::{self, StreamExt};
1428    ///
1429    /// # spin_on::spin_on(async {
1430    /// let mut s = stream::iter(vec![1, 2, 3]);
1431    /// assert!(!s.all(|x| x % 2 == 0).await);
1432    ///
1433    /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1434    /// assert!(s.all(|x| x % 2 == 0).await);
1435    ///
1436    /// let mut s = stream::empty::<i32>();
1437    /// assert!(s.all(|x| x % 2 == 0).await);
1438    /// # });
1439    /// ```
1440    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1441    where
1442        Self: Unpin,
1443        P: FnMut(Self::Item) -> bool,
1444    {
1445        AllFuture {
1446            stream: self,
1447            predicate,
1448        }
1449    }
1450
1451    /// Tests if `predicate` returns `true` for any item in the stream.
1452    ///
1453    /// The result is `false` for an empty stream.
1454    ///
1455    /// # Examples
1456    ///
1457    /// ```
1458    /// use futures_lite::stream::{self, StreamExt};
1459    ///
1460    /// # spin_on::spin_on(async {
1461    /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1462    /// assert!(!s.any(|x| x % 2 == 0).await);
1463    ///
1464    /// let mut s = stream::iter(vec![1, 2, 3]);
1465    /// assert!(s.any(|x| x % 2 == 0).await);
1466    ///
1467    /// let mut s = stream::empty::<i32>();
1468    /// assert!(!s.any(|x| x % 2 == 0).await);
1469    /// # });
1470    /// ```
1471    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1472    where
1473        Self: Unpin,
1474        P: FnMut(Self::Item) -> bool,
1475    {
1476        AnyFuture {
1477            stream: self,
1478            predicate,
1479        }
1480    }
1481
1482    /// Calls a closure on each item of the stream.
1483    ///
1484    /// # Examples
1485    ///
1486    /// ```
1487    /// use futures_lite::stream::{self, StreamExt};
1488    ///
1489    /// # spin_on::spin_on(async {
1490    /// let mut s = stream::iter(vec![1, 2, 3]);
1491    /// s.for_each(|s| println!("{}", s)).await;
1492    /// # });
1493    /// ```
1494    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1495    where
1496        Self: Sized,
1497        F: FnMut(Self::Item),
1498    {
1499        ForEachFuture { stream: self, f }
1500    }
1501
1502    /// Calls a fallible closure on each item of the stream, stopping on first error.
1503    ///
1504    /// # Examples
1505    ///
1506    /// ```
1507    /// use futures_lite::stream::{self, StreamExt};
1508    ///
1509    /// # spin_on::spin_on(async {
1510    /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1511    ///
1512    /// let mut v = vec![];
1513    /// let res = s
1514    ///     .try_for_each(|n| {
1515    ///         if n < 2 {
1516    ///             v.push(n);
1517    ///             Ok(())
1518    ///         } else {
1519    ///             Err("too big")
1520    ///         }
1521    ///     })
1522    ///     .await;
1523    ///
1524    /// assert_eq!(v, &[0, 1]);
1525    /// assert_eq!(res, Err("too big"));
1526    /// # });
1527    /// ```
1528    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1529    where
1530        Self: Unpin,
1531        F: FnMut(Self::Item) -> Result<(), E>,
1532    {
1533        TryForEachFuture { stream: self, f }
1534    }
1535
1536    /// Zips up two streams into a single stream of pairs.
1537    ///
1538    /// The stream of pairs stops when either of the original two streams is exhausted.
1539    ///
1540    /// # Examples
1541    ///
1542    /// ```
1543    /// use futures_lite::stream::{self, StreamExt};
1544    ///
1545    /// # spin_on::spin_on(async {
1546    /// let l = stream::iter(vec![1, 2, 3]);
1547    /// let r = stream::iter(vec![4, 5, 6, 7]);
1548    /// let mut s = l.zip(r);
1549    ///
1550    /// assert_eq!(s.next().await, Some((1, 4)));
1551    /// assert_eq!(s.next().await, Some((2, 5)));
1552    /// assert_eq!(s.next().await, Some((3, 6)));
1553    /// assert_eq!(s.next().await, None);
1554    /// # });
1555    /// ```
1556    fn zip<U>(self, other: U) -> Zip<Self, U>
1557    where
1558        Self: Sized,
1559        U: Stream,
1560    {
1561        Zip {
1562            item_slot: None,
1563            first: self,
1564            second: other,
1565        }
1566    }
1567
1568    /// Collects a stream of pairs into a pair of collections.
1569    ///
1570    /// # Examples
1571    ///
1572    /// ```
1573    /// use futures_lite::stream::{self, StreamExt};
1574    ///
1575    /// # spin_on::spin_on(async {
1576    /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1577    /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1578    ///
1579    /// assert_eq!(left, [1, 3]);
1580    /// assert_eq!(right, [2, 4]);
1581    /// # });
1582    /// ```
1583    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1584    where
1585        FromA: Default + Extend<A>,
1586        FromB: Default + Extend<B>,
1587        Self: Stream<Item = (A, B)> + Sized,
1588    {
1589        UnzipFuture {
1590            stream: self,
1591            res: Some(Default::default()),
1592        }
1593    }
1594
1595    /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1596    ///
1597    /// # Examples
1598    ///
1599    /// ```
1600    /// use futures_lite::stream::{self, StreamExt};
1601    /// use futures_lite::stream::{once, pending};
1602    ///
1603    /// # spin_on::spin_on(async {
1604    /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1605    /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1606    ///
    /// // The first stream wins.
1608    /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1609    /// # })
1610    /// ```
1611    fn or<S>(self, other: S) -> Or<Self, S>
1612    where
1613        Self: Sized,
1614        S: Stream<Item = Self::Item>,
1615    {
1616        Or {
1617            stream1: self,
1618            stream2: other,
1619        }
1620    }
1621
1622    /// Merges with `other` stream, with no preference for either stream when both are ready.
1623    ///
1624    /// # Examples
1625    ///
1626    /// ```
1627    /// use futures_lite::stream::{self, StreamExt};
1628    /// use futures_lite::stream::{once, pending};
1629    ///
1630    /// # spin_on::spin_on(async {
1631    /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1632    /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1633    ///
    /// // One of the two streams is randomly chosen as the winner.
1635    /// let res = once(1).race(once(2)).next().await;
1636    /// # })
1637    /// ```
1638    #[cfg(feature = "std")]
1639    fn race<S>(self, other: S) -> Race<Self, S>
1640    where
1641        Self: Sized,
1642        S: Stream<Item = Self::Item>,
1643    {
1644        Race {
1645            stream1: self,
1646            stream2: other,
1647        }
1648    }
1649
1650    /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1651    ///
1652    /// # Examples
1653    ///
1654    /// ```
1655    /// use futures_lite::stream::{self, StreamExt};
1656    ///
1657    /// # spin_on::spin_on(async {
1658    /// let a = stream::once(1);
1659    /// let b = stream::empty();
1660    ///
1661    /// // Streams of different types can be stored in
1662    /// // the same collection when they are boxed:
1663    /// let streams = vec![a.boxed(), b.boxed()];
1664    /// # })
1665    /// ```
1666    #[cfg(feature = "alloc")]
1667    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1668    where
1669        Self: Send + Sized + 'a,
1670    {
1671        Box::pin(self)
1672    }
1673
1674    /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1675    ///
1676    /// # Examples
1677    ///
1678    /// ```
1679    /// use futures_lite::stream::{self, StreamExt};
1680    ///
1681    /// # spin_on::spin_on(async {
1682    /// let a = stream::once(1);
1683    /// let b = stream::empty();
1684    ///
1685    /// // Streams of different types can be stored in
1686    /// // the same collection when they are boxed:
1687    /// let streams = vec![a.boxed_local(), b.boxed_local()];
1688    /// # })
1689    /// ```
1690    #[cfg(feature = "alloc")]
1691    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1692    where
1693        Self: Sized + 'a,
1694    {
1695        Box::pin(self)
1696    }
1697}
1698
// Blanket implementation: every type implementing `Stream` (sized or not) gets `StreamExt`.
// The body is empty, so only the trait's provided method bodies are ever used.
impl<S: Stream + ?Sized> StreamExt for S {}
1700
/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// This is the type produced by [`StreamExt::boxed()`] for a `Send + 'static` stream.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1714
/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
///
/// This is the type produced by [`StreamExt::boxed_local()`] for a `'static` stream; unlike
/// [`Boxed`], it carries no `Send` bound.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1728
1729/// Future for the [`StreamExt::next()`] method.
1730#[derive(Debug)]
1731#[must_use = "futures do nothing unless you `.await` or poll them"]
1732pub struct NextFuture<'a, S: ?Sized> {
1733    stream: &'a mut S,
1734}
1735
1736impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1737
1738impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1739    type Output = Option<S::Item>;
1740
1741    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1742        self.stream.poll_next(cx)
1743    }
1744}
1745
1746/// Future for the [`StreamExt::try_next()`] method.
1747#[derive(Debug)]
1748#[must_use = "futures do nothing unless you `.await` or poll them"]
1749pub struct TryNextFuture<'a, S: ?Sized> {
1750    stream: &'a mut S,
1751}
1752
1753impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1754
1755impl<T, E, S> Future for TryNextFuture<'_, S>
1756where
1757    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1758{
1759    type Output = Result<Option<T>, E>;
1760
1761    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1762        let res = ready!(self.stream.poll_next(cx));
1763        Poll::Ready(res.transpose())
1764    }
1765}
1766
1767pin_project! {
1768    /// Future for the [`StreamExt::count()`] method.
1769    #[derive(Debug)]
1770    #[must_use = "futures do nothing unless you `.await` or poll them"]
1771    pub struct CountFuture<S: ?Sized> {
1772        count: usize,
1773        #[pin]
1774        stream: S,
1775    }
1776}
1777
1778impl<S: Stream + ?Sized> Future for CountFuture<S> {
1779    type Output = usize;
1780
1781    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1782        loop {
1783            match ready!(self.as_mut().project().stream.poll_next(cx)) {
1784                None => return Poll::Ready(self.count),
1785                Some(_) => *self.as_mut().project().count += 1,
1786            }
1787        }
1788    }
1789}
1790
1791pin_project! {
1792    /// Future for the [`StreamExt::collect()`] method.
1793    #[derive(Debug)]
1794    #[must_use = "futures do nothing unless you `.await` or poll them"]
1795    pub struct CollectFuture<S, C> {
1796        #[pin]
1797        stream: S,
1798        collection: C,
1799    }
1800}
1801
1802impl<S, C> Future for CollectFuture<S, C>
1803where
1804    S: Stream,
1805    C: Default + Extend<S::Item>,
1806{
1807    type Output = C;
1808
1809    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1810        let mut this = self.as_mut().project();
1811        loop {
1812            match ready!(this.stream.as_mut().poll_next(cx)) {
1813                Some(e) => this.collection.extend(Some(e)),
1814                None => {
1815                    return Poll::Ready(mem::replace(self.project().collection, Default::default()))
1816                }
1817            }
1818        }
1819    }
1820}
1821
1822pin_project! {
1823    /// Future for the [`StreamExt::try_collect()`] method.
1824    #[derive(Debug)]
1825    #[must_use = "futures do nothing unless you `.await` or poll them"]
1826    pub struct TryCollectFuture<S, C> {
1827        #[pin]
1828        stream: S,
1829        items: C,
1830    }
1831}
1832
1833impl<T, E, S, C> Future for TryCollectFuture<S, C>
1834where
1835    S: Stream<Item = Result<T, E>>,
1836    C: Default + Extend<T>,
1837{
1838    type Output = Result<C, E>;
1839
1840    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1841        let mut this = self.project();
1842        Poll::Ready(Ok(loop {
1843            match ready!(this.stream.as_mut().poll_next(cx)?) {
1844                Some(x) => this.items.extend(Some(x)),
1845                None => break mem::replace(this.items, Default::default()),
1846            }
1847        }))
1848    }
1849}
1850
1851pin_project! {
1852    /// Future for the [`StreamExt::partition()`] method.
1853    #[derive(Debug)]
1854    #[must_use = "futures do nothing unless you `.await` or poll them"]
1855    pub struct PartitionFuture<S, P, B> {
1856        #[pin]
1857        stream: S,
1858        predicate: P,
1859        res: Option<(B, B)>,
1860    }
1861}
1862
1863impl<S, P, B> Future for PartitionFuture<S, P, B>
1864where
1865    S: Stream + Sized,
1866    P: FnMut(&S::Item) -> bool,
1867    B: Default + Extend<S::Item>,
1868{
1869    type Output = (B, B);
1870
1871    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1872        let mut this = self.project();
1873        loop {
1874            match ready!(this.stream.as_mut().poll_next(cx)) {
1875                Some(v) => {
1876                    let res = this.res.as_mut().unwrap();
1877                    if (this.predicate)(&v) {
1878                        res.0.extend(Some(v))
1879                    } else {
1880                        res.1.extend(Some(v))
1881                    }
1882                }
1883                None => return Poll::Ready(this.res.take().unwrap()),
1884            }
1885        }
1886    }
1887}
1888
1889pin_project! {
1890    /// Future for the [`StreamExt::fold()`] method.
1891    #[derive(Debug)]
1892    #[must_use = "futures do nothing unless you `.await` or poll them"]
1893    pub struct FoldFuture<S, F, T> {
1894        #[pin]
1895        stream: S,
1896        f: F,
1897        acc: Option<T>,
1898    }
1899}
1900
1901impl<S, F, T> Future for FoldFuture<S, F, T>
1902where
1903    S: Stream,
1904    F: FnMut(T, S::Item) -> T,
1905{
1906    type Output = T;
1907
1908    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1909        let mut this = self.project();
1910        loop {
1911            match ready!(this.stream.as_mut().poll_next(cx)) {
1912                Some(v) => {
1913                    let old = this.acc.take().unwrap();
1914                    let new = (this.f)(old, v);
1915                    *this.acc = Some(new);
1916                }
1917                None => return Poll::Ready(this.acc.take().unwrap()),
1918            }
1919        }
1920    }
1921}
1922
1923/// Future for the [`StreamExt::try_fold()`] method.
1924#[derive(Debug)]
1925#[must_use = "futures do nothing unless you `.await` or poll them"]
1926pub struct TryFoldFuture<'a, S, F, B> {
1927    stream: &'a mut S,
1928    f: F,
1929    acc: Option<B>,
1930}
1931
1932impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
1933
1934impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
1935where
1936    S: Stream<Item = Result<T, E>> + Unpin,
1937    F: FnMut(B, T) -> Result<B, E>,
1938{
1939    type Output = Result<B, E>;
1940
1941    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1942        loop {
1943            match ready!(self.stream.poll_next(cx)) {
1944                Some(Err(e)) => return Poll::Ready(Err(e)),
1945                Some(Ok(t)) => {
1946                    let old = self.acc.take().unwrap();
1947                    let new = (&mut self.f)(old, t);
1948
1949                    match new {
1950                        Ok(t) => self.acc = Some(t),
1951                        Err(e) => return Poll::Ready(Err(e)),
1952                    }
1953                }
1954                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
1955            }
1956        }
1957    }
1958}
1959
1960pin_project! {
1961    /// Stream for the [`StreamExt::scan()`] method.
1962    #[derive(Clone, Debug)]
1963    #[must_use = "streams do nothing unless polled"]
1964    pub struct Scan<S, St, F> {
1965        #[pin]
1966        stream: S,
1967        state_f: (St, F),
1968    }
1969}
1970
1971impl<S, St, F, B> Stream for Scan<S, St, F>
1972where
1973    S: Stream,
1974    F: FnMut(&mut St, S::Item) -> Option<B>,
1975{
1976    type Item = B;
1977
1978    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
1979        let mut this = self.project();
1980        this.stream.as_mut().poll_next(cx).map(|item| {
1981            item.and_then(|item| {
1982                let (state, f) = this.state_f;
1983                f(state, item)
1984            })
1985        })
1986    }
1987}
1988
1989pin_project! {
1990    /// Stream for the [`StreamExt::fuse()`] method.
1991    #[derive(Clone, Debug)]
1992    #[must_use = "streams do nothing unless polled"]
1993    pub struct Fuse<S> {
1994        #[pin]
1995        stream: S,
1996        done: bool,
1997    }
1998}
1999
2000impl<S: Stream> Stream for Fuse<S> {
2001    type Item = S::Item;
2002
2003    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2004        let this = self.project();
2005
2006        if *this.done {
2007            Poll::Ready(None)
2008        } else {
2009            let next = ready!(this.stream.poll_next(cx));
2010            if next.is_none() {
2011                *this.done = true;
2012            }
2013            Poll::Ready(next)
2014        }
2015    }
2016}
2017
2018pin_project! {
2019    /// Stream for the [`StreamExt::map()`] method.
2020    #[derive(Clone, Debug)]
2021    #[must_use = "streams do nothing unless polled"]
2022    pub struct Map<S, F> {
2023        #[pin]
2024        stream: S,
2025        f: F,
2026    }
2027}
2028
2029impl<S, F, T> Stream for Map<S, F>
2030where
2031    S: Stream,
2032    F: FnMut(S::Item) -> T,
2033{
2034    type Item = T;
2035
2036    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2037        let this = self.project();
2038        let next = ready!(this.stream.poll_next(cx));
2039        Poll::Ready(next.map(this.f))
2040    }
2041
2042    fn size_hint(&self) -> (usize, Option<usize>) {
2043        self.stream.size_hint()
2044    }
2045}
2046
2047pin_project! {
2048    /// Stream for the [`StreamExt::flat_map()`] method.
2049    #[derive(Clone, Debug)]
2050    #[must_use = "streams do nothing unless polled"]
2051    pub struct FlatMap<S, U, F> {
2052        #[pin]
2053        stream: Map<S, F>,
2054        #[pin]
2055        inner_stream: Option<U>,
2056    }
2057}
2058
2059impl<S, U, F> Stream for FlatMap<S, U, F>
2060where
2061    S: Stream,
2062    U: Stream,
2063    F: FnMut(S::Item) -> U,
2064{
2065    type Item = U::Item;
2066
2067    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2068        let mut this = self.project();
2069        loop {
2070            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2071                match ready!(inner.poll_next(cx)) {
2072                    Some(item) => return Poll::Ready(Some(item)),
2073                    None => this.inner_stream.set(None),
2074                }
2075            }
2076
2077            match ready!(this.stream.as_mut().poll_next(cx)) {
2078                Some(stream) => this.inner_stream.set(Some(stream)),
2079                None => return Poll::Ready(None),
2080            }
2081        }
2082    }
2083}
2084
2085pin_project! {
2086    /// Stream for the [`StreamExt::flat_map()`] method.
2087    #[derive(Clone, Debug)]
2088    #[must_use = "streams do nothing unless polled"]
2089    pub struct Flatten<S: Stream> {
2090        #[pin]
2091        stream: S,
2092        #[pin]
2093        inner_stream: Option<S::Item>,
2094    }
2095}
2096
2097impl<S, U> Stream for Flatten<S>
2098where
2099    S: Stream<Item = U>,
2100    U: Stream,
2101{
2102    type Item = U::Item;
2103
2104    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2105        let mut this = self.project();
2106        loop {
2107            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2108                match ready!(inner.poll_next(cx)) {
2109                    Some(item) => return Poll::Ready(Some(item)),
2110                    None => this.inner_stream.set(None),
2111                }
2112            }
2113
2114            match ready!(this.stream.as_mut().poll_next(cx)) {
2115                Some(inner) => this.inner_stream.set(Some(inner)),
2116                None => return Poll::Ready(None),
2117            }
2118        }
2119    }
2120}
2121
2122pin_project! {
2123    /// Stream for the [`StreamExt::then()`] method.
2124    #[derive(Clone, Debug)]
2125    #[must_use = "streams do nothing unless polled"]
2126    pub struct Then<S, F, Fut> {
2127        #[pin]
2128        stream: S,
2129        #[pin]
2130        future: Option<Fut>,
2131        f: F,
2132    }
2133}
2134
2135impl<S, F, Fut> Stream for Then<S, F, Fut>
2136where
2137    S: Stream,
2138    F: FnMut(S::Item) -> Fut,
2139    Fut: Future,
2140{
2141    type Item = Fut::Output;
2142
2143    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2144        let mut this = self.project();
2145
2146        loop {
2147            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2148                let item = ready!(fut.poll(cx));
2149                this.future.set(None);
2150                return Poll::Ready(Some(item));
2151            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2152                this.future.set(Some((this.f)(item)));
2153            } else {
2154                return Poll::Ready(None);
2155            }
2156        }
2157    }
2158
2159    fn size_hint(&self) -> (usize, Option<usize>) {
2160        let future_len = if self.future.is_some() { 1 } else { 0 };
2161        let (lower, upper) = self.stream.size_hint();
2162        let lower = lower.saturating_add(future_len);
2163        let upper = upper.and_then(|u| u.checked_add(future_len));
2164        (lower, upper)
2165    }
2166}
2167
2168pin_project! {
2169    /// Stream for the [`StreamExt::filter()`] method.
2170    #[derive(Clone, Debug)]
2171    #[must_use = "streams do nothing unless polled"]
2172    pub struct Filter<S, P> {
2173        #[pin]
2174        stream: S,
2175        predicate: P,
2176    }
2177}
2178
2179impl<S, P> Stream for Filter<S, P>
2180where
2181    S: Stream,
2182    P: FnMut(&S::Item) -> bool,
2183{
2184    type Item = S::Item;
2185
2186    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2187        let mut this = self.project();
2188        loop {
2189            match ready!(this.stream.as_mut().poll_next(cx)) {
2190                None => return Poll::Ready(None),
2191                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2192                Some(_) => {}
2193            }
2194        }
2195    }
2196}
2197
/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
///
/// On every poll `stream1` is tried first. If it does not produce an item on that poll, the
/// result of polling `stream2` is returned as-is.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
/// # })
/// ```
pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Or { stream1, stream2 }
}
2220
pin_project! {
    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
    ///
    /// Holds both source streams; the [`Stream`] implementation always polls `stream1` before
    /// `stream2`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        // Preferred stream, polled first on every call.
        #[pin]
        stream1: S1,
        // Fallback stream, polled only when `stream1` yields no item.
        #[pin]
        stream2: S2,
    }
}
2232
2233impl<T, S1, S2> Stream for Or<S1, S2>
2234where
2235    S1: Stream<Item = T>,
2236    S2: Stream<Item = T>,
2237{
2238    type Item = T;
2239
2240    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2241        let mut this = self.project();
2242
2243        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2244            return Poll::Ready(Some(t));
2245        }
2246        this.stream2.as_mut().poll_next(cx)
2247    }
2248}
2249
/// Merges two streams, with no preference for either stream when both are ready.
///
/// On every poll the order in which the two streams are tried is chosen at random.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
/// ```
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race { stream1, stream2 }
}
2272
#[cfg(feature = "std")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    ///
    /// Holds both source streams; the [`Stream`] implementation picks at random which one to
    /// poll first on every call.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        // First source stream.
        #[pin]
        stream1: S1,
        // Second source stream.
        #[pin]
        stream2: S2,
    }
}
2285
#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a randomly chosen order and yields the first item produced.
    ///
    /// The stream polled second is only touched when the first one did not yield an item.
    /// `Poll::Ready(None)` is returned once both streams report exhaustion within the same
    /// poll. Answering `Poll::Pending` in that situation would stall the task, because a
    /// stream that has returned `Ready(None)` is under no obligation to wake it again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Poll in random order; bail out early if the stream polled first has an item.
        let (first, second) = if fastrand::bool() {
            let first = this.stream1.as_mut().poll_next(cx);
            if let Poll::Ready(Some(_)) = first {
                return first;
            }
            (first, this.stream2.as_mut().poll_next(cx))
        } else {
            let first = this.stream2.as_mut().poll_next(cx);
            if let Poll::Ready(Some(_)) = first {
                return first;
            }
            (first, this.stream1.as_mut().poll_next(cx))
        };

        match (first, second) {
            (_, Poll::Ready(Some(item))) => Poll::Ready(Some(item)),
            // Both sides are finished: the merged stream is finished too.
            (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
            // At least one side is still pending and has registered the waker.
            _ => Poll::Pending,
        }
    }
}
2315
pin_project! {
    /// Stream for the [`StreamExt::filter_map()`] method.
    ///
    /// Wraps an inner stream and a closure returning `Option`; the [`Stream`] implementation
    /// yields the contents of every `Some` the closure returns and skips every `None`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FilterMap<S, F> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Maps each item to `Some(output)` to keep it or `None` to drop it.
        f: F,
    }
}
2326
2327impl<S, F, T> Stream for FilterMap<S, F>
2328where
2329    S: Stream,
2330    F: FnMut(S::Item) -> Option<T>,
2331{
2332    type Item = T;
2333
2334    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2335        let mut this = self.project();
2336        loop {
2337            match ready!(this.stream.as_mut().poll_next(cx)) {
2338                None => return Poll::Ready(None),
2339                Some(v) => {
2340                    if let Some(t) = (this.f)(v) {
2341                        return Poll::Ready(Some(t));
2342                    }
2343                }
2344            }
2345        }
2346    }
2347}
2348
pin_project! {
    /// Stream for the [`StreamExt::take()`] method.
    ///
    /// Yields items from the inner stream while the remaining-item budget `n` is non-zero.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<S> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Number of items that may still be yielded; zero means the stream is finished.
        n: usize,
    }
}
2359
2360impl<S: Stream> Stream for Take<S> {
2361    type Item = S::Item;
2362
2363    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2364        let this = self.project();
2365
2366        if *this.n == 0 {
2367            Poll::Ready(None)
2368        } else {
2369            let next = ready!(this.stream.poll_next(cx));
2370            match next {
2371                Some(_) => *this.n -= 1,
2372                None => *this.n = 0,
2373            }
2374            Poll::Ready(next)
2375        }
2376    }
2377}
2378
pin_project! {
    /// Stream for the [`StreamExt::take_while()`] method.
    ///
    /// Wraps an inner stream and a predicate; the [`Stream`] implementation yields an item
    /// when the predicate accepts it and `None` when the predicate rejects it.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<S, P> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Called with a reference to each item; `false` ends the stream for that poll.
        predicate: P,
    }
}
2389
2390impl<S, P> Stream for TakeWhile<S, P>
2391where
2392    S: Stream,
2393    P: FnMut(&S::Item) -> bool,
2394{
2395    type Item = S::Item;
2396
2397    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2398        let this = self.project();
2399
2400        match ready!(this.stream.poll_next(cx)) {
2401            Some(v) => {
2402                if (this.predicate)(&v) {
2403                    Poll::Ready(Some(v))
2404                } else {
2405                    Poll::Ready(None)
2406                }
2407            }
2408            None => Poll::Ready(None),
2409        }
2410    }
2411}
2412
pin_project! {
    /// Stream for the [`StreamExt::skip()`] method.
    ///
    /// Discards items from the inner stream until the counter `n` reaches zero, then yields
    /// everything that follows.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<S> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Number of items that still have to be discarded.
        n: usize,
    }
}
2423
2424impl<S: Stream> Stream for Skip<S> {
2425    type Item = S::Item;
2426
2427    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2428        let mut this = self.project();
2429        loop {
2430            match ready!(this.stream.as_mut().poll_next(cx)) {
2431                Some(v) => match *this.n {
2432                    0 => return Poll::Ready(Some(v)),
2433                    _ => *this.n -= 1,
2434                },
2435                None => return Poll::Ready(None),
2436            }
2437        }
2438    }
2439}
2440
pin_project! {
    /// Stream for the [`StreamExt::skip_while()`] method.
    ///
    /// Discards leading items for which the predicate returns `true`, then yields the first
    /// rejected item and everything after it.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct SkipWhile<S, P> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // `Some` while items are still being skipped; the `Stream` impl resets it to `None`
        // the first time the predicate returns `false`.
        predicate: Option<P>,
    }
}
2451
2452impl<S, P> Stream for SkipWhile<S, P>
2453where
2454    S: Stream,
2455    P: FnMut(&S::Item) -> bool,
2456{
2457    type Item = S::Item;
2458
2459    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2460        let mut this = self.project();
2461        loop {
2462            match ready!(this.stream.as_mut().poll_next(cx)) {
2463                Some(v) => match this.predicate {
2464                    Some(p) => {
2465                        if !p(&v) {
2466                            *this.predicate = None;
2467                            return Poll::Ready(Some(v));
2468                        }
2469                    }
2470                    None => return Poll::Ready(Some(v)),
2471                },
2472                None => return Poll::Ready(None),
2473            }
2474        }
2475    }
2476}
2477
pin_project! {
    /// Stream for the [`StreamExt::step_by()`] method.
    ///
    /// Yields an item from the inner stream, then discards the following `step - 1` items
    /// before yielding again.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct StepBy<S> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Distance between yielded items.
        // NOTE(review): the `Stream` impl computes `step - 1`, so this is presumably
        // guaranteed non-zero by the constructor; confirm.
        step: usize,
        // Number of items still to discard before the next one is yielded.
        i: usize,
    }
}
2489
2490impl<S: Stream> Stream for StepBy<S> {
2491    type Item = S::Item;
2492
2493    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2494        let mut this = self.project();
2495        loop {
2496            match ready!(this.stream.as_mut().poll_next(cx)) {
2497                Some(v) => {
2498                    if *this.i == 0 {
2499                        *this.i = *this.step - 1;
2500                        return Poll::Ready(Some(v));
2501                    } else {
2502                        *this.i -= 1;
2503                    }
2504                }
2505                None => return Poll::Ready(None),
2506            }
2507        }
2508    }
2509}
2510
pin_project! {
    /// Stream for the [`StreamExt::chain()`] method.
    ///
    /// Yields all items of `first`, followed by all items of `second`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Chain<S, U> {
        // Stream drained first; wrapped in `Fuse` so its `done` flag can be inspected.
        #[pin]
        first: Fuse<S>,
        // Stream drained once `first` is done; also fused for its `done` flag.
        #[pin]
        second: Fuse<U>,
    }
}
2522
2523impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2524    type Item = S::Item;
2525
2526    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2527        let mut this = self.project();
2528
2529        if !this.first.done {
2530            let next = ready!(this.first.as_mut().poll_next(cx));
2531            if let Some(next) = next {
2532                return Poll::Ready(Some(next));
2533            }
2534        }
2535
2536        if !this.second.done {
2537            let next = ready!(this.second.as_mut().poll_next(cx));
2538            if let Some(next) = next {
2539                return Poll::Ready(Some(next));
2540            }
2541        }
2542
2543        if this.first.done && this.second.done {
2544            Poll::Ready(None)
2545        } else {
2546            Poll::Pending
2547        }
2548    }
2549}
2550
pin_project! {
    /// Stream for the [`StreamExt::cloned()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by cloning each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cloned<S> {
        // Inner stream of references, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
    }
}
2560
2561impl<'a, S, T: 'a> Stream for Cloned<S>
2562where
2563    S: Stream<Item = &'a T>,
2564    T: Clone,
2565{
2566    type Item = T;
2567
2568    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2569        let this = self.project();
2570        let next = ready!(this.stream.poll_next(cx));
2571        Poll::Ready(next.cloned())
2572    }
2573}
2574
pin_project! {
    /// Stream for the [`StreamExt::copied()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by copying each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Copied<S> {
        // Inner stream of references, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
    }
}
2584
2585impl<'a, S, T: 'a> Stream for Copied<S>
2586where
2587    S: Stream<Item = &'a T>,
2588    T: Copy,
2589{
2590    type Item = T;
2591
2592    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2593        let this = self.project();
2594        let next = ready!(this.stream.poll_next(cx));
2595        Poll::Ready(next.copied())
2596    }
2597}
2598
pin_project! {
    /// Stream for the [`StreamExt::cycle()`] method.
    ///
    /// Repeats a cloneable stream: whenever the active stream ends, it is replaced by a fresh
    /// clone of `orig`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cycle<S> {
        // Pristine template; never polled, only cloned to restart the cycle.
        orig: S,
        // Stream currently being drained.
        #[pin]
        stream: S,
    }
}
2609
2610impl<S> Stream for Cycle<S>
2611where
2612    S: Stream + Clone,
2613{
2614    type Item = S::Item;
2615
2616    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2617        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2618            Some(item) => Poll::Ready(Some(item)),
2619            None => {
2620                let new = self.as_mut().orig.clone();
2621                self.as_mut().project().stream.set(new);
2622                self.project().stream.poll_next(cx)
2623            }
2624        }
2625    }
2626}
2627
pin_project! {
    /// Stream for the [`StreamExt::enumerate()`] method.
    ///
    /// Pairs every item of the inner stream with its zero-based position.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Enumerate<S> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Index that will be attached to the next item.
        i: usize,
    }
}
2638
2639impl<S> Stream for Enumerate<S>
2640where
2641    S: Stream,
2642{
2643    type Item = (usize, S::Item);
2644
2645    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2646        let this = self.project();
2647
2648        match ready!(this.stream.poll_next(cx)) {
2649            Some(v) => {
2650                let ret = (*this.i, v);
2651                *this.i += 1;
2652                Poll::Ready(Some(ret))
2653            }
2654            None => Poll::Ready(None),
2655        }
2656    }
2657}
2658
pin_project! {
    /// Stream for the [`StreamExt::inspect()`] method.
    ///
    /// Passes every item through unchanged after showing a reference to it to the closure.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Inspect<S, F> {
        // Inner stream, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Observer called with a reference to each item before it is yielded.
        f: F,
    }
}
2669
2670impl<S, F> Stream for Inspect<S, F>
2671where
2672    S: Stream,
2673    F: FnMut(&S::Item),
2674{
2675    type Item = S::Item;
2676
2677    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2678        let mut this = self.project();
2679        let next = ready!(this.stream.as_mut().poll_next(cx));
2680        if let Some(x) = &next {
2681            (this.f)(x);
2682        }
2683        Poll::Ready(next)
2684    }
2685}
2686
/// Future for the [`StreamExt::nth()`] method.
///
/// Resolves to the item found after discarding `n` items from the borrowed stream, or to
/// `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NthFuture<'a, S: ?Sized> {
    // Stream being consumed; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Number of items that still have to be discarded before one is returned.
    n: usize,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2696
2697impl<'a, S> Future for NthFuture<'a, S>
2698where
2699    S: Stream + Unpin + ?Sized,
2700{
2701    type Output = Option<S::Item>;
2702
2703    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2704        loop {
2705            match ready!(self.stream.poll_next(cx)) {
2706                Some(v) => match self.n {
2707                    0 => return Poll::Ready(Some(v)),
2708                    _ => self.n -= 1,
2709                },
2710                None => return Poll::Ready(None),
2711            }
2712        }
2713    }
2714}
2715
pin_project! {
    /// Future for the [`StreamExt::last()`] method.
    ///
    /// Drains the stream and resolves to the final item it produced, or `None` if it
    /// produced nothing.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct LastFuture<S: Stream> {
        // Stream being drained, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Most recent item seen so far.
        last: Option<S::Item>,
    }
}
2726
2727impl<S: Stream> Future for LastFuture<S> {
2728    type Output = Option<S::Item>;
2729
2730    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2731        let mut this = self.project();
2732        loop {
2733            match ready!(this.stream.as_mut().poll_next(cx)) {
2734                Some(new) => *this.last = Some(new),
2735                None => return Poll::Ready(this.last.take()),
2736            }
2737        }
2738    }
2739}
2740
/// Future for the [`StreamExt::find()`] method.
///
/// Resolves to the first item of the borrowed stream that satisfies the predicate, or to
/// `None` if the stream ends without a match.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindFuture<'a, S: ?Sized, P> {
    // Stream being searched; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Called with a reference to each item; `true` selects it.
    predicate: P,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2750
2751impl<'a, S, P> Future for FindFuture<'a, S, P>
2752where
2753    S: Stream + Unpin + ?Sized,
2754    P: FnMut(&S::Item) -> bool,
2755{
2756    type Output = Option<S::Item>;
2757
2758    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2759        loop {
2760            match ready!(self.stream.poll_next(cx)) {
2761                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2762                Some(_) => {}
2763                None => return Poll::Ready(None),
2764            }
2765        }
2766    }
2767}
2768
/// Future for the [`StreamExt::find_map()`] method.
///
/// Resolves to the first `Some` value the closure returns for an item of the borrowed
/// stream, or to `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindMapFuture<'a, S: ?Sized, F> {
    // Stream being searched; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Maps each item to `Some(result)` to stop the search or `None` to continue.
    f: F,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2778
2779impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2780where
2781    S: Stream + Unpin + ?Sized,
2782    F: FnMut(S::Item) -> Option<B>,
2783{
2784    type Output = Option<B>;
2785
2786    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2787        loop {
2788            match ready!(self.stream.poll_next(cx)) {
2789                Some(v) => {
2790                    if let Some(v) = (&mut self.f)(v) {
2791                        return Poll::Ready(Some(v));
2792                    }
2793                }
2794                None => return Poll::Ready(None),
2795            }
2796        }
2797    }
2798}
2799
/// Future for the [`StreamExt::position()`] method.
///
/// Resolves to the index of the first item of the borrowed stream that satisfies the
/// predicate, or to `None` if the stream ends without a match.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PositionFuture<'a, S: ?Sized, P> {
    // Stream being searched; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Consumes each item; `true` stops the search.
    predicate: P,
    // Incremented once for every item the predicate has rejected.
    index: usize,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2810
2811impl<'a, S, P> Future for PositionFuture<'a, S, P>
2812where
2813    S: Stream + Unpin + ?Sized,
2814    P: FnMut(S::Item) -> bool,
2815{
2816    type Output = Option<usize>;
2817
2818    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2819        loop {
2820            match ready!(self.stream.poll_next(cx)) {
2821                Some(v) => {
2822                    if (&mut self.predicate)(v) {
2823                        return Poll::Ready(Some(self.index));
2824                    } else {
2825                        self.index += 1;
2826                    }
2827                }
2828                None => return Poll::Ready(None),
2829            }
2830        }
2831    }
2832}
2833
/// Future for the [`StreamExt::all()`] method.
///
/// Resolves to `false` as soon as the predicate rejects an item of the borrowed stream, and
/// to `true` if the stream ends without any rejection.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AllFuture<'a, S: ?Sized, P> {
    // Stream being checked; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Consumes each item; `false` ends the check.
    predicate: P,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2843
2844impl<S, P> Future for AllFuture<'_, S, P>
2845where
2846    S: Stream + Unpin + ?Sized,
2847    P: FnMut(S::Item) -> bool,
2848{
2849    type Output = bool;
2850
2851    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2852        loop {
2853            match ready!(self.stream.poll_next(cx)) {
2854                Some(v) => {
2855                    if !(&mut self.predicate)(v) {
2856                        return Poll::Ready(false);
2857                    }
2858                }
2859                None => return Poll::Ready(true),
2860            }
2861        }
2862    }
2863}
2864
/// Future for the [`StreamExt::any()`] method.
///
/// Resolves to `true` as soon as the predicate accepts an item of the borrowed stream, and
/// to `false` if the stream ends without any acceptance.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AnyFuture<'a, S: ?Sized, P> {
    // Stream being checked; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Consumes each item; `true` ends the check.
    predicate: P,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2874
2875impl<S, P> Future for AnyFuture<'_, S, P>
2876where
2877    S: Stream + Unpin + ?Sized,
2878    P: FnMut(S::Item) -> bool,
2879{
2880    type Output = bool;
2881
2882    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2883        loop {
2884            match ready!(self.stream.poll_next(cx)) {
2885                Some(v) => {
2886                    if (&mut self.predicate)(v) {
2887                        return Poll::Ready(true);
2888                    }
2889                }
2890                None => return Poll::Ready(false),
2891            }
2892        }
2893    }
2894}
2895
pin_project! {
    /// Future for the [`StreamExt::for_each()`] method.
    ///
    /// Drains the stream, calling the closure on every item, and resolves to `()` when the
    /// stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ForEachFuture<S, F> {
        // Stream being drained, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Callback that consumes each item.
        f: F,
    }
}
2906
2907impl<S, F> Future for ForEachFuture<S, F>
2908where
2909    S: Stream,
2910    F: FnMut(S::Item),
2911{
2912    type Output = ();
2913
2914    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2915        let mut this = self.project();
2916        loop {
2917            match ready!(this.stream.as_mut().poll_next(cx)) {
2918                Some(v) => (this.f)(v),
2919                None => return Poll::Ready(()),
2920            }
2921        }
2922    }
2923}
2924
/// Future for the [`StreamExt::try_for_each()`] method.
///
/// Calls the fallible closure on every item of the borrowed stream. Resolves to the first
/// error the closure returns, or to `Ok(())` when the stream ends.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachFuture<'a, S: ?Sized, F> {
    // Stream being consumed; borrowed, so the caller keeps ownership of it.
    stream: &'a mut S,
    // Fallible callback that consumes each item.
    f: F,
}

// Explicitly marks the future as `Unpin` whenever the stream type is `Unpin`.
impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
2934
2935impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
2936where
2937    S: Stream + Unpin + ?Sized,
2938    F: FnMut(S::Item) -> Result<(), E>,
2939{
2940    type Output = Result<(), E>;
2941
2942    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2943        loop {
2944            match ready!(self.stream.poll_next(cx)) {
2945                None => return Poll::Ready(Ok(())),
2946                Some(v) => (&mut self.f)(v)?,
2947            }
2948        }
2949    }
2950}
2951
pin_project! {
    /// Stream for the [`StreamExt::zip()`] method.
    ///
    /// Pairs each item of `first` with the corresponding item of `second`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Zip<A: Stream, B> {
        // Item already taken from `first` that is still waiting for its partner from
        // `second`; kept here so it survives a `Pending` from the second stream.
        item_slot: Option<A::Item>,
        // Source of the left element of each pair.
        #[pin]
        first: A,
        // Source of the right element of each pair.
        #[pin]
        second: B,
    }
}
2964
2965impl<A: Stream, B: Stream> Stream for Zip<A, B> {
2966    type Item = (A::Item, B::Item);
2967
2968    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2969        let this = self.project();
2970
2971        if this.item_slot.is_none() {
2972            match this.first.poll_next(cx) {
2973                Poll::Pending => return Poll::Pending,
2974                Poll::Ready(None) => return Poll::Ready(None),
2975                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
2976            }
2977        }
2978
2979        let second_item = ready!(this.second.poll_next(cx));
2980        let first_item = this.item_slot.take().unwrap();
2981        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
2982    }
2983}
2984
pin_project! {
    /// Future for the [`StreamExt::unzip()`] method.
    ///
    /// Drains a stream of pairs, extending one collection with the left elements and another
    /// with the right elements, and resolves to both collections.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct UnzipFuture<S, FromA, FromB> {
        // Stream of pairs, structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Collections being filled; wrapped in `Option` so they can be moved out on
        // completion. NOTE(review): the `Future` impl unwraps this, so it is presumably
        // initialised to `Some` by the constructor; confirm.
        res: Option<(FromA, FromB)>,
    }
}
2995
2996impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
2997where
2998    S: Stream<Item = (A, B)>,
2999    FromA: Default + Extend<A>,
3000    FromB: Default + Extend<B>,
3001{
3002    type Output = (FromA, FromB);
3003
3004    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3005        let mut this = self.project();
3006
3007        loop {
3008            match ready!(this.stream.as_mut().poll_next(cx)) {
3009                Some((a, b)) => {
3010                    let res = this.res.as_mut().unwrap();
3011                    res.0.extend(Some(a));
3012                    res.1.extend(Some(b));
3013                }
3014                None => return Poll::Ready(this.res.take().unwrap()),
3015            }
3016        }
3017    }
3018}