async_utils/
futures.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Provides utilities for working with futures.

use std::pin::Pin;

use futures::future::FusedFuture;
use futures::{task, Future};
use pin_project::pin_project;

/// Future for the [`FutureExt::replace_value`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct ReplaceValue<Fut: Future<Output = ()>, T> {
    #[pin]
    future: Fut,
    value: Option<T>,
}

/// An extension trait for [`futures::Future`] that provides specialized adapters.
pub trait FutureExt: Future<Output = ()> {
    /// Map this future's output to a different type, returning a new future of
    /// the resulting type.
    ///
    /// This function is similar to futures::FutureExt::map except:
    ///
    /// - it takes a value instead of a closure
    ///
    /// - it returns a type that can be named
    ///
    /// This function is useful when a mapped future is needed and boxing is not
    /// desired.
    fn replace_value<T>(self, value: T) -> ReplaceValue<Self, T>
    where
        Self: Sized,
    {
        ReplaceValue::new(self, value)
    }
}

impl<Fut: Future<Output = ()>, T> ReplaceValue<Fut, T> {
    fn new(future: Fut, value: T) -> Self {
        Self { future, value: Some(value) }
    }
}

impl<T: ?Sized + Future<Output = ()>> FutureExt for T {}

impl<Fut: Future<Output = ()>, T: Unpin> Future for ReplaceValue<Fut, T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.project();
        let () = futures::ready!(this.future.poll(cx));
        task::Poll::Ready(
            this.value
                .take()
                .expect("ReplaceValue must not be polled after it returned `Poll::Ready`"),
        )
    }
}

impl<Fut: Future<Output = ()> + Unpin, T: Unpin> FusedFuture for ReplaceValue<Fut, T> {
    fn is_terminated(&self) -> bool {
        self.value.is_none()
    }
}

/// A future that yields to the executor only once.
///
/// This future returns [`Poll::Pending`] the first time it's polled after
/// waking the context waker. This effectively yields the currently running task
/// to the executor, but puts it back in the executor's ready task queue.
///
/// Example:
/// ```
/// loop {
///   let read = read_big_thing().await;
///
///   while let Some(x) = read.next() {
///     process_one_thing(x);
///     YieldToExecutorOnce::new().await;
///   }
/// }
/// ```
#[derive(Default)]
pub struct YieldToExecutorOnce(YieldToExecutorOnceInner);

#[derive(Default)]
enum YieldToExecutorOnceInner {
    #[default]
    NotPolled,
    Ready,
    Terminated,
}

impl YieldToExecutorOnce {
    /// Creates a new `YieldToExecutorOnce`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Future for YieldToExecutorOnce {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let Self(inner) = self.get_mut();
        match *inner {
            YieldToExecutorOnceInner::NotPolled => {
                *inner = YieldToExecutorOnceInner::Ready;
                // Wake the executor before returning pending. We only want to yield
                // once.
                cx.waker().wake_by_ref();
                task::Poll::Pending
            }
            YieldToExecutorOnceInner::Ready => {
                *inner = YieldToExecutorOnceInner::Terminated;
                task::Poll::Ready(())
            }
            YieldToExecutorOnceInner::Terminated => {
                panic!("polled future after completion");
            }
        }
    }
}

impl FusedFuture for YieldToExecutorOnce {
    fn is_terminated(&self) -> bool {
        let Self(inner) = self;
        match inner {
            YieldToExecutorOnceInner::Ready | YieldToExecutorOnceInner::NotPolled => false,
            YieldToExecutorOnceInner::Terminated => true,
        }
    }
}

#[cfg(test)]
mod tests {
    use fuchsia_async as fasync;

    #[fasync::run_singlethreaded(test)]
    async fn replace_value_trivial() {
        use super::FutureExt as _;

        let value = "hello world";
        assert_eq!(futures::future::ready(()).replace_value(value).await, value);
    }

    #[test]
    fn replace_value_is_terminated() {
        use super::FutureExt as _;
        use futures::future::{FusedFuture as _, FutureExt as _};

        let fut = &mut futures::future::ready(()).replace_value(());
        assert!(!fut.is_terminated());
        assert_eq!(fut.now_or_never(), Some(()));
        assert!(fut.is_terminated());
    }

    #[test]
    fn yield_to_executor_once() {
        use futures::future::FusedFuture as _;
        use futures::FutureExt as _;

        let (waker, count) = futures_test::task::new_count_waker();
        let mut context = std::task::Context::from_waker(&waker);
        let mut fut = super::YieldToExecutorOnce::new();

        assert!(!fut.is_terminated());
        assert_eq!(count, 0);
        assert_eq!(fut.poll_unpin(&mut context), std::task::Poll::Pending);
        assert!(!fut.is_terminated());
        assert_eq!(count, 1);
        assert_eq!(fut.poll_unpin(&mut context), std::task::Poll::Ready(()));
        assert!(fut.is_terminated());
        // The waker is never hit again.
        assert_eq!(count, 1);
    }
}