work_queue/
state.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{Closed, TryMerge};
6use futures::channel::oneshot;
7use futures::future::{Either, Ready, Shared};
8use futures::prelude::*;
9use pin_project::pin_project;
10use std::collections::VecDeque;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14#[pin_project]
15pub(crate) struct TaskFuture<O> {
16    #[pin]
17    fut: Either<oneshot::Receiver<O>, Ready<Result<O, oneshot::Canceled>>>,
18}
19
20impl<O> Future for TaskFuture<O> {
21    type Output = Result<O, Closed>;
22
23    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
24        match self.project().fut.poll(cx) {
25            Poll::Pending => Poll::Pending,
26            Poll::Ready(res) => Poll::Ready(res.map_err(|oneshot::Canceled| Closed)),
27        }
28    }
29}
30
31/// Shared state for a pending work item. Contains the context parameter to be provided to the task
32/// when running it and other metadata present when running the task.
33pub struct PendingWorkInfo<C, O> {
34    context: C,
35    running: RunningWorkInfo<O>,
36}
37
38/// Shared state for a single running work item. Contains the sending end of the shared future for sending
39/// the result of the task and the clonable shared future that will resolve to that result.
40pub struct RunningWorkInfo<O> {
41    cb: oneshot::Sender<O>,
42    fut: Shared<TaskFuture<O>>,
43}
44
45/// Metadata about pending and running instances of a task.
46pub struct TaskVariants<C, O> {
47    running: Option<RunningWorkInfo<O>>,
48    pending: VecDeque<PendingWorkInfo<C, O>>,
49}
50
51impl<C, O> TaskVariants<C, O>
52where
53    C: TryMerge,
54    O: Clone,
55{
56    /// Creates a new TaskVariants with a single, pending instance of this task, returning Self and
57    /// the completion future for the initial instance of the task.
58    pub(crate) fn new(context: C) -> (Self, Shared<TaskFuture<O>>) {
59        let mut res = Self { running: None, pending: VecDeque::new() };
60        let fut = res.push_back(context);
61        (res, fut)
62    }
63
64    /// Attempts to merge the given context with an existing pending instance of this task, or
65    /// enqueues a new instance of this task with the given context. Returns the completion future
66    /// for the instance of the task.
67    pub(crate) fn push(&mut self, mut context: C) -> Shared<TaskFuture<O>> {
68        // First try to merge this task with another queued or running task.
69        for info in self.pending.iter_mut() {
70            if let Err(unmerged) = info.context.try_merge(context) {
71                context = unmerged;
72            } else {
73                return info.running.fut.clone();
74            }
75        }
76
77        // Otherwise, enqueue a new task.
78        self.push_back(context)
79    }
80
81    fn push_back(&mut self, context: C) -> Shared<TaskFuture<O>> {
82        let (sender, fut) = make_broadcast_pair();
83
84        self.pending.push_back(PendingWorkInfo {
85            context,
86            running: RunningWorkInfo { cb: sender, fut: fut.clone() },
87        });
88        fut
89    }
90}
91
92impl<C, O> TaskVariants<C, O> {
93    /// Returns true iff an instance of the task is running.
94    pub(crate) fn running(&self) -> bool {
95        self.running.is_some()
96    }
97
98    /// Returns the number of pending instances of the task.
99    pub(crate) fn pending(&self) -> usize {
100        self.pending.len()
101    }
102}
103
104impl<C, O> TaskVariants<C, O> {
105    /// Starts the first instance of this task, claiming its context.
106    ///
107    /// # Panics
108    ///
109    /// Panics if the task has already been started.
110    pub(crate) fn start(&mut self) -> C {
111        self.try_start().expect("context to not yet be claimed")
112    }
113
114    /// Completes the running instance of this task, notifying waiters of the result and returning
115    /// the context for the next instance of this task, if one exists.
116    ///
117    /// # Panics
118    ///
119    /// Panics if this method has previously returned `None`.
120    pub(crate) fn done(&mut self, res: O) -> Option<C> {
121        let cb = self.running.take().expect("running item to mark done").cb;
122
123        // As the shared future was just removed from the running task, if all clones of that
124        // future have also been dropped, this send can fail. Silently ignore that error.
125        let _ = cb.send(res);
126
127        // If further work for this key is queued, take and return its context.
128        self.try_start()
129    }
130
131    fn try_start(&mut self) -> Option<C> {
132        assert!(self.running.is_none());
133        if let Some(PendingWorkInfo { context, running }) = self.pending.pop_front() {
134            self.running = Some(running);
135            Some(context)
136        } else {
137            None
138        }
139    }
140}
141
142/// Creates a sender and clonable receiver channel pair, where the receiver maps Canceled errors to
143/// a [crate::TaskError].
144pub(crate) fn make_broadcast_pair<O>() -> (oneshot::Sender<O>, Shared<TaskFuture<O>>)
145where
146    O: Clone,
147{
148    let (sender, receiver) = oneshot::channel();
149    let fut = TaskFuture { fut: Either::Left(receiver) }.shared();
150
151    (sender, fut)
152}
153
154/// Creates a clonable receiver of the same type as the receiving end of a broadcast receiver that
155/// always reports that the task was canceled.
156pub(crate) fn make_canceled_receiver<O>() -> Shared<TaskFuture<O>>
157where
158    O: Clone,
159{
160    TaskFuture { fut: Either::Right(futures::future::err(oneshot::Canceled)) }.shared()
161}
162
#[cfg(test)]
mod tests {
    use super::super::tests::MergeEqual;
    use super::*;
    use futures::executor::block_on;

    /// Equal contexts merge while pending, distinct contexts queue in FIFO order, and a context
    /// pushed while its twin is already running gets a fresh instance of its own.
    #[test]
    fn merges() {
        let (mut variants, zero_first) = TaskVariants::<MergeEqual, i32>::new(MergeEqual(0));

        let zero_second = variants.push(MergeEqual(0));
        let one = variants.push(MergeEqual(1));
        let zero_third = variants.push(MergeEqual(0));
        let two = variants.push(MergeEqual(2));

        // Once the first instance is running it is no longer pending, so another MergeEqual(0)
        // has nothing to merge into and is queued separately.
        assert_eq!(variants.start(), MergeEqual(0));
        let zero_late = variants.push(MergeEqual(0));

        // Finishing the first instance resolves every future that merged into it and starts the
        // second instance.
        assert_eq!(variants.done(0), Some(MergeEqual(1)));
        assert_eq!(block_on(zero_first), Ok(0));
        assert_eq!(block_on(zero_second), Ok(0));
        assert_eq!(block_on(zero_third), Ok(0));

        // Finishing the second instance starts the third.
        assert_eq!(variants.done(1), Some(MergeEqual(2)));
        assert_eq!(block_on(one), Ok(1));

        // Finishing the third instance starts the late, unmerged MergeEqual(0).
        assert_eq!(variants.done(2), Some(MergeEqual(0)));
        assert_eq!(block_on(two), Ok(2));

        // The late instance resolves with its own result and the queue is now empty.
        assert_eq!(variants.done(3), None);
        assert_eq!(block_on(zero_late), Ok(3));
    }

    /// Two pushes of the unit context are served by one run: `done` reports no further work and
    /// both futures resolve.
    #[test]
    fn task_variants() {
        let (mut variants, first) = TaskVariants::<(), ()>::new(());
        let second = variants.push(());

        let () = variants.start();
        assert_eq!(variants.done(()), None);

        block_on(async move {
            assert_eq!(first.await, Ok(()));
            assert_eq!(second.await, Ok(()));
        });
    }
}