async_utils/stream.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4//! Streams always signal exhaustion with `None` return values. A stream epitaph can be used when
5//! a specific value is desired as the last item returned by a stream before it is exhausted.
6//!
7//! Example usecase: often streams will be used without having direct access to the stream itself
8//! such as from a `streammap::StreamMap` or a `futures::stream::FuturesUnordered`. Occasionally,
9//! it is necessary to perform some cleanup procedure outside of a stream when it is exhausted. An
10//! `epitaph` can be used to uniquely identify which stream has ended within a collection of
11//! streams.
12
13use core::pin::Pin;
14use core::task::{Context, Poll};
15use futures::stream::{FusedStream, Stream};
16use futures::Future;
17use pin_project::pin_project;
18
19mod future_map;
20mod one_or_many;
21mod short_circuit;
22mod stream_map;
23
24pub use future_map::FutureMap;
25pub use one_or_many::OneOrMany;
26pub use short_circuit::ShortCircuit;
27pub use stream_map::StreamMap;
28
29/// Values returned from a stream with an epitaph are of type `StreamItem`.
30#[derive(Debug, PartialEq)]
31pub enum StreamItem<T, E> {
32 /// Item polled from the underlying `Stream`
33 Item(T),
34 /// Epitaph value returned after the underlying `Stream` is exhausted.
35 Epitaph(E),
36}
37
38/// A `Stream` that returns the values of the wrapped stream until the wrapped stream is exhausted.
39/// Then it returns a single epitaph value before being exhausted
40#[cfg_attr(test, derive(Debug))]
41pub struct StreamWithEpitaph<S, E> {
42 inner: S,
43 epitaph: Option<E>,
44}
45
46impl<S, E> StreamWithEpitaph<S, E> {
47 /// Provide immutable access to the inner stream.
48 /// This is safe as if the stream were being polled, we would not be able to access a
49 /// reference to self to pass to this method.
50 pub fn inner(&self) -> &S {
51 &self.inner
52 }
53
54 /// Provide mutable access to the inner stream.
55 /// This is safe as if the stream were being polled, we would not be able to access a mutable
56 /// reference to self to pass to this method.
57 pub fn inner_mut(&mut self) -> &mut S {
58 &mut self.inner
59 }
60}
61
62// The `Unpin` bounds are not strictly necessary, but make for a more convenient
63// implementation. The bounds can be relaxed if !Unpin support is desired.
64impl<S, T, E> Stream for StreamWithEpitaph<S, E>
65where
66 S: Stream<Item = T> + Unpin,
67 E: Unpin,
68 T: Unpin,
69{
70 type Item = StreamItem<T, E>;
71 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72 if self.epitaph.is_none() {
73 return Poll::Ready(None);
74 }
75 match Pin::new(&mut self.inner).poll_next(cx) {
76 Poll::Ready(None) => {
77 let this = self.get_mut();
78 let ep = this.epitaph.take().map(StreamItem::Epitaph);
79 assert!(ep.is_some(), "epitaph must be present if stream is not terminated");
80 Poll::Ready(ep)
81 }
82 Poll::Ready(item) => Poll::Ready(item.map(StreamItem::Item)),
83 Poll::Pending => Poll::Pending,
84 }
85 }
86}
87
88impl<S, T, E> FusedStream for StreamWithEpitaph<S, E>
89where
90 S: Stream<Item = T> + FusedStream + Unpin,
91 E: Unpin,
92 T: Unpin,
93{
94 fn is_terminated(&self) -> bool {
95 self.epitaph.is_none()
96 }
97}
98
99/// Extension trait to allow for easy creation of a `StreamWithEpitaph` from a `Stream`.
100pub trait WithEpitaph: Sized {
101 /// Map this stream to one producing a `StreamItem::Item` value for each item of the stream
102 /// followed by a single `StreamItem::Epitaph` value with the provided `epitaph`.
103 fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<Self, E>;
104}
105
106impl<T> WithEpitaph for T
107where
108 T: Stream,
109{
110 fn with_epitaph<E>(self, epitaph: E) -> StreamWithEpitaph<T, E> {
111 StreamWithEpitaph { inner: self, epitaph: Some(epitaph) }
112 }
113}
114
115/// A Stream where each yielded item is tagged with a uniform key
116/// Items yielded are (K, St::Item)
117///
118/// Tagged streams can be easily created by using the `.tagged()` function on the `WithTag` trait.
119/// The stream produced by:
120/// stream.tagged(k)
121/// is equivalent to that created by
122/// stream.map(move |v|, (k.clone(), v)
123/// BUT the Tagged type combinator provides a statically nameable type that can easily be expressed
124/// in type signatures such as `IndexedStreams` below.
125#[cfg_attr(test, derive(Debug))]
126#[pin_project]
127pub struct Tagged<K, St> {
128 tag: K,
129 #[pin]
130 stream: St,
131}
132
133impl<K: Clone, St> Tagged<K, St> {
134 /// Get a clone of the tag associated with this `Stream`.
135 pub fn tag(&self) -> K {
136 self.tag.clone()
137 }
138}
139
140/// Extension trait to allow for easy creation of a `Tagged` stream from a `Stream`.
141pub trait WithTag: Sized {
142 /// Produce a new stream from this one which yields item tupled with a constant tag
143 fn tagged<T>(self, tag: T) -> Tagged<T, Self>;
144}
145
146impl<St: Sized> WithTag for St {
147 fn tagged<T>(self, tag: T) -> Tagged<T, Self> {
148 Tagged { tag, stream: self }
149 }
150}
151
152impl<K: Clone, Fut: Future> Future for Tagged<K, Fut> {
153 type Output = (K, Fut::Output);
154
155 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156 let k = self.tag.clone();
157 match self.project().stream.poll(cx) {
158 Poll::Ready(out) => Poll::Ready((k, out)),
159 Poll::Pending => Poll::Pending,
160 }
161 }
162}
163
164impl<K: Clone, St: Stream> Stream for Tagged<K, St> {
165 type Item = (K, St::Item);
166
167 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168 let k = self.tag.clone();
169 match self.project().stream.poll_next(cx) {
170 Poll::Ready(Some(item)) => Poll::Ready(Some((k, item))),
171 Poll::Ready(None) => Poll::Ready(None),
172 Poll::Pending => Poll::Pending,
173 }
174 }
175}
176
177/// Convenient alias for a collection of Streams indexed by key where each message is tagged and
178/// stream termination is notified by key. This is especially useful for maintaining a collection
179/// of fidl client request streams, and being notified when each terminates
180pub type IndexedStreams<K, St> = StreamMap<K, StreamWithEpitaph<Tagged<K, St>, K>>;
181
182#[cfg(test)]
183mod test {
184 //! We validate the behavior of the StreamMap stream by enumerating all possible external
185 //! events, and then generating permutations of valid sequences of those events. These model
186 //! the possible executions sequences the stream could go through in program execution. We
187 //! then assert that:
188 //! a) At all points during execution, all invariants are held
189 //! b) The final result is as expected
190 //!
191 //! In this case, the invariants are:
192 //! * If the map is empty, it is pending
193 //! * If all streams are pending, the map is pending
194 //! * otherwise the map is ready
195 //!
196 //! The result is:
197 //! * All test messages have been injected
198 //! * All test messages have been yielded
199 //! * All test streams have terminated
200 //! * No event is yielded with a given key after the stream for that key has terminated
201 //!
202 //! Together these show:
203 //! * Progress is always eventually made - the Stream cannot be stalled
204 //! * All inserted elements will eventually be yielded
205 //! * Elements are never duplicated
206 use super::*;
207 use core::hash::Hash;
208 use fuchsia_async as fasync;
209 use futures::channel::mpsc;
210 use futures::future::ready;
211 use futures::stream::{empty, iter, once, Empty, StreamExt};
212 use proptest::prelude::*;
213 use std::collections::HashSet;
214 use std::fmt::Debug;
215
216 #[fasync::run_until_stalled(test)]
217 async fn empty_stream_returns_epitaph_only() {
218 let s: Empty<i32> = empty();
219 let s = s.with_epitaph(0i64);
220 let actual: Vec<_> = s.collect().await;
221 let expected = vec![StreamItem::Epitaph(0i64)];
222 assert_eq!(actual, expected);
223 }
224
225 #[fasync::run_until_stalled(test)]
226 async fn populated_stream_returns_items_and_epitaph() {
227 let s = iter(0i32..3).fuse().with_epitaph(3i64);
228 let actual: Vec<_> = StreamExt::collect::<Vec<_>>(s).await;
229 let expected = vec![
230 StreamItem::Item(0),
231 StreamItem::Item(1),
232 StreamItem::Item(2),
233 StreamItem::Epitaph(3i64),
234 ];
235 assert_eq!(actual, expected);
236 }
237
238 #[fasync::run_until_stalled(test)]
239 async fn stream_is_terminated_after_end() {
240 let mut s = once(ready(0i32)).with_epitaph(3i64);
241 assert_eq!(s.next().await, Some(StreamItem::Item(0)));
242 assert_eq!(s.next().await, Some(StreamItem::Epitaph(3)));
243 assert!(s.is_terminated());
244 }
245
246 // We validate the behavior of the StreamMap stream by enumerating all possible external
247 // events, and then generating permutations of valid sequences of those events. These model
248 // the possible executions sequences the stream could go through in program execution. We
249 // then assert that:
250 // a) At all points during execution, all invariants are held
251 // b) The final result is as expected
252 //
253 // In this case, the invariants are:
254 // * If the map is empty, it is pending
255 // * If all streams are pending, the map is pending
256 // * otherwise the map is ready
257 //
258 // The result is:
259 // * All test messages have been injected
260 // * All test messages have been yielded
261 // * All test streams have terminated
262 // * No event is yielded with a given key after the stream for that key has terminated
263 //
264 // Together these show:
265 // * Progress is always eventually made - the Stream cannot be stalled
266 // * All inserted elements will eventually be yielded
267 // * Elements are never duplicated
268
269 /// Possible actions to take in evaluating the stream
270 enum Event<K> {
271 /// Insert a new request stream
272 InsertStream(K, mpsc::Receiver<Result<u64, ()>>),
273 /// Send a new request
274 SendRequest(K, mpsc::Sender<Result<u64, ()>>),
275 /// Close an existing request stream
276 CloseStream(K, mpsc::Sender<Result<u64, ()>>),
277 /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
278 /// do nothing
279 Execute,
280 }
281
282 impl<K: Debug> Debug for Event<K> {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 match self {
285 Event::InsertStream(k, _) => write!(f, "InsertStream({:?})", k),
286 Event::SendRequest(k, _) => write!(f, "SendRequest({:?})", k),
287 Event::CloseStream(k, _) => write!(f, "CloseStream({:?})", k),
288 Event::Execute => write!(f, "Execute"),
289 }
290 }
291 }
292
293 fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
294 // Ensure that the channel is big enough to always handle all the Sends we make
295 let (sender, receiver) = mpsc::channel::<Result<u64, ()>>(10);
296 vec![
297 Event::InsertStream(key.clone(), receiver),
298 Event::SendRequest(key.clone(), sender.clone()),
299 Event::CloseStream(key, sender),
300 ]
301 }
302
303 /// Determine how many events are sent on open channels (a channel is open if it has not been
304 /// closed, even if it has not yet been inserted into the StreamMap)
305 fn expected_yield<K: Eq + Hash>(events: &Vec<Event<K>>) -> usize {
306 events
307 .iter()
308 .fold((HashSet::new(), 0), |(mut terminated, closed), event| match event {
309 Event::CloseStream(k, _) => {
310 let _: bool = terminated.insert(k);
311 (terminated, closed)
312 }
313 Event::SendRequest(k, _) if !terminated.contains(k) => (terminated, closed + 1),
314 _ => (terminated, closed),
315 })
316 .1
317 }
318
319 /// Strategy that produces random permutations of a set of events, corresponding to inserting,
320 /// sending and completing up to n different streams in random order, also interspersed with
321 /// running the executor
322 fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
323 fn generate_events(n: u64) -> Vec<Event<u64>> {
324 let mut events = (0..n).flat_map(|n| stream_events(n)).collect::<Vec<_>>();
325 events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
326 events
327 }
328
329 // We want to produce random permutations of these events
330 (0..n).prop_map(generate_events).prop_shuffle()
331 }
332
333 proptest! {
334 #[test]
335 fn test_invariants(mut execution in execution_sequences(4)) {
336 let expected = expected_yield(&execution);
337 let expected_count:u64 = execution.iter()
338 .filter(|event| match event {
339 Event::CloseStream(_, _) => true,
340 _ => false,
341 }).count() as u64;
342
343 // Add enough execution events to ensure we will complete, no matter the order
344 execution.extend(std::iter::repeat_with(|| Event::Execute).take((expected_count * 3) as usize));
345
346 let (waker, count) = futures_test::task::new_count_waker();
347 let send_waker = futures_test::task::noop_waker();
348 let mut streams = StreamMap::empty();
349 let mut next_wake = 0;
350 let mut yielded = 0;
351 let mut inserted = 0;
352 let mut closed = 0;
353 let mut events = vec![];
354 for event in execution {
355 match event {
356 Event::InsertStream(key, stream) => {
357 assert_matches::assert_matches!(streams.insert(key, stream.tagged(key).with_epitaph(key)), None);
358 // StreamMap does *not* wake on inserting new streams, matching the
359 // behavior of streams::SelectAll. The client *must* arrange for it to be
360 // polled again after a stream is inserted; we model that here by forcing a
361 // wake up
362 next_wake = count.get();
363 }
364 Event::SendRequest(_, mut sender) => {
365 if let Poll::Ready(Ok(())) = sender.poll_ready(&mut Context::from_waker(&send_waker)) {
366 prop_assert_eq!(sender.start_send(Ok(1)), Ok(()));
367 inserted = inserted + 1;
368 }
369 }
370 Event::CloseStream(_, mut stream) => {
371 stream.close_channel();
372 }
373 Event::Execute if count.get() >= next_wake => {
374 match Pin::new(&mut streams.next()).poll(&mut Context::from_waker(&waker)) {
375 Poll::Ready(Some(StreamItem::Item((k, v)))) => {
376 events.push(StreamItem::Item((k, v)));
377 yielded = yielded + 1;
378 // Ensure that we wake up next time;
379 next_wake = count.get();
380 // Invariant: stream(k) must be in the map
381 prop_assert!(streams.contains_key(&k))
382 }
383 Poll::Ready(Some(StreamItem::Epitaph(k))) => {
384 events.push(StreamItem::Epitaph(k));
385 closed = closed + 1;
386 // Ensure that we wake up next time;
387 next_wake = count.get();
388 // stream(k) is now terminated, but until polled again (Yielding
389 // `None`), will still be in the map
390 }
391 Poll::Ready(None) => {
392 // the Stream impl for StreamMap never completes
393 unreachable!()
394 }
395 Poll::Pending => {
396 next_wake = count.get() + 1;
397 }
398 };
399 }
400 Event::Execute => (),
401 }
402 }
403 prop_assert_eq!(inserted, expected, "All expected requests inserted");
404 prop_assert_eq!((next_wake, count.get(), yielded), (next_wake, count.get(), expected), "All expected requests yielded");
405 prop_assert_eq!(closed, expected_count, "All streams closed");
406 let not_terminated =
407 |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
408 StreamItem::Epitaph(k) if *k == key => false,
409 _ => true,
410 };
411 let event_of =
412 |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
413 StreamItem::Item((k, _)) if *k == key => true,
414 _ => false,
415 };
416 let all_keys = 0..expected_count;
417 for k in all_keys {
418 prop_assert!(!streams.contains_key(&k), "All streams should now have been removed");
419 prop_assert!(!events.iter().skip_while(|e| not_terminated(k, e)).any(|e| event_of(k, e)), "No events should have been yielded from a stream after it terminated");
420 }
421 }
422 }
423}