1#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
/// Wraps `stream` in a [`BlockOn`] adapter so it can be consumed as a blocking [`Iterator`].
///
/// The stream has to be `Unpin` because the adapter polls it through a plain `&mut`.
#[cfg(feature = "std")]
pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
    BlockOn::<S>(stream)
}
56
/// Adapter returned by [`block_on()`]; its `Iterator` impl (behind the `std` feature) blocks
/// on the wrapped stream for each item.
#[derive(Debug)]
pub struct BlockOn<S>(S);
60
/// Blocking iteration over the wrapped stream.
#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    /// Blocks on the stream's `next()` future and returns whatever it resolves to.
    fn next(&mut self) -> Option<Self::Item> {
        let next_item = self.0.next();
        crate::future::block_on(next_item)
    }
}
69
70pub fn empty<T>() -> Empty<T> {
83 Empty {
84 _marker: PhantomData,
85 }
86}
87
/// Stream returned by [`empty()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Empty<T> {
    // Only records the item type; no `T` value is ever stored.
    _marker: PhantomData<T>,
}

// No data is stored, so the stream is `Unpin` regardless of `T`.
impl<T> Unpin for Empty<T> {}
96
97impl<T> Stream for Empty<T> {
98 type Item = T;
99
100 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 Poll::Ready(None)
102 }
103
104 fn size_hint(&self) -> (usize, Option<usize>) {
105 (0, Some(0))
106 }
107}
108
109pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
125 Iter {
126 iter: iter.into_iter(),
127 }
128}
129
/// Stream returned by [`iter()`]; wraps an iterator.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
    // The iterator that supplies the stream's items.
    iter: I,
}

// The wrapped iterator is only ever accessed through `&mut`, never pinned.
impl<I> Unpin for Iter<I> {}
138
139impl<I: Iterator> Stream for Iter<I> {
140 type Item = I::Item;
141
142 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 Poll::Ready(self.iter.next())
144 }
145
146 fn size_hint(&self) -> (usize, Option<usize>) {
147 self.iter.size_hint()
148 }
149}
150
151pub fn once<T>(t: T) -> Once<T> {
166 Once { value: Some(t) }
167}
168
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`once()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Once<T> {
        // The pending item; `None` once it has been taken out by `poll_next`.
        value: Option<T>,
    }
}
177
178impl<T> Stream for Once<T> {
179 type Item = T;
180
181 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
182 Poll::Ready(self.project().value.take())
183 }
184
185 fn size_hint(&self) -> (usize, Option<usize>) {
186 if self.value.is_some() {
187 (1, Some(1))
188 } else {
189 (0, Some(0))
190 }
191 }
192}
193
194pub fn pending<T>() -> Pending<T> {
208 Pending {
209 _marker: PhantomData,
210 }
211}
212
/// Stream returned by [`pending()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
    // Only records the item type; no `T` value is ever stored.
    _marker: PhantomData<T>,
}

// No data is stored, so the stream is `Unpin` regardless of `T`.
impl<T> Unpin for Pending<T> {}
221
222impl<T> Stream for Pending<T> {
223 type Item = T;
224
225 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
226 Poll::Pending
227 }
228
229 fn size_hint(&self) -> (usize, Option<usize>) {
230 (0, Some(0))
231 }
232}
233
234pub fn poll_fn<T, F>(f: F) -> PollFn<F>
251where
252 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
253{
254 PollFn { f }
255}
256
/// Stream returned by [`poll_fn()`].
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    // The closure invoked on every `poll_next` call.
    f: F,
}

// The closure is only called through `&mut`, never pinned.
impl<F> Unpin for PollFn<F> {}
265
266impl<F> fmt::Debug for PollFn<F> {
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 f.debug_struct("PollFn").finish()
269 }
270}
271
272impl<T, F> Stream for PollFn<F>
273where
274 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
275{
276 type Item = T;
277
278 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
279 (&mut self.f)(cx)
280 }
281}
282
283pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
298 Repeat { item }
299}
300
/// Stream returned by [`repeat()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
    // The value cloned for every yielded item.
    item: T,
}

// The item is only read by reference, never pinned.
impl<T> Unpin for Repeat<T> {}
309
310impl<T: Clone> Stream for Repeat<T> {
311 type Item = T;
312
313 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
314 Poll::Ready(Some(self.item.clone()))
315 }
316
317 fn size_hint(&self) -> (usize, Option<usize>) {
318 (usize::max_value(), None)
319 }
320}
321
322pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
337where
338 F: FnMut() -> T,
339{
340 RepeatWith { f: repeater }
341}
342
/// Stream returned by [`repeat_with()`].
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
    // The closure called to produce every item.
    f: F,
}

// The closure is only called through `&mut`, never pinned.
impl<F> Unpin for RepeatWith<F> {}
351
352impl<T, F> Stream for RepeatWith<F>
353where
354 F: FnMut() -> T,
355{
356 type Item = T;
357
358 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
359 let item = (&mut self.f)();
360 Poll::Ready(Some(item))
361 }
362
363 fn size_hint(&self) -> (usize, Option<usize>) {
364 (usize::max_value(), None)
365 }
366}
367
368pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
390where
391 F: FnMut(T) -> Fut,
392 Fut: Future<Output = Option<(Item, T)>>,
393{
394 Unfold {
395 f,
396 state: Some(seed),
397 fut: None,
398 }
399}
400
pin_project! {
    #[derive(Clone)]
    /// Stream returned by [`unfold()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Unfold<T, F, Fut> {
        // Step function producing the next future from the current state.
        f: F,
        // State for the next step; `None` while a step future is in flight.
        state: Option<T>,
        // The step future currently being polled, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
412
413impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
414where
415 T: fmt::Debug,
416 Fut: fmt::Debug,
417{
418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419 f.debug_struct("Unfold")
420 .field("state", &self.state)
421 .field("fut", &self.fut)
422 .finish()
423 }
424}
425
426impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
427where
428 F: FnMut(T) -> Fut,
429 Fut: Future<Output = Option<(Item, T)>>,
430{
431 type Item = Item;
432
433 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
434 let mut this = self.project();
435
436 if let Some(state) = this.state.take() {
437 this.fut.set(Some((this.f)(state)));
438 }
439
440 let step = ready!(this
441 .fut
442 .as_mut()
443 .as_pin_mut()
444 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
445 .poll(cx));
446 this.fut.set(None);
447
448 if let Some((item, next_state)) = step {
449 *this.state = Some(next_state);
450 Poll::Ready(Some(item))
451 } else {
452 Poll::Ready(None)
453 }
454 }
455}
456
457pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
479where
480 F: FnMut(T) -> Fut,
481 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
482{
483 TryUnfold {
484 f,
485 state: Some(init),
486 fut: None,
487 }
488}
489
pin_project! {
    #[derive(Clone)]
    /// Stream returned by [`try_unfold()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        // Step function producing the next future from the current state.
        f: F,
        // State for the next step; `None` while a step future is in flight.
        state: Option<T>,
        // The step future currently being polled, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
501
502impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
503where
504 T: fmt::Debug,
505 Fut: fmt::Debug,
506{
507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 f.debug_struct("TryUnfold")
509 .field("state", &self.state)
510 .field("fut", &self.fut)
511 .finish()
512 }
513}
514
515impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
516where
517 F: FnMut(T) -> Fut,
518 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
519{
520 type Item = Result<Item, E>;
521
522 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
523 let mut this = self.project();
524
525 if let Some(state) = this.state.take() {
526 this.fut.set(Some((this.f)(state)));
527 }
528
529 match this.fut.as_mut().as_pin_mut() {
530 None => {
531 Poll::Ready(None)
533 }
534 Some(future) => {
535 let step = ready!(future.poll(cx));
536 this.fut.set(None);
537
538 match step {
539 Ok(Some((item, next_state))) => {
540 *this.state = Some(next_state);
541 Poll::Ready(Some(Ok(item)))
542 }
543 Ok(None) => Poll::Ready(None),
544 Err(e) => Poll::Ready(Some(Err(e))),
545 }
546 }
547 }
548 }
549}
550
551pub trait StreamExt: Stream {
553 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
555 where
556 Self: Unpin,
557 {
558 Stream::poll_next(Pin::new(self), cx)
559 }
560
561 fn next(&mut self) -> NextFuture<'_, Self>
581 where
582 Self: Unpin,
583 {
584 NextFuture { stream: self }
585 }
586
587 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
609 where
610 Self: Stream<Item = Result<T, E>> + Unpin,
611 {
612 TryNextFuture { stream: self }
613 }
614
615 fn count(self) -> CountFuture<Self>
631 where
632 Self: Sized,
633 {
634 CountFuture {
635 stream: self,
636 count: 0,
637 }
638 }
639
640 fn map<T, F>(self, f: F) -> Map<Self, F>
658 where
659 Self: Sized,
660 F: FnMut(Self::Item) -> T,
661 {
662 Map { stream: self, f }
663 }
664
665 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
684 where
685 Self: Sized,
686 U: Stream,
687 F: FnMut(Self::Item) -> U,
688 {
689 FlatMap {
690 stream: self.map(f),
691 inner_stream: None,
692 }
693 }
694
695 fn flatten(self) -> Flatten<Self>
712 where
713 Self: Sized,
714 Self::Item: Stream,
715 {
716 Flatten {
717 stream: self,
718 inner_stream: None,
719 }
720 }
721
722 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
742 where
743 Self: Sized,
744 F: FnMut(Self::Item) -> Fut,
745 Fut: Future,
746 {
747 Then {
748 stream: self,
749 future: None,
750 f,
751 }
752 }
753
754 fn filter<P>(self, predicate: P) -> Filter<Self, P>
771 where
772 Self: Sized,
773 P: FnMut(&Self::Item) -> bool,
774 {
775 Filter {
776 stream: self,
777 predicate,
778 }
779 }
780
781 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
799 where
800 Self: Sized,
801 F: FnMut(Self::Item) -> Option<T>,
802 {
803 FilterMap { stream: self, f }
804 }
805
806 fn take(self, n: usize) -> Take<Self>
822 where
823 Self: Sized,
824 {
825 Take { stream: self, n }
826 }
827
828 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
845 where
846 Self: Sized,
847 P: FnMut(&Self::Item) -> bool,
848 {
849 TakeWhile {
850 stream: self,
851 predicate,
852 }
853 }
854
855 fn skip(self, n: usize) -> Skip<Self>
871 where
872 Self: Sized,
873 {
874 Skip { stream: self, n }
875 }
876
877 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
894 where
895 Self: Sized,
896 P: FnMut(&Self::Item) -> bool,
897 {
898 SkipWhile {
899 stream: self,
900 predicate: Some(predicate),
901 }
902 }
903
904 fn step_by(self, step: usize) -> StepBy<Self>
926 where
927 Self: Sized,
928 {
929 assert!(step > 0, "`step` must be greater than zero");
930 StepBy {
931 stream: self,
932 step,
933 i: 0,
934 }
935 }
936
937 fn chain<U>(self, other: U) -> Chain<Self, U>
957 where
958 Self: Sized,
959 U: Stream<Item = Self::Item> + Sized,
960 {
961 Chain {
962 first: self.fuse(),
963 second: other.fuse(),
964 }
965 }
966
967 fn cloned<'a, T>(self) -> Cloned<Self>
984 where
985 Self: Stream<Item = &'a T> + Sized,
986 T: Clone + 'a,
987 {
988 Cloned { stream: self }
989 }
990
991 fn copied<'a, T>(self) -> Copied<Self>
1008 where
1009 Self: Stream<Item = &'a T> + Sized,
1010 T: Copy + 'a,
1011 {
1012 Copied { stream: self }
1013 }
1014
1015 fn collect<C>(self) -> CollectFuture<Self, C>
1030 where
1031 Self: Sized,
1032 C: Default + Extend<Self::Item>,
1033 {
1034 CollectFuture {
1035 stream: self,
1036 collection: Default::default(),
1037 }
1038 }
1039
1040 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1056 where
1057 Self: Stream<Item = Result<T, E>> + Sized,
1058 C: Default + Extend<T>,
1059 {
1060 TryCollectFuture {
1061 stream: self,
1062 items: Default::default(),
1063 }
1064 }
1065
1066 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1083 where
1084 Self: Sized,
1085 B: Default + Extend<Self::Item>,
1086 P: FnMut(&Self::Item) -> bool,
1087 {
1088 PartitionFuture {
1089 stream: self,
1090 predicate,
1091 res: Some(Default::default()),
1092 }
1093 }
1094
1095 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1113 where
1114 Self: Sized,
1115 F: FnMut(T, Self::Item) -> T,
1116 {
1117 FoldFuture {
1118 stream: self,
1119 f,
1120 acc: Some(init),
1121 }
1122 }
1123
1124 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1151 where
1152 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1153 F: FnMut(B, T) -> Result<B, E>,
1154 {
1155 TryFoldFuture {
1156 stream: self,
1157 f,
1158 acc: Some(init),
1159 }
1160 }
1161
1162 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1186 where
1187 Self: Sized,
1188 F: FnMut(&mut St, Self::Item) -> Option<B>,
1189 {
1190 Scan {
1191 stream: self,
1192 state_f: (initial_state, f),
1193 }
1194 }
1195
1196 fn fuse(self) -> Fuse<Self>
1212 where
1213 Self: Sized,
1214 {
1215 Fuse {
1216 stream: self,
1217 done: false,
1218 }
1219 }
1220
1221 fn cycle(self) -> Cycle<Self>
1238 where
1239 Self: Clone + Sized,
1240 {
1241 Cycle {
1242 orig: self.clone(),
1243 stream: self,
1244 }
1245 }
1246
1247 fn enumerate(self) -> Enumerate<Self>
1265 where
1266 Self: Sized,
1267 {
1268 Enumerate { stream: self, i: 0 }
1269 }
1270
1271 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1290 where
1291 Self: Sized,
1292 F: FnMut(&Self::Item),
1293 {
1294 Inspect { stream: self, f }
1295 }
1296
1297 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1315 where
1316 Self: Unpin,
1317 {
1318 NthFuture { stream: self, n }
1319 }
1320
1321 fn last(self) -> LastFuture<Self>
1337 where
1338 Self: Sized,
1339 {
1340 LastFuture {
1341 stream: self,
1342 last: None,
1343 }
1344 }
1345
1346 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1361 where
1362 Self: Unpin,
1363 P: FnMut(&Self::Item) -> bool,
1364 {
1365 FindFuture {
1366 stream: self,
1367 predicate,
1368 }
1369 }
1370
1371 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1386 where
1387 Self: Unpin,
1388 F: FnMut(Self::Item) -> Option<B>,
1389 {
1390 FindMapFuture { stream: self, f }
1391 }
1392
1393 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1409 where
1410 Self: Unpin,
1411 P: FnMut(Self::Item) -> bool,
1412 {
1413 PositionFuture {
1414 stream: self,
1415 predicate,
1416 index: 0,
1417 }
1418 }
1419
1420 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1441 where
1442 Self: Unpin,
1443 P: FnMut(Self::Item) -> bool,
1444 {
1445 AllFuture {
1446 stream: self,
1447 predicate,
1448 }
1449 }
1450
1451 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1472 where
1473 Self: Unpin,
1474 P: FnMut(Self::Item) -> bool,
1475 {
1476 AnyFuture {
1477 stream: self,
1478 predicate,
1479 }
1480 }
1481
1482 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1495 where
1496 Self: Sized,
1497 F: FnMut(Self::Item),
1498 {
1499 ForEachFuture { stream: self, f }
1500 }
1501
1502 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1529 where
1530 Self: Unpin,
1531 F: FnMut(Self::Item) -> Result<(), E>,
1532 {
1533 TryForEachFuture { stream: self, f }
1534 }
1535
1536 fn zip<U>(self, other: U) -> Zip<Self, U>
1557 where
1558 Self: Sized,
1559 U: Stream,
1560 {
1561 Zip {
1562 item_slot: None,
1563 first: self,
1564 second: other,
1565 }
1566 }
1567
1568 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1584 where
1585 FromA: Default + Extend<A>,
1586 FromB: Default + Extend<B>,
1587 Self: Stream<Item = (A, B)> + Sized,
1588 {
1589 UnzipFuture {
1590 stream: self,
1591 res: Some(Default::default()),
1592 }
1593 }
1594
1595 fn or<S>(self, other: S) -> Or<Self, S>
1612 where
1613 Self: Sized,
1614 S: Stream<Item = Self::Item>,
1615 {
1616 Or {
1617 stream1: self,
1618 stream2: other,
1619 }
1620 }
1621
1622 #[cfg(feature = "std")]
1639 fn race<S>(self, other: S) -> Race<Self, S>
1640 where
1641 Self: Sized,
1642 S: Stream<Item = Self::Item>,
1643 {
1644 Race {
1645 stream1: self,
1646 stream2: other,
1647 }
1648 }
1649
1650 #[cfg(feature = "alloc")]
1667 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1668 where
1669 Self: Send + Sized + 'a,
1670 {
1671 Box::pin(self)
1672 }
1673
1674 #[cfg(feature = "alloc")]
1691 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1692 where
1693 Self: Sized + 'a,
1694 {
1695 Box::pin(self)
1696 }
1697}
1698
// Blanket impl: every `Stream`, sized or not, gets the `StreamExt` methods.
impl<S: Stream + ?Sized> StreamExt for S {}
1700
/// A pinned, boxed, `Send + 'static` stream trait object yielding `T`.
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;

/// Like [`Boxed`], but without the `Send` requirement.
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1728
/// Future returned by [`StreamExt::next()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, S: ?Sized> {
    // Borrowed stream that is polled for its next item.
    stream: &'a mut S,
}

// Unpin whenever the borrowed stream type is.
impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1737
1738impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1739 type Output = Option<S::Item>;
1740
1741 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1742 self.stream.poll_next(cx)
1743 }
1744}
1745
/// Future returned by [`StreamExt::try_next()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNextFuture<'a, S: ?Sized> {
    // Borrowed stream that is polled for its next item.
    stream: &'a mut S,
}

// Unpin whenever the borrowed stream type is.
impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1754
1755impl<T, E, S> Future for TryNextFuture<'_, S>
1756where
1757 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1758{
1759 type Output = Result<Option<T>, E>;
1760
1761 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1762 let res = ready!(self.stream.poll_next(cx));
1763 Poll::Ready(res.transpose())
1764 }
1765}
1766
pin_project! {
    #[derive(Debug)]
    /// Future returned by [`StreamExt::count()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CountFuture<S: ?Sized> {
        // Number of items seen so far.
        count: usize,
        // The stream being counted.
        #[pin]
        stream: S,
    }
}
1777
1778impl<S: Stream + ?Sized> Future for CountFuture<S> {
1779 type Output = usize;
1780
1781 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1782 loop {
1783 match ready!(self.as_mut().project().stream.poll_next(cx)) {
1784 None => return Poll::Ready(self.count),
1785 Some(_) => *self.as_mut().project().count += 1,
1786 }
1787 }
1788 }
1789}
1790
pin_project! {
    #[derive(Debug)]
    /// Future returned by [`StreamExt::collect()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CollectFuture<S, C> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Items gathered so far.
        collection: C,
    }
}
1801
1802impl<S, C> Future for CollectFuture<S, C>
1803where
1804 S: Stream,
1805 C: Default + Extend<S::Item>,
1806{
1807 type Output = C;
1808
1809 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1810 let mut this = self.as_mut().project();
1811 loop {
1812 match ready!(this.stream.as_mut().poll_next(cx)) {
1813 Some(e) => this.collection.extend(Some(e)),
1814 None => {
1815 return Poll::Ready(mem::replace(self.project().collection, Default::default()))
1816 }
1817 }
1818 }
1819 }
1820}
1821
pin_project! {
    #[derive(Debug)]
    /// Future returned by [`StreamExt::try_collect()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct TryCollectFuture<S, C> {
        // The stream being drained.
        #[pin]
        stream: S,
        // `Ok` items gathered so far.
        items: C,
    }
}
1832
1833impl<T, E, S, C> Future for TryCollectFuture<S, C>
1834where
1835 S: Stream<Item = Result<T, E>>,
1836 C: Default + Extend<T>,
1837{
1838 type Output = Result<C, E>;
1839
1840 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1841 let mut this = self.project();
1842 Poll::Ready(Ok(loop {
1843 match ready!(this.stream.as_mut().poll_next(cx)?) {
1844 Some(x) => this.items.extend(Some(x)),
1845 None => break mem::replace(this.items, Default::default()),
1846 }
1847 }))
1848 }
1849}
1850
pin_project! {
    #[derive(Debug)]
    /// Future returned by [`StreamExt::partition()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct PartitionFuture<S, P, B> {
        // The stream being drained.
        #[pin]
        stream: S,
        // Decides which collection an item goes into.
        predicate: P,
        // (matching, non-matching) collections; taken out when the future completes.
        res: Option<(B, B)>,
    }
}
1862
1863impl<S, P, B> Future for PartitionFuture<S, P, B>
1864where
1865 S: Stream + Sized,
1866 P: FnMut(&S::Item) -> bool,
1867 B: Default + Extend<S::Item>,
1868{
1869 type Output = (B, B);
1870
1871 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1872 let mut this = self.project();
1873 loop {
1874 match ready!(this.stream.as_mut().poll_next(cx)) {
1875 Some(v) => {
1876 let res = this.res.as_mut().unwrap();
1877 if (this.predicate)(&v) {
1878 res.0.extend(Some(v))
1879 } else {
1880 res.1.extend(Some(v))
1881 }
1882 }
1883 None => return Poll::Ready(this.res.take().unwrap()),
1884 }
1885 }
1886 }
1887}
1888
pin_project! {
    #[derive(Debug)]
    /// Future returned by [`StreamExt::fold()`].
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct FoldFuture<S, F, T> {
        // The stream being folded.
        #[pin]
        stream: S,
        // Combines the accumulator with each item.
        f: F,
        // Current accumulator; taken out when the future completes.
        acc: Option<T>,
    }
}
1900
1901impl<S, F, T> Future for FoldFuture<S, F, T>
1902where
1903 S: Stream,
1904 F: FnMut(T, S::Item) -> T,
1905{
1906 type Output = T;
1907
1908 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1909 let mut this = self.project();
1910 loop {
1911 match ready!(this.stream.as_mut().poll_next(cx)) {
1912 Some(v) => {
1913 let old = this.acc.take().unwrap();
1914 let new = (this.f)(old, v);
1915 *this.acc = Some(new);
1916 }
1917 None => return Poll::Ready(this.acc.take().unwrap()),
1918 }
1919 }
1920 }
1921}
1922
/// Future returned by [`StreamExt::try_fold()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFoldFuture<'a, S, F, B> {
    // Borrowed stream of `Result`s being folded.
    stream: &'a mut S,
    // Fallible function combining the accumulator with each `Ok` item.
    f: F,
    // Current accumulator; taken out while `f` runs and when the future completes.
    acc: Option<B>,
}

// Declared `Unpin` unconditionally; the `Future` impl accesses fields through `&mut` only.
impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
1933
1934impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
1935where
1936 S: Stream<Item = Result<T, E>> + Unpin,
1937 F: FnMut(B, T) -> Result<B, E>,
1938{
1939 type Output = Result<B, E>;
1940
1941 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1942 loop {
1943 match ready!(self.stream.poll_next(cx)) {
1944 Some(Err(e)) => return Poll::Ready(Err(e)),
1945 Some(Ok(t)) => {
1946 let old = self.acc.take().unwrap();
1947 let new = (&mut self.f)(old, t);
1948
1949 match new {
1950 Ok(t) => self.acc = Some(t),
1951 Err(e) => return Poll::Ready(Err(e)),
1952 }
1953 }
1954 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
1955 }
1956 }
1957 }
1958}
1959
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::scan()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Scan<S, St, F> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Running state together with the function that updates it.
        state_f: (St, F),
    }
}
1970
1971impl<S, St, F, B> Stream for Scan<S, St, F>
1972where
1973 S: Stream,
1974 F: FnMut(&mut St, S::Item) -> Option<B>,
1975{
1976 type Item = B;
1977
1978 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
1979 let mut this = self.project();
1980 this.stream.as_mut().poll_next(cx).map(|item| {
1981 item.and_then(|item| {
1982 let (state, f) = this.state_f;
1983 f(state, item)
1984 })
1985 })
1986 }
1987}
1988
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::fuse()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Fuse<S> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Set once the underlying stream has returned `None`.
        done: bool,
    }
}
1999
2000impl<S: Stream> Stream for Fuse<S> {
2001 type Item = S::Item;
2002
2003 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2004 let this = self.project();
2005
2006 if *this.done {
2007 Poll::Ready(None)
2008 } else {
2009 let next = ready!(this.stream.poll_next(cx));
2010 if next.is_none() {
2011 *this.done = true;
2012 }
2013 Poll::Ready(next)
2014 }
2015 }
2016}
2017
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::map()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Map<S, F> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Function applied to every item.
        f: F,
    }
}
2028
2029impl<S, F, T> Stream for Map<S, F>
2030where
2031 S: Stream,
2032 F: FnMut(S::Item) -> T,
2033{
2034 type Item = T;
2035
2036 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2037 let this = self.project();
2038 let next = ready!(this.stream.poll_next(cx));
2039 Poll::Ready(next.map(this.f))
2040 }
2041
2042 fn size_hint(&self) -> (usize, Option<usize>) {
2043 self.stream.size_hint()
2044 }
2045}
2046
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::flat_map()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct FlatMap<S, U, F> {
        // Outer stream, already mapped to produce inner streams.
        #[pin]
        stream: Map<S, F>,
        // Inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<U>,
    }
}
2058
2059impl<S, U, F> Stream for FlatMap<S, U, F>
2060where
2061 S: Stream,
2062 U: Stream,
2063 F: FnMut(S::Item) -> U,
2064{
2065 type Item = U::Item;
2066
2067 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2068 let mut this = self.project();
2069 loop {
2070 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2071 match ready!(inner.poll_next(cx)) {
2072 Some(item) => return Poll::Ready(Some(item)),
2073 None => this.inner_stream.set(None),
2074 }
2075 }
2076
2077 match ready!(this.stream.as_mut().poll_next(cx)) {
2078 Some(stream) => this.inner_stream.set(Some(stream)),
2079 None => return Poll::Ready(None),
2080 }
2081 }
2082 }
2083}
2084
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::flatten()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Flatten<S: Stream> {
        // Outer stream whose items are themselves streams.
        #[pin]
        stream: S,
        // Inner stream currently being drained, if any.
        #[pin]
        inner_stream: Option<S::Item>,
    }
}
2096
2097impl<S, U> Stream for Flatten<S>
2098where
2099 S: Stream<Item = U>,
2100 U: Stream,
2101{
2102 type Item = U::Item;
2103
2104 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2105 let mut this = self.project();
2106 loop {
2107 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2108 match ready!(inner.poll_next(cx)) {
2109 Some(item) => return Poll::Ready(Some(item)),
2110 None => this.inner_stream.set(None),
2111 }
2112 }
2113
2114 match ready!(this.stream.as_mut().poll_next(cx)) {
2115 Some(inner) => this.inner_stream.set(Some(inner)),
2116 None => return Poll::Ready(None),
2117 }
2118 }
2119 }
2120}
2121
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::then()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<S, F, Fut> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Future created from the most recent item, while it is still running.
        #[pin]
        future: Option<Fut>,
        // Turns each item into a future.
        f: F,
    }
}
2134
2135impl<S, F, Fut> Stream for Then<S, F, Fut>
2136where
2137 S: Stream,
2138 F: FnMut(S::Item) -> Fut,
2139 Fut: Future,
2140{
2141 type Item = Fut::Output;
2142
2143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2144 let mut this = self.project();
2145
2146 loop {
2147 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2148 let item = ready!(fut.poll(cx));
2149 this.future.set(None);
2150 return Poll::Ready(Some(item));
2151 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2152 this.future.set(Some((this.f)(item)));
2153 } else {
2154 return Poll::Ready(None);
2155 }
2156 }
2157 }
2158
2159 fn size_hint(&self) -> (usize, Option<usize>) {
2160 let future_len = if self.future.is_some() { 1 } else { 0 };
2161 let (lower, upper) = self.stream.size_hint();
2162 let lower = lower.saturating_add(future_len);
2163 let upper = upper.and_then(|u| u.checked_add(future_len));
2164 (lower, upper)
2165 }
2166}
2167
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::filter()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<S, P> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Items are yielded only when this returns `true`.
        predicate: P,
    }
}
2178
2179impl<S, P> Stream for Filter<S, P>
2180where
2181 S: Stream,
2182 P: FnMut(&S::Item) -> bool,
2183{
2184 type Item = S::Item;
2185
2186 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2187 let mut this = self.project();
2188 loop {
2189 match ready!(this.stream.as_mut().poll_next(cx)) {
2190 None => return Poll::Ready(None),
2191 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2192 Some(_) => {}
2193 }
2194 }
2195 }
2196}
2197
2198pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2214where
2215 S1: Stream<Item = T>,
2216 S2: Stream<Item = T>,
2217{
2218 Or { stream1, stream2 }
2219}
2220
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`or()`] and [`StreamExt::or()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        // Polled first on every poll.
        #[pin]
        stream1: S1,
        // Polled only when `stream1` did not yield an item.
        #[pin]
        stream2: S2,
    }
}
2232
2233impl<T, S1, S2> Stream for Or<S1, S2>
2234where
2235 S1: Stream<Item = T>,
2236 S2: Stream<Item = T>,
2237{
2238 type Item = T;
2239
2240 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2241 let mut this = self.project();
2242
2243 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2244 return Poll::Ready(Some(t));
2245 }
2246 this.stream2.as_mut().poll_next(cx)
2247 }
2248}
2249
/// Merges two streams into a [`Race`] stream, which picks at random which stream is polled
/// first on every poll.
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race::<S1, S2> { stream1, stream2 }
}
2272
#[cfg(feature = "std")]
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`race()`] and [`StreamExt::race()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        // First of the two merged streams.
        #[pin]
        stream1: S1,
        // Second of the two merged streams.
        #[pin]
        stream2: S2,
    }
}
2285
#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a randomly chosen order and yields the first ready item.
    ///
    /// Returns `Poll::Ready(None)` when both streams report end-of-stream during the same
    /// poll. Returns `Poll::Pending` when neither stream yielded an item and at least one of
    /// them is still pending; that stream has registered the waker, so the task is woken
    /// again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        /// Polls `first`, then `second`, returning the first item found.
        fn poll_in_order<T, A, B>(
            first: Pin<&mut A>,
            second: Pin<&mut B>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<T>>
        where
            A: Stream<Item = T>,
            B: Stream<Item = T>,
        {
            let first_done = match Stream::poll_next(first, cx) {
                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                Poll::Ready(None) => true,
                Poll::Pending => false,
            };

            match Stream::poll_next(second, cx) {
                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                // Both streams are exhausted. Returning `Pending` here would hang the
                // task forever, because nobody holds a waker for it.
                Poll::Ready(None) if first_done => Poll::Ready(None),
                // At least one stream is pending and has registered the waker.
                Poll::Ready(None) | Poll::Pending => Poll::Pending,
            }
        }

        let mut this = self.project();

        // Randomize the polling order so neither stream is preferred.
        if fastrand::bool() {
            poll_in_order(this.stream1.as_mut(), this.stream2.as_mut(), cx)
        } else {
            poll_in_order(this.stream2.as_mut(), this.stream1.as_mut(), cx)
        }
    }
}
2315
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::filter_map()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct FilterMap<S, F> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Maps each item to `Some(output)` to keep it or `None` to drop it.
        f: F,
    }
}
2326
2327impl<S, F, T> Stream for FilterMap<S, F>
2328where
2329 S: Stream,
2330 F: FnMut(S::Item) -> Option<T>,
2331{
2332 type Item = T;
2333
2334 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2335 let mut this = self.project();
2336 loop {
2337 match ready!(this.stream.as_mut().poll_next(cx)) {
2338 None => return Poll::Ready(None),
2339 Some(v) => {
2340 if let Some(t) = (this.f)(v) {
2341 return Poll::Ready(Some(t));
2342 }
2343 }
2344 }
2345 }
2346 }
2347}
2348
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::take()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<S> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Number of items that may still be yielded.
        n: usize,
    }
}
2359
2360impl<S: Stream> Stream for Take<S> {
2361 type Item = S::Item;
2362
2363 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2364 let this = self.project();
2365
2366 if *this.n == 0 {
2367 Poll::Ready(None)
2368 } else {
2369 let next = ready!(this.stream.poll_next(cx));
2370 match next {
2371 Some(_) => *this.n -= 1,
2372 None => *this.n = 0,
2373 }
2374 Poll::Ready(next)
2375 }
2376 }
2377}
2378
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::take_while()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<S, P> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Items are yielded only while this returns `true`.
        predicate: P,
    }
}
2389
2390impl<S, P> Stream for TakeWhile<S, P>
2391where
2392 S: Stream,
2393 P: FnMut(&S::Item) -> bool,
2394{
2395 type Item = S::Item;
2396
2397 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2398 let this = self.project();
2399
2400 match ready!(this.stream.poll_next(cx)) {
2401 Some(v) => {
2402 if (this.predicate)(&v) {
2403 Poll::Ready(Some(v))
2404 } else {
2405 Poll::Ready(None)
2406 }
2407 }
2408 None => Poll::Ready(None),
2409 }
2410 }
2411}
2412
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`StreamExt::skip()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<S> {
        // The underlying stream.
        #[pin]
        stream: S,
        // Number of items that still have to be discarded.
        n: usize,
    }
}
2423
2424impl<S: Stream> Stream for Skip<S> {
2425 type Item = S::Item;
2426
2427 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2428 let mut this = self.project();
2429 loop {
2430 match ready!(this.stream.as_mut().poll_next(cx)) {
2431 Some(v) => match *this.n {
2432 0 => return Poll::Ready(Some(v)),
2433 _ => *this.n -= 1,
2434 },
2435 None => return Poll::Ready(None),
2436 }
2437 }
2438 }
2439}
2440
2441pin_project! {
2442 #[derive(Clone, Debug)]
2444 #[must_use = "streams do nothing unless polled"]
2445 pub struct SkipWhile<S, P> {
2446 #[pin]
2447 stream: S,
2448 predicate: Option<P>,
2449 }
2450}
2451
2452impl<S, P> Stream for SkipWhile<S, P>
2453where
2454 S: Stream,
2455 P: FnMut(&S::Item) -> bool,
2456{
2457 type Item = S::Item;
2458
2459 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2460 let mut this = self.project();
2461 loop {
2462 match ready!(this.stream.as_mut().poll_next(cx)) {
2463 Some(v) => match this.predicate {
2464 Some(p) => {
2465 if !p(&v) {
2466 *this.predicate = None;
2467 return Poll::Ready(Some(v));
2468 }
2469 }
2470 None => return Poll::Ready(Some(v)),
2471 },
2472 None => return Poll::Ready(None),
2473 }
2474 }
2475 }
2476}
2477
2478pin_project! {
2479 #[derive(Clone, Debug)]
2481 #[must_use = "streams do nothing unless polled"]
2482 pub struct StepBy<S> {
2483 #[pin]
2484 stream: S,
2485 step: usize,
2486 i: usize,
2487 }
2488}
2489
2490impl<S: Stream> Stream for StepBy<S> {
2491 type Item = S::Item;
2492
2493 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2494 let mut this = self.project();
2495 loop {
2496 match ready!(this.stream.as_mut().poll_next(cx)) {
2497 Some(v) => {
2498 if *this.i == 0 {
2499 *this.i = *this.step - 1;
2500 return Poll::Ready(Some(v));
2501 } else {
2502 *this.i -= 1;
2503 }
2504 }
2505 None => return Poll::Ready(None),
2506 }
2507 }
2508 }
2509}
2510
2511pin_project! {
2512 #[derive(Clone, Debug)]
2514 #[must_use = "streams do nothing unless polled"]
2515 pub struct Chain<S, U> {
2516 #[pin]
2517 first: Fuse<S>,
2518 #[pin]
2519 second: Fuse<U>,
2520 }
2521}
2522
2523impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2524 type Item = S::Item;
2525
2526 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2527 let mut this = self.project();
2528
2529 if !this.first.done {
2530 let next = ready!(this.first.as_mut().poll_next(cx));
2531 if let Some(next) = next {
2532 return Poll::Ready(Some(next));
2533 }
2534 }
2535
2536 if !this.second.done {
2537 let next = ready!(this.second.as_mut().poll_next(cx));
2538 if let Some(next) = next {
2539 return Poll::Ready(Some(next));
2540 }
2541 }
2542
2543 if this.first.done && this.second.done {
2544 Poll::Ready(None)
2545 } else {
2546 Poll::Pending
2547 }
2548 }
2549}
2550
2551pin_project! {
2552 #[derive(Clone, Debug)]
2554 #[must_use = "streams do nothing unless polled"]
2555 pub struct Cloned<S> {
2556 #[pin]
2557 stream: S,
2558 }
2559}
2560
2561impl<'a, S, T: 'a> Stream for Cloned<S>
2562where
2563 S: Stream<Item = &'a T>,
2564 T: Clone,
2565{
2566 type Item = T;
2567
2568 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2569 let this = self.project();
2570 let next = ready!(this.stream.poll_next(cx));
2571 Poll::Ready(next.cloned())
2572 }
2573}
2574
2575pin_project! {
2576 #[derive(Clone, Debug)]
2578 #[must_use = "streams do nothing unless polled"]
2579 pub struct Copied<S> {
2580 #[pin]
2581 stream: S,
2582 }
2583}
2584
2585impl<'a, S, T: 'a> Stream for Copied<S>
2586where
2587 S: Stream<Item = &'a T>,
2588 T: Copy,
2589{
2590 type Item = T;
2591
2592 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2593 let this = self.project();
2594 let next = ready!(this.stream.poll_next(cx));
2595 Poll::Ready(next.copied())
2596 }
2597}
2598
2599pin_project! {
2600 #[derive(Clone, Debug)]
2602 #[must_use = "streams do nothing unless polled"]
2603 pub struct Cycle<S> {
2604 orig: S,
2605 #[pin]
2606 stream: S,
2607 }
2608}
2609
2610impl<S> Stream for Cycle<S>
2611where
2612 S: Stream + Clone,
2613{
2614 type Item = S::Item;
2615
2616 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2617 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2618 Some(item) => Poll::Ready(Some(item)),
2619 None => {
2620 let new = self.as_mut().orig.clone();
2621 self.as_mut().project().stream.set(new);
2622 self.project().stream.poll_next(cx)
2623 }
2624 }
2625 }
2626}
2627
2628pin_project! {
2629 #[derive(Clone, Debug)]
2631 #[must_use = "streams do nothing unless polled"]
2632 pub struct Enumerate<S> {
2633 #[pin]
2634 stream: S,
2635 i: usize,
2636 }
2637}
2638
2639impl<S> Stream for Enumerate<S>
2640where
2641 S: Stream,
2642{
2643 type Item = (usize, S::Item);
2644
2645 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2646 let this = self.project();
2647
2648 match ready!(this.stream.poll_next(cx)) {
2649 Some(v) => {
2650 let ret = (*this.i, v);
2651 *this.i += 1;
2652 Poll::Ready(Some(ret))
2653 }
2654 None => Poll::Ready(None),
2655 }
2656 }
2657}
2658
2659pin_project! {
2660 #[derive(Clone, Debug)]
2662 #[must_use = "streams do nothing unless polled"]
2663 pub struct Inspect<S, F> {
2664 #[pin]
2665 stream: S,
2666 f: F,
2667 }
2668}
2669
2670impl<S, F> Stream for Inspect<S, F>
2671where
2672 S: Stream,
2673 F: FnMut(&S::Item),
2674{
2675 type Item = S::Item;
2676
2677 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2678 let mut this = self.project();
2679 let next = ready!(this.stream.as_mut().poll_next(cx));
2680 if let Some(x) = &next {
2681 (this.f)(x);
2682 }
2683 Poll::Ready(next)
2684 }
2685}
2686
2687#[derive(Debug)]
2689#[must_use = "futures do nothing unless you `.await` or poll them"]
2690pub struct NthFuture<'a, S: ?Sized> {
2691 stream: &'a mut S,
2692 n: usize,
2693}
2694
2695impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2696
2697impl<'a, S> Future for NthFuture<'a, S>
2698where
2699 S: Stream + Unpin + ?Sized,
2700{
2701 type Output = Option<S::Item>;
2702
2703 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2704 loop {
2705 match ready!(self.stream.poll_next(cx)) {
2706 Some(v) => match self.n {
2707 0 => return Poll::Ready(Some(v)),
2708 _ => self.n -= 1,
2709 },
2710 None => return Poll::Ready(None),
2711 }
2712 }
2713 }
2714}
2715
2716pin_project! {
2717 #[derive(Debug)]
2719 #[must_use = "futures do nothing unless you `.await` or poll them"]
2720 pub struct LastFuture<S: Stream> {
2721 #[pin]
2722 stream: S,
2723 last: Option<S::Item>,
2724 }
2725}
2726
2727impl<S: Stream> Future for LastFuture<S> {
2728 type Output = Option<S::Item>;
2729
2730 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2731 let mut this = self.project();
2732 loop {
2733 match ready!(this.stream.as_mut().poll_next(cx)) {
2734 Some(new) => *this.last = Some(new),
2735 None => return Poll::Ready(this.last.take()),
2736 }
2737 }
2738 }
2739}
2740
2741#[derive(Debug)]
2743#[must_use = "futures do nothing unless you `.await` or poll them"]
2744pub struct FindFuture<'a, S: ?Sized, P> {
2745 stream: &'a mut S,
2746 predicate: P,
2747}
2748
2749impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2750
2751impl<'a, S, P> Future for FindFuture<'a, S, P>
2752where
2753 S: Stream + Unpin + ?Sized,
2754 P: FnMut(&S::Item) -> bool,
2755{
2756 type Output = Option<S::Item>;
2757
2758 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2759 loop {
2760 match ready!(self.stream.poll_next(cx)) {
2761 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2762 Some(_) => {}
2763 None => return Poll::Ready(None),
2764 }
2765 }
2766 }
2767}
2768
2769#[derive(Debug)]
2771#[must_use = "futures do nothing unless you `.await` or poll them"]
2772pub struct FindMapFuture<'a, S: ?Sized, F> {
2773 stream: &'a mut S,
2774 f: F,
2775}
2776
2777impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2778
2779impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2780where
2781 S: Stream + Unpin + ?Sized,
2782 F: FnMut(S::Item) -> Option<B>,
2783{
2784 type Output = Option<B>;
2785
2786 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2787 loop {
2788 match ready!(self.stream.poll_next(cx)) {
2789 Some(v) => {
2790 if let Some(v) = (&mut self.f)(v) {
2791 return Poll::Ready(Some(v));
2792 }
2793 }
2794 None => return Poll::Ready(None),
2795 }
2796 }
2797 }
2798}
2799
2800#[derive(Debug)]
2802#[must_use = "futures do nothing unless you `.await` or poll them"]
2803pub struct PositionFuture<'a, S: ?Sized, P> {
2804 stream: &'a mut S,
2805 predicate: P,
2806 index: usize,
2807}
2808
2809impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2810
2811impl<'a, S, P> Future for PositionFuture<'a, S, P>
2812where
2813 S: Stream + Unpin + ?Sized,
2814 P: FnMut(S::Item) -> bool,
2815{
2816 type Output = Option<usize>;
2817
2818 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2819 loop {
2820 match ready!(self.stream.poll_next(cx)) {
2821 Some(v) => {
2822 if (&mut self.predicate)(v) {
2823 return Poll::Ready(Some(self.index));
2824 } else {
2825 self.index += 1;
2826 }
2827 }
2828 None => return Poll::Ready(None),
2829 }
2830 }
2831 }
2832}
2833
2834#[derive(Debug)]
2836#[must_use = "futures do nothing unless you `.await` or poll them"]
2837pub struct AllFuture<'a, S: ?Sized, P> {
2838 stream: &'a mut S,
2839 predicate: P,
2840}
2841
2842impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2843
2844impl<S, P> Future for AllFuture<'_, S, P>
2845where
2846 S: Stream + Unpin + ?Sized,
2847 P: FnMut(S::Item) -> bool,
2848{
2849 type Output = bool;
2850
2851 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2852 loop {
2853 match ready!(self.stream.poll_next(cx)) {
2854 Some(v) => {
2855 if !(&mut self.predicate)(v) {
2856 return Poll::Ready(false);
2857 }
2858 }
2859 None => return Poll::Ready(true),
2860 }
2861 }
2862 }
2863}
2864
2865#[derive(Debug)]
2867#[must_use = "futures do nothing unless you `.await` or poll them"]
2868pub struct AnyFuture<'a, S: ?Sized, P> {
2869 stream: &'a mut S,
2870 predicate: P,
2871}
2872
2873impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2874
2875impl<S, P> Future for AnyFuture<'_, S, P>
2876where
2877 S: Stream + Unpin + ?Sized,
2878 P: FnMut(S::Item) -> bool,
2879{
2880 type Output = bool;
2881
2882 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2883 loop {
2884 match ready!(self.stream.poll_next(cx)) {
2885 Some(v) => {
2886 if (&mut self.predicate)(v) {
2887 return Poll::Ready(true);
2888 }
2889 }
2890 None => return Poll::Ready(false),
2891 }
2892 }
2893 }
2894}
2895
2896pin_project! {
2897 #[derive(Debug)]
2899 #[must_use = "futures do nothing unless you `.await` or poll them"]
2900 pub struct ForEachFuture<S, F> {
2901 #[pin]
2902 stream: S,
2903 f: F,
2904 }
2905}
2906
2907impl<S, F> Future for ForEachFuture<S, F>
2908where
2909 S: Stream,
2910 F: FnMut(S::Item),
2911{
2912 type Output = ();
2913
2914 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2915 let mut this = self.project();
2916 loop {
2917 match ready!(this.stream.as_mut().poll_next(cx)) {
2918 Some(v) => (this.f)(v),
2919 None => return Poll::Ready(()),
2920 }
2921 }
2922 }
2923}
2924
2925#[derive(Debug)]
2927#[must_use = "futures do nothing unless you `.await` or poll them"]
2928pub struct TryForEachFuture<'a, S: ?Sized, F> {
2929 stream: &'a mut S,
2930 f: F,
2931}
2932
2933impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
2934
2935impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
2936where
2937 S: Stream + Unpin + ?Sized,
2938 F: FnMut(S::Item) -> Result<(), E>,
2939{
2940 type Output = Result<(), E>;
2941
2942 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2943 loop {
2944 match ready!(self.stream.poll_next(cx)) {
2945 None => return Poll::Ready(Ok(())),
2946 Some(v) => (&mut self.f)(v)?,
2947 }
2948 }
2949 }
2950}
2951
2952pin_project! {
2953 #[derive(Clone, Debug)]
2955 #[must_use = "streams do nothing unless polled"]
2956 pub struct Zip<A: Stream, B> {
2957 item_slot: Option<A::Item>,
2958 #[pin]
2959 first: A,
2960 #[pin]
2961 second: B,
2962 }
2963}
2964
2965impl<A: Stream, B: Stream> Stream for Zip<A, B> {
2966 type Item = (A::Item, B::Item);
2967
2968 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2969 let this = self.project();
2970
2971 if this.item_slot.is_none() {
2972 match this.first.poll_next(cx) {
2973 Poll::Pending => return Poll::Pending,
2974 Poll::Ready(None) => return Poll::Ready(None),
2975 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
2976 }
2977 }
2978
2979 let second_item = ready!(this.second.poll_next(cx));
2980 let first_item = this.item_slot.take().unwrap();
2981 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
2982 }
2983}
2984
2985pin_project! {
2986 #[derive(Debug)]
2988 #[must_use = "futures do nothing unless you `.await` or poll them"]
2989 pub struct UnzipFuture<S, FromA, FromB> {
2990 #[pin]
2991 stream: S,
2992 res: Option<(FromA, FromB)>,
2993 }
2994}
2995
2996impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
2997where
2998 S: Stream<Item = (A, B)>,
2999 FromA: Default + Extend<A>,
3000 FromB: Default + Extend<B>,
3001{
3002 type Output = (FromA, FromB);
3003
3004 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3005 let mut this = self.project();
3006
3007 loop {
3008 match ready!(this.stream.as_mut().poll_next(cx)) {
3009 Some((a, b)) => {
3010 let res = this.res.as_mut().unwrap();
3011 res.0.extend(Some(a));
3012 res.1.extend(Some(b));
3013 }
3014 None => return Poll::Ready(this.res.take().unwrap()),
3015 }
3016 }
3017 }
3018}