1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use super::{Closed, TryMerge};
6use futures::channel::oneshot;
7use futures::future::{Either, Ready, Shared};
8use futures::prelude::*;
9use pin_project::pin_project;
10use std::collections::VecDeque;
11use std::pin::Pin;
12use std::task::{Context, Poll};
1314#[pin_project]
15pub(crate) struct TaskFuture<O> {
16#[pin]
17fut: Either<oneshot::Receiver<O>, Ready<Result<O, oneshot::Canceled>>>,
18}
1920impl<O> Future for TaskFuture<O> {
21type Output = Result<O, Closed>;
2223fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
24match self.project().fut.poll(cx) {
25 Poll::Pending => Poll::Pending,
26 Poll::Ready(res) => Poll::Ready(res.map_err(|oneshot::Canceled| Closed)),
27 }
28 }
29}
3031/// Shared state for a pending work item. Contains the context parameter to be provided to the task
32/// when running it and other metadata present when running the task.
33pub struct PendingWorkInfo<C, O> {
34 context: C,
35 running: RunningWorkInfo<O>,
36}
3738/// Shared state for a single running work item. Contains the sending end of the shared future for sending
39/// the result of the task and the clonable shared future that will resolve to that result.
40pub struct RunningWorkInfo<O> {
41 cb: oneshot::Sender<O>,
42 fut: Shared<TaskFuture<O>>,
43}
4445/// Metadata about pending and running instances of a task.
46pub struct TaskVariants<C, O> {
47 running: Option<RunningWorkInfo<O>>,
48 pending: VecDeque<PendingWorkInfo<C, O>>,
49}
5051impl<C, O> TaskVariants<C, O>
52where
53C: TryMerge,
54 O: Clone,
55{
56/// Creates a new TaskVariants with a single, pending instance of this task, returning Self and
57 /// the completion future for the initial instance of the task.
58pub(crate) fn new(context: C) -> (Self, Shared<TaskFuture<O>>) {
59let mut res = Self { running: None, pending: VecDeque::new() };
60let fut = res.push_back(context);
61 (res, fut)
62 }
6364/// Attempts to merge the given context with an existing pending instance of this task, or
65 /// enqueues a new instance of this task with the given context. Returns the completion future
66 /// for the instance of the task.
67pub(crate) fn push(&mut self, mut context: C) -> Shared<TaskFuture<O>> {
68// First try to merge this task with another queued or running task.
69for info in self.pending.iter_mut() {
70if let Err(unmerged) = info.context.try_merge(context) {
71 context = unmerged;
72 } else {
73return info.running.fut.clone();
74 }
75 }
7677// Otherwise, enqueue a new task.
78self.push_back(context)
79 }
8081fn push_back(&mut self, context: C) -> Shared<TaskFuture<O>> {
82let (sender, fut) = make_broadcast_pair();
8384self.pending.push_back(PendingWorkInfo {
85 context,
86 running: RunningWorkInfo { cb: sender, fut: fut.clone() },
87 });
88 fut
89 }
90}
9192impl<C, O> TaskVariants<C, O> {
93/// Returns true iff an instance of the task is running.
94pub(crate) fn running(&self) -> bool {
95self.running.is_some()
96 }
9798/// Returns the number of pending instances of the task.
99pub(crate) fn pending(&self) -> usize {
100self.pending.len()
101 }
102}
103104impl<C, O> TaskVariants<C, O> {
105/// Starts the first instance of this task, claiming its context.
106 ///
107 /// # Panics
108 ///
109 /// Panics if the task has already been started.
110pub(crate) fn start(&mut self) -> C {
111self.try_start().expect("context to not yet be claimed")
112 }
113114/// Completes the running instance of this task, notifying waiters of the result and returning
115 /// the context for the next instance of this task, if one exists.
116 ///
117 /// # Panics
118 ///
119 /// Panics if this method has previously returned `None`.
120pub(crate) fn done(&mut self, res: O) -> Option<C> {
121let cb = self.running.take().expect("running item to mark done").cb;
122123// As the shared future was just removed from the running task, if all clones of that
124 // future have also been dropped, this send can fail. Silently ignore that error.
125let _ = cb.send(res);
126127// If further work for this key is queued, take and return its context.
128self.try_start()
129 }
130131fn try_start(&mut self) -> Option<C> {
132assert!(self.running.is_none());
133if let Some(PendingWorkInfo { context, running }) = self.pending.pop_front() {
134self.running = Some(running);
135Some(context)
136 } else {
137None
138}
139 }
140}
141142/// Creates a sender and clonable receiver channel pair, where the receiver maps Canceled errors to
143/// a [crate::TaskError].
144pub(crate) fn make_broadcast_pair<O>() -> (oneshot::Sender<O>, Shared<TaskFuture<O>>)
145where
146O: Clone,
147{
148let (sender, receiver) = oneshot::channel();
149let fut = TaskFuture { fut: Either::Left(receiver) }.shared();
150151 (sender, fut)
152}
153154/// Creates a clonable receiver of the same type as the receiving end of a broadcast receiver that
155/// always reports that the task was canceled.
156pub(crate) fn make_canceled_receiver<O>() -> Shared<TaskFuture<O>>
157where
158O: Clone,
159{
160 TaskFuture { fut: Either::Right(futures::future::err(oneshot::Canceled)) }.shared()
161}
#[cfg(test)]
mod tests {
    use super::super::tests::MergeEqual;
    use super::*;
    use futures::executor::block_on;

    /// Walks a queue through merging, starting and completing several instances.
    #[test]
    fn merges() {
        let (mut variants, first_a) = TaskVariants::<MergeEqual, i32>::new(MergeEqual(0));

        let first_b = variants.push(MergeEqual(0));
        let second = variants.push(MergeEqual(1));
        let first_c = variants.push(MergeEqual(0));
        let third = variants.push(MergeEqual(2));

        // Start the first instance, then push a duplicate of it. The duplicate is not merged
        // with the now-running instance and is queued separately.
        assert_eq!(variants.start(), MergeEqual(0));
        let first_dup = variants.push(MergeEqual(0));

        // Finishing the first instance resolves every future that was merged into it and
        // starts the second instance.
        assert_eq!(variants.done(0), Some(MergeEqual(1)));
        for merged in vec![first_a, first_b, first_c] {
            assert_eq!(block_on(merged), Ok(0));
        }

        // Finishing the second instance starts the third.
        assert_eq!(variants.done(1), Some(MergeEqual(2)));
        assert_eq!(block_on(second), Ok(1));

        // Finishing the third instance starts the unmerged duplicate of the first.
        assert_eq!(variants.done(2), Some(MergeEqual(0)));
        assert_eq!(block_on(third), Ok(2));

        // The duplicate receives its own result, and nothing is left to run afterwards.
        assert_eq!(variants.done(3), None);
        assert_eq!(block_on(first_dup), Ok(3));
    }

    /// Two pushes with unit contexts complete together after a single run.
    #[test]
    fn task_variants() {
        let (mut variants, first) = TaskVariants::<(), ()>::new(());
        let second = variants.push(());

        let () = variants.start();
        assert_eq!(variants.done(()), None);

        assert_eq!(block_on(first), Ok(()));
        assert_eq!(block_on(second), Ok(()));
    }
}