futures_util/stream/try_stream/
mod.rs

1//! Streams
2//!
//! This module contains a number of functions for working with `Stream`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9    inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10    IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18
19use futures_core::{
20    future::{Future, TryFuture},
21    stream::TryStream,
22    task::{Context, Poll},
23};
24
25mod and_then;
26#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
27pub use self::and_then::AndThen;
28
29delegate_all!(
30    /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
31    ErrInto<St, E>(
32        MapErr<St, IntoFn<E>>
33    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
34);
35
36delegate_all!(
37    /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
38    InspectOk<St, F>(
39        Inspect<IntoStream<St>, InspectOkFn<F>>
40    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
41);
42
43delegate_all!(
44    /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
45    InspectErr<St, F>(
46        Inspect<IntoStream<St>, InspectErrFn<F>>
47    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
48);
49
50mod into_stream;
51#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
52pub use self::into_stream::IntoStream;
53
54delegate_all!(
55    /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
56    MapOk<St, F>(
57        Map<IntoStream<St>, MapOkFn<F>>
58    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
59);
60
61delegate_all!(
62    /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
63    MapErr<St, F>(
64        Map<IntoStream<St>, MapErrFn<F>>
65    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
66);
67
68mod or_else;
69#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70pub use self::or_else::OrElse;
71
72mod try_next;
73#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74pub use self::try_next::TryNext;
75
76mod try_for_each;
77#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78pub use self::try_for_each::TryForEach;
79
80mod try_filter;
81#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82pub use self::try_filter::TryFilter;
83
84mod try_filter_map;
85#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86pub use self::try_filter_map::TryFilterMap;
87
88mod try_flatten;
89#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90pub use self::try_flatten::TryFlatten;
91
92#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
93#[cfg(feature = "alloc")]
94mod try_flatten_unordered;
95#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
96#[cfg(feature = "alloc")]
97#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98pub use self::try_flatten_unordered::TryFlattenUnordered;
99
100mod try_collect;
101#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102pub use self::try_collect::TryCollect;
103
104mod try_concat;
105#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106pub use self::try_concat::TryConcat;
107
108#[cfg(feature = "alloc")]
109mod try_chunks;
110#[cfg(feature = "alloc")]
111#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
112pub use self::try_chunks::{TryChunks, TryChunksError};
113
114#[cfg(feature = "alloc")]
115mod try_ready_chunks;
116#[cfg(feature = "alloc")]
117#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
118pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
119
120mod try_fold;
121#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
122pub use self::try_fold::TryFold;
123
124mod try_unfold;
125#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126pub use self::try_unfold::{try_unfold, TryUnfold};
127
128mod try_skip_while;
129#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130pub use self::try_skip_while::TrySkipWhile;
131
132mod try_take_while;
133#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134pub use self::try_take_while::TryTakeWhile;
135
136#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
137#[cfg(feature = "alloc")]
138mod try_buffer_unordered;
139#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
140#[cfg(feature = "alloc")]
141#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142pub use self::try_buffer_unordered::TryBufferUnordered;
143
144#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
145#[cfg(feature = "alloc")]
146mod try_buffered;
147#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
148#[cfg(feature = "alloc")]
149#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150pub use self::try_buffered::TryBuffered;
151
152#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
153#[cfg(feature = "alloc")]
154mod try_for_each_concurrent;
155#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
156#[cfg(feature = "alloc")]
157#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158pub use self::try_for_each_concurrent::TryForEachConcurrent;
159
160#[cfg(feature = "io")]
161#[cfg(feature = "std")]
162mod into_async_read;
163#[cfg(feature = "io")]
164#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
165#[cfg(feature = "std")]
166#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
167pub use self::into_async_read::IntoAsyncRead;
168
169mod try_all;
170#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
171pub use self::try_all::TryAll;
172
173mod try_any;
174#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
175pub use self::try_any::TryAny;
176
177impl<S: ?Sized + TryStream> TryStreamExt for S {}
178
179/// Adapters specific to `Result`-returning streams
180pub trait TryStreamExt: TryStream {
181    /// Wraps the current stream in a new stream which converts the error type
182    /// into the one provided.
183    ///
184    /// # Examples
185    ///
186    /// ```
187    /// # futures::executor::block_on(async {
188    /// use futures::stream::{self, TryStreamExt};
189    ///
190    /// let mut stream =
191    ///     stream::iter(vec![Ok(()), Err(5i32)])
192    ///         .err_into::<i64>();
193    ///
194    /// assert_eq!(stream.try_next().await, Ok(Some(())));
195    /// assert_eq!(stream.try_next().await, Err(5i64));
196    /// # })
197    /// ```
198    fn err_into<E>(self) -> ErrInto<Self, E>
199    where
200        Self: Sized,
201        Self::Error: Into<E>,
202    {
203        assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
204    }
205
206    /// Wraps the current stream in a new stream which maps the success value
207    /// using the provided closure.
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// # futures::executor::block_on(async {
213    /// use futures::stream::{self, TryStreamExt};
214    ///
215    /// let mut stream =
216    ///     stream::iter(vec![Ok(5), Err(0)])
217    ///         .map_ok(|x| x + 2);
218    ///
219    /// assert_eq!(stream.try_next().await, Ok(Some(7)));
220    /// assert_eq!(stream.try_next().await, Err(0));
221    /// # })
222    /// ```
223    fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
224    where
225        Self: Sized,
226        F: FnMut(Self::Ok) -> T,
227    {
228        assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
229    }
230
231    /// Wraps the current stream in a new stream which maps the error value
232    /// using the provided closure.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// # futures::executor::block_on(async {
238    /// use futures::stream::{self, TryStreamExt};
239    ///
240    /// let mut stream =
241    ///     stream::iter(vec![Ok(5), Err(0)])
242    ///         .map_err(|x| x + 2);
243    ///
244    /// assert_eq!(stream.try_next().await, Ok(Some(5)));
245    /// assert_eq!(stream.try_next().await, Err(2));
246    /// # })
247    /// ```
248    fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
249    where
250        Self: Sized,
251        F: FnMut(Self::Error) -> E,
252    {
253        assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
254    }
255
256    /// Chain on a computation for when a value is ready, passing the successful
257    /// results to the provided closure `f`.
258    ///
259    /// This function can be used to run a unit of work when the next successful
260    /// value on a stream is ready. The closure provided will be yielded a value
261    /// when ready, and the returned future will then be run to completion to
262    /// produce the next value on this stream.
263    ///
264    /// Any errors produced by this stream will not be passed to the closure,
265    /// and will be passed through.
266    ///
267    /// The returned value of the closure must implement the `TryFuture` trait
268    /// and can represent some more work to be done before the composed stream
269    /// is finished.
270    ///
271    /// Note that this function consumes the receiving stream and returns a
272    /// wrapped version of it.
273    ///
274    /// To process the entire stream and return a single future representing
275    /// success or error, use `try_for_each` instead.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use futures::channel::mpsc;
281    /// use futures::future;
282    /// use futures::stream::TryStreamExt;
283    ///
284    /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
285    ///
286    /// let rx = rx.and_then(|result| {
287    ///     future::ok(if result % 2 == 0 {
288    ///         Some(result)
289    ///     } else {
290    ///         None
291    ///     })
292    /// });
293    /// ```
294    fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
295    where
296        F: FnMut(Self::Ok) -> Fut,
297        Fut: TryFuture<Error = Self::Error>,
298        Self: Sized,
299    {
300        assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
301    }
302
303    /// Chain on a computation for when an error happens, passing the
304    /// erroneous result to the provided closure `f`.
305    ///
306    /// This function can be used to run a unit of work and attempt to recover from
307    /// an error if one happens. The closure provided will be yielded an error
308    /// when one appears, and the returned future will then be run to completion
309    /// to produce the next value on this stream.
310    ///
311    /// Any successful values produced by this stream will not be passed to the
312    /// closure, and will be passed through.
313    ///
314    /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
315    /// and can represent some more work to be done before the composed stream
316    /// is finished.
317    ///
318    /// Note that this function consumes the receiving stream and returns a
319    /// wrapped version of it.
320    fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
321    where
322        F: FnMut(Self::Error) -> Fut,
323        Fut: TryFuture<Ok = Self::Ok>,
324        Self: Sized,
325    {
326        assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
327    }
328
329    /// Do something with the success value of this stream, afterwards passing
330    /// it on.
331    ///
332    /// This is similar to the `StreamExt::inspect` method where it allows
333    /// easily inspecting the success value as it passes through the stream, for
334    /// example to debug what's going on.
335    fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
336    where
337        F: FnMut(&Self::Ok),
338        Self: Sized,
339    {
340        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
341    }
342
343    /// Do something with the error value of this stream, afterwards passing it on.
344    ///
345    /// This is similar to the `StreamExt::inspect` method where it allows
346    /// easily inspecting the error value as it passes through the stream, for
347    /// example to debug what's going on.
348    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
349    where
350        F: FnMut(&Self::Error),
351        Self: Sized,
352    {
353        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
354    }
355
356    /// Wraps a [`TryStream`] into a type that implements
357    /// [`Stream`](futures_core::stream::Stream)
358    ///
359    /// [`TryStream`]s currently do not implement the
360    /// [`Stream`](futures_core::stream::Stream) trait because of limitations
361    /// of the compiler.
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use futures::stream::{Stream, TryStream, TryStreamExt};
367    ///
368    /// # type T = i32;
369    /// # type E = ();
370    /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
371    /// # futures::stream::empty()
372    /// # }
373    /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
374    ///
375    /// take_stream(make_try_stream().into_stream());
376    /// ```
377    fn into_stream(self) -> IntoStream<Self>
378    where
379        Self: Sized,
380    {
381        assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
382    }
383
384    /// Creates a future that attempts to resolve the next item in the stream.
385    /// If an error is encountered before the next item, the error is returned
386    /// instead.
387    ///
388    /// This is similar to the `Stream::next` combinator, but returns a
389    /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
390    /// for easy use with the `?` operator.
391    ///
392    /// # Examples
393    ///
394    /// ```
395    /// # futures::executor::block_on(async {
396    /// use futures::stream::{self, TryStreamExt};
397    ///
398    /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
399    ///
400    /// assert_eq!(stream.try_next().await, Ok(Some(())));
401    /// assert_eq!(stream.try_next().await, Err(()));
402    /// # })
403    /// ```
404    fn try_next(&mut self) -> TryNext<'_, Self>
405    where
406        Self: Unpin,
407    {
408        assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
409    }
410
411    /// Attempts to run this stream to completion, executing the provided
412    /// asynchronous closure for each element on the stream.
413    ///
414    /// The provided closure will be called for each item this stream produces,
415    /// yielding a future. That future will then be executed to completion
416    /// before moving on to the next item.
417    ///
418    /// The returned value is a [`Future`](futures_core::future::Future) where the
419    /// [`Output`](futures_core::future::Future::Output) type is
420    /// `Result<(), Self::Error>`. If any of the intermediate
421    /// futures or the stream returns an error, this future will return
422    /// immediately with an error.
423    ///
424    /// # Examples
425    ///
426    /// ```
427    /// # futures::executor::block_on(async {
428    /// use futures::future;
429    /// use futures::stream::{self, TryStreamExt};
430    ///
431    /// let mut x = 0i32;
432    ///
433    /// {
434    ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
435    ///         x += item;
436    ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
437    ///     });
438    ///     assert_eq!(fut.await, Err(()));
439    /// }
440    ///
441    /// assert_eq!(x, 3);
442    /// # })
443    /// ```
444    fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
445    where
446        F: FnMut(Self::Ok) -> Fut,
447        Fut: TryFuture<Ok = (), Error = Self::Error>,
448        Self: Sized,
449    {
450        assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
451    }
452
453    /// Skip elements on this stream while the provided asynchronous predicate
454    /// resolves to `true`.
455    ///
456    /// This function is similar to
457    /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
458    /// early if an error occurs.
459    ///
460    /// # Examples
461    ///
462    /// ```
463    /// # futures::executor::block_on(async {
464    /// use futures::future;
465    /// use futures::stream::{self, TryStreamExt};
466    ///
467    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
468    /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
469    ///
470    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
471    /// assert_eq!(output, Ok(vec![3, 2]));
472    /// # })
473    /// ```
474    fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
475    where
476        F: FnMut(&Self::Ok) -> Fut,
477        Fut: TryFuture<Ok = bool, Error = Self::Error>,
478        Self: Sized,
479    {
480        assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
481    }
482
483    /// Take elements on this stream while the provided asynchronous predicate
484    /// resolves to `true`.
485    ///
486    /// This function is similar to
487    /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
488    /// early if an error occurs.
489    ///
490    /// # Examples
491    ///
492    /// ```
493    /// # futures::executor::block_on(async {
494    /// use futures::future;
495    /// use futures::stream::{self, TryStreamExt};
496    ///
497    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
498    /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
499    ///
500    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
501    /// assert_eq!(output, Ok(vec![1, 2]));
502    /// # })
503    /// ```
504    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
505    where
506        F: FnMut(&Self::Ok) -> Fut,
507        Fut: TryFuture<Ok = bool, Error = Self::Error>,
508        Self: Sized,
509    {
510        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
511    }
512
513    /// Attempts to run this stream to completion, executing the provided asynchronous
514    /// closure for each element on the stream concurrently as elements become
515    /// available, exiting as soon as an error occurs.
516    ///
517    /// This is similar to
518    /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
519    /// but will resolve to an error immediately if the underlying stream or the provided
520    /// closure return an error.
521    ///
522    /// This method is only available when the `std` or `alloc` feature of this
523    /// library is activated, and it is activated by default.
524    ///
525    /// # Examples
526    ///
527    /// ```
528    /// # futures::executor::block_on(async {
529    /// use futures::channel::oneshot;
530    /// use futures::stream::{self, StreamExt, TryStreamExt};
531    ///
532    /// let (tx1, rx1) = oneshot::channel();
533    /// let (tx2, rx2) = oneshot::channel();
534    /// let (_tx3, rx3) = oneshot::channel();
535    ///
536    /// let stream = stream::iter(vec![rx1, rx2, rx3]);
537    /// let fut = stream.map(Ok).try_for_each_concurrent(
538    ///     /* limit */ 2,
539    ///     |rx| async move {
540    ///         let res: Result<(), oneshot::Canceled> = rx.await;
541    ///         res
542    ///     }
543    /// );
544    ///
545    /// tx1.send(()).unwrap();
546    /// // Drop the second sender so that `rx2` resolves to `Canceled`.
547    /// drop(tx2);
548    ///
549    /// // The final result is an error because the second future
550    /// // resulted in an error.
551    /// assert_eq!(Err(oneshot::Canceled), fut.await);
552    /// # })
553    /// ```
554    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
555    #[cfg(feature = "alloc")]
556    fn try_for_each_concurrent<Fut, F>(
557        self,
558        limit: impl Into<Option<usize>>,
559        f: F,
560    ) -> TryForEachConcurrent<Self, Fut, F>
561    where
562        F: FnMut(Self::Ok) -> Fut,
563        Fut: Future<Output = Result<(), Self::Error>>,
564        Self: Sized,
565    {
566        assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
567            self,
568            limit.into(),
569            f,
570        ))
571    }
572
573    /// Attempt to transform a stream into a collection,
574    /// returning a future representing the result of that computation.
575    ///
576    /// This combinator will collect all successful results of this stream and
577    /// collect them into the specified collection type. If an error happens then all
578    /// collected elements will be dropped and the error will be returned.
579    ///
580    /// The returned future will be resolved when the stream terminates.
581    ///
582    /// # Examples
583    ///
584    /// ```
585    /// # futures::executor::block_on(async {
586    /// use futures::channel::mpsc;
587    /// use futures::stream::TryStreamExt;
588    /// use std::thread;
589    ///
590    /// let (tx, rx) = mpsc::unbounded();
591    ///
592    /// thread::spawn(move || {
593    ///     for i in 1..=5 {
594    ///         tx.unbounded_send(Ok(i)).unwrap();
595    ///     }
596    ///     tx.unbounded_send(Err(6)).unwrap();
597    /// });
598    ///
599    /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
600    /// assert_eq!(output, Err(6));
601    /// # })
602    /// ```
603    fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
604    where
605        Self: Sized,
606    {
607        assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
608    }
609
610    /// An adaptor for chunking up successful items of the stream inside a vector.
611    ///
612    /// This combinator will attempt to pull successful items from this stream and buffer
613    /// them into a local vector. At most `capacity` items will get buffered
614    /// before they're yielded from the returned stream.
615    ///
616    /// Note that the vectors returned from this iterator may not always have
617    /// `capacity` elements. If the underlying stream ended and only a partial
618    /// vector was created, it'll be returned. Additionally if an error happens
619    /// from the underlying stream then the currently buffered items will be
620    /// yielded.
621    ///
622    /// This method is only available when the `std` or `alloc` feature of this
623    /// library is activated, and it is activated by default.
624    ///
625    /// This function is similar to
626    /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
627    /// early if an error occurs.
628    ///
629    /// # Examples
630    ///
631    /// ```
632    /// # futures::executor::block_on(async {
633    /// use futures::stream::{self, TryChunksError, TryStreamExt};
634    ///
635    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
636    /// let mut stream = stream.try_chunks(2);
637    ///
638    /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
639    /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
640    /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
641    /// # })
642    /// ```
643    ///
644    /// # Panics
645    ///
646    /// This method will panic if `capacity` is zero.
647    #[cfg(feature = "alloc")]
648    fn try_chunks(self, capacity: usize) -> TryChunks<Self>
649    where
650        Self: Sized,
651    {
652        assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
653            TryChunks::new(self, capacity),
654        )
655    }
656
657    /// An adaptor for chunking up successful, ready items of the stream inside a vector.
658    ///
659    /// This combinator will attempt to pull successful items from this stream and buffer
660    /// them into a local vector. At most `capacity` items will get buffered
661    /// before they're yielded from the returned stream. If the underlying stream
662    /// returns `Poll::Pending`, and the collected chunk is not empty, it will
663    /// be immidiatly returned.
664    ///
665    /// Note that the vectors returned from this iterator may not always have
666    /// `capacity` elements. If the underlying stream ended and only a partial
667    /// vector was created, it'll be returned. Additionally if an error happens
668    /// from the underlying stream then the currently buffered items will be
669    /// yielded.
670    ///
671    /// This method is only available when the `std` or `alloc` feature of this
672    /// library is activated, and it is activated by default.
673    ///
674    /// This function is similar to
675    /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits
676    /// early if an error occurs.
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// # futures::executor::block_on(async {
682    /// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
683    ///
684    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
685    /// let mut stream = stream.try_ready_chunks(2);
686    ///
687    /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
688    /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
689    /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
690    /// # })
691    /// ```
692    ///
693    /// # Panics
694    ///
695    /// This method will panic if `capacity` is zero.
696    #[cfg(feature = "alloc")]
697    fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
698    where
699        Self: Sized,
700    {
701        assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(
702            TryReadyChunks::new(self, capacity),
703        )
704    }
705
706    /// Attempt to filter the values produced by this stream according to the
707    /// provided asynchronous closure.
708    ///
709    /// As values of this stream are made available, the provided predicate `f`
710    /// will be run on them. If the predicate returns a `Future` which resolves
711    /// to `true`, then the stream will yield the value, but if the predicate
712    /// return a `Future` which resolves to `false`, then the value will be
713    /// discarded and the next value will be produced.
714    ///
715    /// All errors are passed through without filtering in this combinator.
716    ///
717    /// Note that this function consumes the stream passed into it and returns a
718    /// wrapped version of it, similar to the existing `filter` methods in
719    /// the standard library.
720    ///
721    /// # Examples
722    /// ```
723    /// # futures::executor::block_on(async {
724    /// use futures::future;
725    /// use futures::stream::{self, StreamExt, TryStreamExt};
726    ///
727    /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
728    /// let mut evens = stream.try_filter(|x| {
729    ///     future::ready(x % 2 == 0)
730    /// });
731    ///
732    /// assert_eq!(evens.next().await, Some(Ok(2)));
733    /// assert_eq!(evens.next().await, Some(Err("error")));
734    /// # })
735    /// ```
736    fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
737    where
738        Fut: Future<Output = bool>,
739        F: FnMut(&Self::Ok) -> Fut,
740        Self: Sized,
741    {
742        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
743    }
744
745    /// Attempt to filter the values produced by this stream while
746    /// simultaneously mapping them to a different type according to the
747    /// provided asynchronous closure.
748    ///
749    /// As values of this stream are made available, the provided function will
750    /// be run on them. If the future returned by the predicate `f` resolves to
751    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
752    /// it resolves to [`None`] then the next value will be produced.
753    ///
754    /// All errors are passed through without filtering in this combinator.
755    ///
756    /// Note that this function consumes the stream passed into it and returns a
757    /// wrapped version of it, similar to the existing `filter_map` methods in
758    /// the standard library.
759    ///
760    /// # Examples
761    /// ```
762    /// # futures::executor::block_on(async {
763    /// use futures::stream::{self, StreamExt, TryStreamExt};
764    /// use futures::pin_mut;
765    ///
766    /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
767    /// let halves = stream.try_filter_map(|x| async move {
768    ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
769    ///     Ok(ret)
770    /// });
771    ///
772    /// pin_mut!(halves);
773    /// assert_eq!(halves.next().await, Some(Ok(3)));
774    /// assert_eq!(halves.next().await, Some(Err("error")));
775    /// # })
776    /// ```
777    fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
778    where
779        Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
780        F: FnMut(Self::Ok) -> Fut,
781        Self: Sized,
782    {
783        assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
784    }
785
786    /// Flattens a stream of streams into just one continuous stream. Produced streams
787    /// will be polled concurrently and any errors will be passed through without looking at them.
788    /// If the underlying base stream returns an error, it will be **immediately** propagated.
789    ///
790    /// The only argument is an optional limit on the number of concurrently
791    /// polled streams. If this limit is not `None`, no more than `limit` streams
792    /// will be polled at the same time. The `limit` argument is of type
793    /// `Into<Option<usize>>`, and so can be provided as either `None`,
794    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
795    /// no limit at all, and will have the same result as passing in `None`.
796    ///
797    /// # Examples
798    ///
799    /// ```
800    /// # futures::executor::block_on(async {
801    /// use futures::channel::mpsc;
802    /// use futures::stream::{StreamExt, TryStreamExt};
803    /// use std::thread;
804    ///
805    /// let (tx1, rx1) = mpsc::unbounded();
806    /// let (tx2, rx2) = mpsc::unbounded();
807    /// let (tx3, rx3) = mpsc::unbounded();
808    ///
809    /// thread::spawn(move || {
810    ///     tx1.unbounded_send(Ok(1)).unwrap();
811    /// });
812    /// thread::spawn(move || {
813    ///     tx2.unbounded_send(Ok(2)).unwrap();
814    ///     tx2.unbounded_send(Err(3)).unwrap();
815    ///     tx2.unbounded_send(Ok(4)).unwrap();
816    /// });
817    /// thread::spawn(move || {
818    ///     tx3.unbounded_send(Ok(rx1)).unwrap();
819    ///     tx3.unbounded_send(Ok(rx2)).unwrap();
820    ///     tx3.unbounded_send(Err(5)).unwrap();
821    /// });
822    ///
823    /// let stream = rx3.try_flatten_unordered(None);
824    /// let mut values: Vec<_> = stream.collect().await;
825    /// values.sort();
826    ///
827    /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
828    /// # });
829    /// ```
830    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
831    #[cfg(feature = "alloc")]
832    fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
833    where
834        Self::Ok: TryStream + Unpin,
835        <Self::Ok as TryStream>::Error: From<Self::Error>,
836        Self: Sized,
837    {
838        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
839            TryFlattenUnordered::new(self, limit),
840        )
841    }
842
843    /// Flattens a stream of streams into just one continuous stream.
844    ///
845    /// If this stream's elements are themselves streams then this combinator
846    /// will flatten out the entire stream to one long chain of elements. Any
847    /// errors are passed through without looking at them, but otherwise each
848    /// individual stream will get exhausted before moving on to the next.
849    ///
850    /// # Examples
851    ///
852    /// ```
853    /// # futures::executor::block_on(async {
854    /// use futures::channel::mpsc;
855    /// use futures::stream::{StreamExt, TryStreamExt};
856    /// use std::thread;
857    ///
858    /// let (tx1, rx1) = mpsc::unbounded();
859    /// let (tx2, rx2) = mpsc::unbounded();
860    /// let (tx3, rx3) = mpsc::unbounded();
861    ///
862    /// thread::spawn(move || {
863    ///     tx1.unbounded_send(Ok(1)).unwrap();
864    /// });
865    /// thread::spawn(move || {
866    ///     tx2.unbounded_send(Ok(2)).unwrap();
867    ///     tx2.unbounded_send(Err(3)).unwrap();
868    ///     tx2.unbounded_send(Ok(4)).unwrap();
869    /// });
870    /// thread::spawn(move || {
871    ///     tx3.unbounded_send(Ok(rx1)).unwrap();
872    ///     tx3.unbounded_send(Ok(rx2)).unwrap();
873    ///     tx3.unbounded_send(Err(5)).unwrap();
874    /// });
875    ///
876    /// let mut stream = rx3.try_flatten();
877    /// assert_eq!(stream.next().await, Some(Ok(1)));
878    /// assert_eq!(stream.next().await, Some(Ok(2)));
879    /// assert_eq!(stream.next().await, Some(Err(3)));
880    /// assert_eq!(stream.next().await, Some(Ok(4)));
881    /// assert_eq!(stream.next().await, Some(Err(5)));
882    /// assert_eq!(stream.next().await, None);
883    /// # });
884    /// ```
885    fn try_flatten(self) -> TryFlatten<Self>
886    where
887        Self::Ok: TryStream,
888        <Self::Ok as TryStream>::Error: From<Self::Error>,
889        Self: Sized,
890    {
891        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
892            TryFlatten::new(self),
893        )
894    }
895
896    /// Attempt to execute an accumulating asynchronous computation over a
897    /// stream, collecting all the values into one final result.
898    ///
899    /// This combinator will accumulate all values returned by this stream
900    /// according to the closure provided. The initial state is also provided to
901    /// this method and then is returned again by each execution of the closure.
902    /// Once the entire stream has been exhausted the returned future will
903    /// resolve to this value.
904    ///
905    /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
906    /// exit early if an error is encountered in either the stream or the
907    /// provided closure.
908    ///
909    /// # Examples
910    ///
911    /// ```
912    /// # futures::executor::block_on(async {
913    /// use futures::stream::{self, TryStreamExt};
914    ///
915    /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
916    /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
917    /// assert_eq!(sum.await, Ok(3));
918    ///
919    /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
920    /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
921    /// assert_eq!(sum.await, Err(2));
922    /// # })
923    /// ```
924    fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
925    where
926        F: FnMut(T, Self::Ok) -> Fut,
927        Fut: TryFuture<Ok = T, Error = Self::Error>,
928        Self: Sized,
929    {
930        assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
931    }
932
933    /// Attempt to concatenate all items of a stream into a single
934    /// extendable destination, returning a future representing the end result.
935    ///
936    /// This combinator will extend the first item with the contents of all
937    /// the subsequent successful results of the stream. If the stream is empty,
938    /// the default value will be returned.
939    ///
940    /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
941    ///
942    /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
943    /// exit early if an error is encountered in the stream.
944    ///
945    /// # Examples
946    ///
947    /// ```
948    /// # futures::executor::block_on(async {
949    /// use futures::channel::mpsc;
950    /// use futures::stream::TryStreamExt;
951    /// use std::thread;
952    ///
953    /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
954    ///
955    /// thread::spawn(move || {
956    ///     for i in (0..3).rev() {
957    ///         let n = i * 3;
958    ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
959    ///     }
960    /// });
961    ///
962    /// let result = rx.try_concat().await;
963    ///
964    /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
965    /// # });
966    /// ```
967    fn try_concat(self) -> TryConcat<Self>
968    where
969        Self: Sized,
970        Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
971    {
972        assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
973    }
974
975    /// Attempt to execute several futures from a stream concurrently (unordered).
976    ///
977    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
978    /// that matches the stream's `Error` type.
979    ///
980    /// This adaptor will buffer up to `n` futures and then return their
981    /// outputs in the order in which they complete. If the underlying stream
982    /// returns an error, it will be immediately propagated.
983    ///
984    /// The returned stream will be a stream of results, each containing either
985    /// an error or a future's output. An error can be produced either by the
986    /// underlying stream itself or by one of the futures it yielded.
987    ///
988    /// This method is only available when the `std` or `alloc` feature of this
989    /// library is activated, and it is activated by default.
990    ///
991    /// # Examples
992    ///
993    /// Results are returned in the order of completion:
994    /// ```
995    /// # futures::executor::block_on(async {
996    /// use futures::channel::oneshot;
997    /// use futures::stream::{self, StreamExt, TryStreamExt};
998    ///
999    /// let (send_one, recv_one) = oneshot::channel();
1000    /// let (send_two, recv_two) = oneshot::channel();
1001    ///
1002    /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1003    ///
1004    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1005    ///
1006    /// send_two.send(2i32)?;
1007    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1008    ///
1009    /// send_one.send(1i32)?;
1010    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1011    ///
1012    /// assert_eq!(buffered.next().await, None);
1013    /// # Ok::<(), i32>(()) }).unwrap();
1014    /// ```
1015    ///
1016    /// Errors from the underlying stream itself are propagated:
1017    /// ```
1018    /// # futures::executor::block_on(async {
1019    /// use futures::channel::mpsc;
1020    /// use futures::stream::{StreamExt, TryStreamExt};
1021    ///
1022    /// let (sink, stream_of_futures) = mpsc::unbounded();
1023    /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1024    ///
1025    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1026    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1027    ///
1028    /// sink.unbounded_send(Err("error in the stream"))?;
1029    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1030    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1031    /// ```
1032    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1033    #[cfg(feature = "alloc")]
1034    fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
1035    where
1036        Self::Ok: TryFuture<Error = Self::Error>,
1037        Self: Sized,
1038    {
1039        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
1040            TryBufferUnordered::new(self, n),
1041        )
1042    }
1043
1044    /// Attempt to execute several futures from a stream concurrently.
1045    ///
1046    /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
1047    /// that matches the stream's `Error` type.
1048    ///
1049    /// This adaptor will buffer up to `n` futures and then return their
1050    /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
1051    /// be immediately propagated.
1052    ///
1053    /// The returned stream will be a stream of results, each containing either
1054    /// an error or a future's output. An error can be produced either by the
1055    /// underlying stream itself or by one of the futures it yielded.
1056    ///
1057    /// This method is only available when the `std` or `alloc` feature of this
1058    /// library is activated, and it is activated by default.
1059    ///
1060    /// # Examples
1061    ///
1062    /// Results are returned in the order of addition:
1063    /// ```
1064    /// # futures::executor::block_on(async {
1065    /// use futures::channel::oneshot;
1066    /// use futures::future::lazy;
1067    /// use futures::stream::{self, StreamExt, TryStreamExt};
1068    ///
1069    /// let (send_one, recv_one) = oneshot::channel();
1070    /// let (send_two, recv_two) = oneshot::channel();
1071    ///
1072    /// let mut buffered = lazy(move |cx| {
1073    ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1074    ///
1075    ///     let mut buffered = stream_of_futures.try_buffered(10);
1076    ///
1077    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1078    ///
1079    ///     send_two.send(2i32)?;
1080    ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1081    ///     Ok::<_, i32>(buffered)
1082    /// }).await?;
1083    ///
1084    /// send_one.send(1i32)?;
1085    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1086    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1087    ///
1088    /// assert_eq!(buffered.next().await, None);
1089    /// # Ok::<(), i32>(()) }).unwrap();
1090    /// ```
1091    ///
1092    /// Errors from the underlying stream itself are propagated:
1093    /// ```
1094    /// # futures::executor::block_on(async {
1095    /// use futures::channel::mpsc;
1096    /// use futures::stream::{StreamExt, TryStreamExt};
1097    ///
1098    /// let (sink, stream_of_futures) = mpsc::unbounded();
1099    /// let mut buffered = stream_of_futures.try_buffered(10);
1100    ///
1101    /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1102    /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1103    ///
1104    /// sink.unbounded_send(Err("error in the stream"))?;
1105    /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1106    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1107    /// ```
1108    #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1109    #[cfg(feature = "alloc")]
1110    fn try_buffered(self, n: usize) -> TryBuffered<Self>
1111    where
1112        Self::Ok: TryFuture<Error = Self::Error>,
1113        Self: Sized,
1114    {
1115        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
1116            self, n,
1117        ))
1118    }
1119
1120    // TODO: false positive warning from rustdoc. Verify once #43466 settles
1121    //
1122    /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
1123    /// stream types.
1124    fn try_poll_next_unpin(
1125        &mut self,
1126        cx: &mut Context<'_>,
1127    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
1128    where
1129        Self: Unpin,
1130    {
1131        Pin::new(self).try_poll_next(cx)
1132    }
1133
1134    /// Wraps a [`TryStream`] into a stream compatible with libraries using
1135    /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1136    /// ```
1137    /// # if cfg!(miri) { return; } // Miri does not support epoll
1138    /// use futures::future::{FutureExt, TryFutureExt};
1139    /// # let (tx, rx) = futures::channel::oneshot::channel();
1140    ///
1141    /// let future03 = async {
1142    ///     println!("Running on the pool");
1143    ///     tx.send(42).unwrap();
1144    /// };
1145    ///
1146    /// let future01 = future03
1147    ///     .unit_error() // Make it a TryFuture
1148    ///     .boxed()  // Make it Unpin
1149    ///     .compat();
1150    ///
1151    /// tokio::run(future01);
1152    /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1153    /// ```
1154    #[cfg(feature = "compat")]
1155    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
1156    fn compat(self) -> Compat<Self>
1157    where
1158        Self: Sized + Unpin,
1159    {
1160        Compat::new(self)
1161    }
1162
1163    /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1164    ///
1165    /// This method is only available when the `std` feature of this
1166    /// library is activated, and it is activated by default.
1167    ///
1168    /// # Examples
1169    ///
1170    /// ```
1171    /// # futures::executor::block_on(async {
1172    /// use futures::stream::{self, TryStreamExt};
1173    /// use futures::io::AsyncReadExt;
1174    ///
1175    /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1176    /// let mut reader = stream.into_async_read();
1177    ///
1178    /// let mut buf = Vec::new();
1179    /// reader.read_to_end(&mut buf).await.unwrap();
1180    /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1181    /// # })
1182    /// ```
1183    #[cfg(feature = "io")]
1184    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1185    #[cfg(feature = "std")]
1186    fn into_async_read(self) -> IntoAsyncRead<Self>
1187    where
1188        Self: Sized + TryStreamExt<Error = std::io::Error>,
1189        Self::Ok: AsRef<[u8]>,
1190    {
1191        crate::io::assert_read(IntoAsyncRead::new(self))
1192    }
1193
1194    /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
1195    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1196    /// that does not satisfy the predicate.
1197    ///
1198    /// # Examples
1199    ///
1200    /// ```
1201    /// # futures::executor::block_on(async {
1202    /// use futures::stream::{self, StreamExt, TryStreamExt};
1203    /// use std::convert::Infallible;
1204    ///
1205    /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>);
1206    /// let positive = number_stream.try_all(|i| async move { i > 0 });
1207    /// assert_eq!(positive.await, Ok(true));
1208    ///
1209    /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1210    /// let positive = stream_with_errors.try_all(|i| async move { i > 0 });
1211    /// assert_eq!(positive.await, Err("err"));
1212    /// # });
1213    /// ```
1214    fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
1215    where
1216        Self: Sized,
1217        F: FnMut(Self::Ok) -> Fut,
1218        Fut: Future<Output = bool>,
1219    {
1220        assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f))
1221    }
1222
1223    /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
1224    /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1225    /// that satisfies the predicate.
1226    ///
1227    /// # Examples
1228    ///
1229    /// ```
1230    /// # futures::executor::block_on(async {
1231    /// use futures::stream::{self, StreamExt, TryStreamExt};
1232    /// use std::convert::Infallible;
1233    ///
1234    /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>);
1235    /// let contain_three = number_stream.try_any(|i| async move { i == 3 });
1236    /// assert_eq!(contain_three.await, Ok(true));
1237    ///
1238    /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1239    /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 });
1240    /// assert_eq!(contain_three.await, Err("err"));
1241    /// # });
1242    /// ```
1243    fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
1244    where
1245        Self: Sized,
1246        F: FnMut(Self::Ok) -> Fut,
1247        Fut: Future<Output = bool>,
1248    {
1249        assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f))
1250    }
1251}