async_utils/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4//! Streams always signal exhaustion with `None` return values. A stream epitaph can be used when
5//! a specific value is desired as the last item returned by a stream before it is exhausted.
6//!
//! Example use case: often streams will be used without having direct access to the stream itself
8//! such as from a `streammap::StreamMap` or a `futures::stream::FuturesUnordered`. Occasionally,
9//! it is necessary to perform some cleanup procedure outside of a stream when it is exhausted. An
10//! `epitaph` can be used to uniquely identify which stream has ended within a collection of
11//! streams.
12
13use core::pin::Pin;
14use core::task::{Context, Poll};
15use futures::stream::{FusedStream, Stream};
16use futures::Future;
17use pin_project::pin_project;
18
19mod future_map;
20mod one_or_many;
21mod short_circuit;
22mod stream_map;
23
24pub use future_map::FutureMap;
25pub use one_or_many::OneOrMany;
26pub use short_circuit::ShortCircuit;
27pub use stream_map::StreamMap;
28
/// Values returned from a stream with an epitaph are of type `StreamItem`.
///
/// `T` is the type carried by `Item` (items of the wrapped stream) and `E` is the type carried
/// by `Epitaph`.
#[derive(Debug, PartialEq)]
pub enum StreamItem<T, E> {
    /// Item polled from the underlying `Stream`.
    Item(T),
    /// Epitaph value returned after the underlying `Stream` is exhausted.
    Epitaph(E),
}
37
/// A `Stream` that returns the values of the wrapped stream until the wrapped stream is exhausted.
/// Then it returns a single epitaph value before being exhausted.
#[cfg_attr(test, derive(Debug))]
pub struct StreamWithEpitaph<S, E> {
    /// The wrapped stream whose items are forwarded.
    inner: S,
    /// The epitaph still to be yielded; becomes `None` once it has been handed out.
    epitaph: Option<E>,
}
45
46impl<S, E> StreamWithEpitaph<S, E> {
47    /// Provide immutable access to the inner stream.
48    /// This is safe as if the stream were being polled, we would not be able to access a
49    /// reference to self to pass to this method.
50    pub fn inner(&self) -> &S {
51        &self.inner
52    }
53
54    /// Provide mutable access to the inner stream.
55    /// This is safe as if the stream were being polled, we would not be able to access a mutable
56    /// reference to self to pass to this method.
57    pub fn inner_mut(&mut self) -> &mut S {
58        &mut self.inner
59    }
60}
61
62// The `Unpin` bounds are not strictly necessary, but make for a more convenient
63// implementation. The bounds can be relaxed if !Unpin support is desired.
64impl<S, T, E> Stream for StreamWithEpitaph<S, E>
65where
66    S: Stream<Item = T> + Unpin,
67    E: Unpin,
68    T: Unpin,
69{
70    type Item = StreamItem<T, E>;
71    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72        if self.epitaph.is_none() {
73            return Poll::Ready(None);
74        }
75        match Pin::new(&mut self.inner).poll_next(cx) {
76            Poll::Ready(None) => {
77                let this = self.get_mut();
78                let ep = this.epitaph.take().map(StreamItem::Epitaph);
79                assert!(ep.is_some(), "epitaph must be present if stream is not terminated");
80                Poll::Ready(ep)
81            }
82            Poll::Ready(item) => Poll::Ready(item.map(StreamItem::Item)),
83            Poll::Pending => Poll::Pending,
84        }
85    }
86}
87
88impl<S, T, E> FusedStream for StreamWithEpitaph<S, E>
89where
90    S: Stream<Item = T> + FusedStream + Unpin,
91    E: Unpin,
92    T: Unpin,
93{
94    fn is_terminated(&self) -> bool {
95        self.epitaph.is_none()
96    }
97}
98
/// Extension trait to allow for easy creation of a `StreamWithEpitaph` from a `Stream`.
pub trait WithEpitaph: Sized {
    /// Map this stream to one producing a `StreamItem::Item` value for each item of the stream
    /// followed by a single `StreamItem::Epitaph` value with the provided `epitaph`.
    ///
    /// Consumes `self`; the returned wrapper owns both the stream and the epitaph.
    fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<Self, E>;
}
105
106impl<T> WithEpitaph for T
107where
108    T: Stream,
109{
110    fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<T, E> {
111        StreamWithEpitaph { inner: self, epitaph: Some(epitaph) }
112    }
113}
114
/// A Stream where each yielded item is tagged with a uniform key.
/// Items yielded are `(K, St::Item)`.
///
/// Tagged streams can be easily created by using the `.tagged()` function on the `WithTag` trait.
/// The stream produced by:
///   `stream.tagged(k)`
/// is equivalent to that created by
///   `stream.map(move |v| (k.clone(), v))`
/// BUT the Tagged type combinator provides a statically nameable type that can easily be expressed
/// in type signatures such as `IndexedStreams` below.
#[cfg_attr(test, derive(Debug))]
#[pin_project]
pub struct Tagged<K, St> {
    /// The key cloned into every yielded value.
    tag: K,
    /// The wrapped stream (or future); marked `#[pin]` so it can be polled through a pinned
    /// projection.
    #[pin]
    stream: St,
}
132
133impl<K: Clone, St> Tagged<K, St> {
134    /// Get a clone of the tag associated with this `Stream`.
135    pub fn tag(&self) -> K {
136        self.tag.clone()
137    }
138}
139
/// Extension trait to allow for easy creation of a `Tagged` stream from a `Stream`.
pub trait WithTag: Sized {
    /// Produce a new stream from this one which yields each item tupled with a constant tag.
    ///
    /// Consumes `self`; the returned `Tagged` owns both the tag and the original value.
    fn tagged<T>(self, tag: T) -> Tagged<T, Self>;
}
145
146impl<St: Sized> WithTag for St {
147    fn tagged<T>(self, tag: T) -> Tagged<T, Self> {
148        Tagged { tag, stream: self }
149    }
150}
151
152impl<K: Clone, Fut: Future> Future for Tagged<K, Fut> {
153    type Output = (K, Fut::Output);
154
155    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156        let k = self.tag.clone();
157        match self.project().stream.poll(cx) {
158            Poll::Ready(out) => Poll::Ready((k, out)),
159            Poll::Pending => Poll::Pending,
160        }
161    }
162}
163
164impl<K: Clone, St: Stream> Stream for Tagged<K, St> {
165    type Item = (K, St::Item);
166
167    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168        let k = self.tag.clone();
169        match self.project().stream.poll_next(cx) {
170            Poll::Ready(Some(item)) => Poll::Ready(Some((k, item))),
171            Poll::Ready(None) => Poll::Ready(None),
172            Poll::Pending => Poll::Pending,
173        }
174    }
175}
176
/// Convenient alias for a collection of Streams indexed by key where each message is tagged and
/// stream termination is notified by key. This is especially useful for maintaining a collection
/// of fidl client request streams, and being notified when each terminates.
///
/// Each entry is a `Tagged` stream wrapped in a `StreamWithEpitaph` whose epitaph type is the
/// same key type `K`.
pub type IndexedStreams<K, St> = StreamMap<K, StreamWithEpitaph<Tagged<K, St>, K>>;
181
182#[cfg(test)]
183mod test {
184    //! We validate the behavior of the StreamMap stream by enumerating all possible external
185    //! events, and then generating permutations of valid sequences of those events. These model
186    //! the possible executions sequences the stream could go through in program execution. We
187    //! then assert that:
188    //!   a) At all points during execution, all invariants are held
189    //!   b) The final result is as expected
190    //!
191    //! In this case, the invariants are:
192    //!   * If the map is empty, it is pending
193    //!   * If all streams are pending, the map is pending
194    //!   * otherwise the map is ready
195    //!
196    //! The result is:
197    //!   * All test messages have been injected
198    //!   * All test messages have been yielded
199    //!   * All test streams have terminated
200    //!   * No event is yielded with a given key after the stream for that key has terminated
201    //!
202    //! Together these show:
203    //!   * Progress is always eventually made - the Stream cannot be stalled
204    //!   * All inserted elements will eventually be yielded
205    //!   * Elements are never duplicated
206    use super::*;
207    use core::hash::Hash;
208    use fuchsia_async as fasync;
209    use futures::channel::mpsc;
210    use futures::future::ready;
211    use futures::stream::{empty, iter, once, Empty, StreamExt};
212    use proptest::prelude::*;
213    use std::collections::HashSet;
214    use std::fmt::Debug;
215
216    #[fasync::run_until_stalled(test)]
217    async fn empty_stream_returns_epitaph_only() {
218        let s: Empty<i32> = empty();
219        let s = s.with_epitaph(0i64);
220        let actual: Vec<_> = s.collect().await;
221        let expected = vec![StreamItem::Epitaph(0i64)];
222        assert_eq!(actual, expected);
223    }
224
225    #[fasync::run_until_stalled(test)]
226    async fn populated_stream_returns_items_and_epitaph() {
227        let s = iter(0i32..3).fuse().with_epitaph(3i64);
228        let actual: Vec<_> = StreamExt::collect::<Vec<_>>(s).await;
229        let expected = vec![
230            StreamItem::Item(0),
231            StreamItem::Item(1),
232            StreamItem::Item(2),
233            StreamItem::Epitaph(3i64),
234        ];
235        assert_eq!(actual, expected);
236    }
237
238    #[fasync::run_until_stalled(test)]
239    async fn stream_is_terminated_after_end() {
240        let mut s = once(ready(0i32)).with_epitaph(3i64);
241        assert_eq!(s.next().await, Some(StreamItem::Item(0)));
242        assert_eq!(s.next().await, Some(StreamItem::Epitaph(3)));
243        assert!(s.is_terminated());
244    }
245
246    // We validate the behavior of the StreamMap stream by enumerating all possible external
247    // events, and then generating permutations of valid sequences of those events. These model
248    // the possible executions sequences the stream could go through in program execution. We
249    // then assert that:
250    //   a) At all points during execution, all invariants are held
251    //   b) The final result is as expected
252    //
253    // In this case, the invariants are:
254    //   * If the map is empty, it is pending
255    //   * If all streams are pending, the map is pending
256    //   * otherwise the map is ready
257    //
258    // The result is:
259    //   * All test messages have been injected
260    //   * All test messages have been yielded
261    //   * All test streams have terminated
262    //   * No event is yielded with a given key after the stream for that key has terminated
263    //
264    // Together these show:
265    //   * Progress is always eventually made - the Stream cannot be stalled
266    //   * All inserted elements will eventually be yielded
267    //   * Elements are never duplicated
268
    /// Possible actions to take in evaluating the stream.
    ///
    /// `K` is the key identifying the stream an action applies to.
    enum Event<K> {
        /// Insert a new request stream (the receiving end of the channel for key `K`)
        InsertStream(K, mpsc::Receiver<Result<u64, ()>>),
        /// Send a new request (through the sending end of the channel for key `K`)
        SendRequest(K, mpsc::Sender<Result<u64, ()>>),
        /// Close an existing request stream (by closing the sending end for key `K`)
        CloseStream(K, mpsc::Sender<Result<u64, ()>>),
        /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
        /// do nothing
        Execute,
    }
281
282    impl<K: Debug> Debug for Event<K> {
283        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284            match self {
285                Event::InsertStream(k, _) => write!(f, "InsertStream({:?})", k),
286                Event::SendRequest(k, _) => write!(f, "SendRequest({:?})", k),
287                Event::CloseStream(k, _) => write!(f, "CloseStream({:?})", k),
288                Event::Execute => write!(f, "Execute"),
289            }
290        }
291    }
292
293    fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
294        // Ensure that the channel is big enough to always handle all the Sends we make
295        let (sender, receiver) = mpsc::channel::<Result<u64, ()>>(10);
296        vec![
297            Event::InsertStream(key.clone(), receiver),
298            Event::SendRequest(key.clone(), sender.clone()),
299            Event::CloseStream(key, sender),
300        ]
301    }
302
303    /// Determine how many events are sent on open channels (a channel is open if it has not been
304    /// closed, even if it has not yet been inserted into the StreamMap)
305    fn expected_yield<K: Eq + Hash>(events: &Vec<Event<K>>) -> usize {
306        events
307            .iter()
308            .fold((HashSet::new(), 0), |(mut terminated, closed), event| match event {
309                Event::CloseStream(k, _) => {
310                    let _: bool = terminated.insert(k);
311                    (terminated, closed)
312                }
313                Event::SendRequest(k, _) if !terminated.contains(k) => (terminated, closed + 1),
314                _ => (terminated, closed),
315            })
316            .1
317    }
318
319    /// Strategy that produces random permutations of a set of events, corresponding to inserting,
320    /// sending and completing up to n different streams in random order, also interspersed with
321    /// running the executor
322    fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
323        fn generate_events(n: u64) -> Vec<Event<u64>> {
324            let mut events = (0..n).flat_map(|n| stream_events(n)).collect::<Vec<_>>();
325            events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
326            events
327        }
328
329        // We want to produce random permutations of these events
330        (0..n).prop_map(generate_events).prop_shuffle()
331    }
332
    proptest! {
        // Replays a shuffled sequence of insert/send/close/execute events against a `StreamMap`
        // of tagged, epitaph-carrying streams, then checks the totals and the per-key ordering
        // of what was yielded.
        #[test]
        fn test_invariants(mut execution in execution_sequences(4)) {
            // Number of sends that happen while their channel is still open.
            let expected = expected_yield(&execution);
            // Number of `CloseStream` events; `stream_events` emits exactly one per stream, so
            // this is also the number of streams in the sequence.
            let expected_count:u64 = execution.iter()
                .filter(|event| match event {
                    Event::CloseStream(_, _) => true,
                    _ => false,
                }).count() as u64;

            // Add enough execution events to ensure we will complete, no matter the order
            execution.extend(std::iter::repeat_with(|| Event::Execute).take((expected_count * 3) as usize));

            // `count` is read via `count.get()` below to decide whether the map has been woken
            // since it last returned `Pending`.
            let (waker, count) = futures_test::task::new_count_waker();
            let send_waker = futures_test::task::noop_waker();
            let mut streams = StreamMap::empty();
            // Value `count.get()` must reach before an `Execute` event polls the map again.
            let mut next_wake = 0;
            let mut yielded = 0;
            let mut inserted = 0;
            let mut closed = 0;
            // Everything the map yielded, in order, for the ordering check at the end.
            let mut events = vec![];
            for event in execution {
                match event {
                    Event::InsertStream(key, stream) => {
                        assert_matches::assert_matches!(streams.insert(key, stream.tagged(key).with_epitaph(key)), None);
                        // StreamMap does *not* wake on inserting new streams, matching the
                        // behavior of streams::SelectAll. The client *must* arrange for it to be
                        // polled again after a stream is inserted; we model that here by forcing a
                        // wake up
                        next_wake = count.get();
                    }
                    Event::SendRequest(_, mut sender) => {
                        // Only sends the channel accepts are counted; any other `poll_ready`
                        // result skips the send.
                        if let Poll::Ready(Ok(())) = sender.poll_ready(&mut Context::from_waker(&send_waker)) {
                            prop_assert_eq!(sender.start_send(Ok(1)), Ok(()));
                            inserted = inserted + 1;
                        }
                    }
                    Event::CloseStream(_, mut stream) => {
                        stream.close_channel();
                    }
                    // Poll only when the wake counter has reached the expected value.
                    Event::Execute if count.get() >= next_wake => {
                        match Pin::new(&mut streams.next()).poll(&mut Context::from_waker(&waker)) {
                            Poll::Ready(Some(StreamItem::Item((k, v)))) => {
                                events.push(StreamItem::Item((k, v)));
                                yielded = yielded + 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // Invariant: stream(k) must be in the map
                                prop_assert!(streams.contains_key(&k))
                            }
                            Poll::Ready(Some(StreamItem::Epitaph(k))) => {
                                events.push(StreamItem::Epitaph(k));
                                closed = closed + 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // stream(k) is now terminated, but until polled again (Yielding
                                // `None`), will still be in the map
                            }
                            Poll::Ready(None) => {
                                // the Stream impl for StreamMap never completes
                                unreachable!()
                            }
                            Poll::Pending => {
                                // Require one more wake before the next `Execute` polls again.
                                next_wake = count.get() + 1;
                            }
                        };
                    }
                    // Not woken since the last `Pending`: the executor does nothing.
                    Event::Execute => (),
                }
            }
            prop_assert_eq!(inserted, expected, "All expected requests inserted");
            // `next_wake` and `count.get()` appear on both sides, so only `yielded` versus
            // `expected` can differ; the extra values just show up in the failure output.
            prop_assert_eq!((next_wake, count.get(), yielded), (next_wake, count.get(), expected), "All expected requests yielded");
            prop_assert_eq!(closed, expected_count, "All streams closed");
            // True for every recorded event except the epitaph of `key`.
            let not_terminated =
                |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
                    StreamItem::Epitaph(k) if *k == key => false,
                    _ => true,
                };
            // True for items (not epitaphs) that were yielded with tag `key`.
            let event_of =
                |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
                    StreamItem::Item((k, _)) if *k == key => true,
                    _ => false,
                };
            let all_keys = 0..expected_count;
            for k in all_keys {
                prop_assert!(!streams.contains_key(&k), "All streams should now have been removed");
                // Skip up to the epitaph for `k`, then require that no item for `k` follows it.
                prop_assert!(!events.iter().skip_while(|e| not_terminated(k, e)).any(|e| event_of(k, e)), "No events should have been yielded from a stream after it terminated");
            }
        }
    }
423}