work_queue/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6#![allow(clippy::type_complexity)]
7#![allow(clippy::let_unit_value)]
8
9//! Concurrent work queue helpers
10
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{make_canceled_receiver, TaskFuture, TaskVariants};
27
/// Error type indicating a task failed because the queue was dropped before completing the task.
///
/// This is the error half of the `Result` produced by the futures returned from
/// [`WorkSender::push`] and [`WorkSender::push_all`].
#[derive(Debug, PartialEq, Eq, Clone, Error)]
#[error("The queue was dropped before processing this task")]
pub struct Closed;
32
/// Trait for merging context for work tasks with the same key.
///
/// Implementations must satisfy (for all `a` and `b`):
/// * `a == b` implies `a.try_merge(b) == Ok(())` and `a` was not modified; and
/// * `a.try_merge(b) == Err(b)` implies `a` was not modified and `b` was returned unmodified.
pub trait TryMerge: Eq + Sized {
    /// Attempts to merge `other` into `self`, returning `other` if such an operation is not
    /// possible.
    ///
    /// Implementations should return `Ok(())` if `other` was fully merged into `self`, or return
    /// `Err(other)`, leaving `self` unchanged, if the instances could not be merged.
    fn try_merge(&mut self, other: Self) -> Result<(), Self>;
}
46
47impl TryMerge for () {
48    fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49        Ok(())
50    }
51}
52
53/// Creates an unbounded queue of work tasks that will execute up to `concurrency` `worker`s at once.
54///
55/// # Examples
56///
57/// ```
58/// # use queue::*;
59/// # use futures::prelude::*;
60///
61/// #[derive(Debug, Clone)]
62/// enum DownloadError {}
63///
64/// async fn download_file(url: String, _context: ()) -> Result<Vec<u8>, DownloadError> {
65///     // ...
66/// #     Ok(url.bytes().collect())
67/// }
68///
69/// let mut executor = futures::executor::LocalPool::new();
70/// executor.run_until(async move {
71///     let (mut processor, sender) = work_queue(2, download_file);
72///     let mut join_handles = vec![];
73///     for crate_name in vec!["rand", "lazy_static", "serde", "regex"] {
74///         let fut = sender.push(format!("https://crates.io/api/v1/crates/{}", crate_name), ());
75///         join_handles.push((crate_name, fut));
76///     }
77///
78///     // The queue stream won't terminate until all sender clones are dropped.
79///     drop(sender);
80///
81///     while let Some(key) = processor.next().await {
82///         println!("Finished processing {}", key);
83///     }
84///
85///     for (crate_name, fut) in join_handles {
86///         let res = fut
87///             .await
88///             .expect("queue to execute the task")
89///             .expect("downloads can't fail, right?");
90///         println!("Contents of {}: {:?}", crate_name, res);
91///     }
92/// });
93/// ```
94pub fn work_queue<W, K, C>(
95    concurrency: usize,
96    work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99    W: Work<K, C>,
100    K: Clone + Eq + Hash,
101    C: TryMerge,
102{
103    let tasks = Arc::new(Mutex::new(HashMap::new()));
104    let (sender, receiver) = mpsc::unbounded();
105    let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106    (
107        WorkQueue {
108            work_fn,
109            concurrency,
110            pending: receiver,
111            tasks,
112            running: FuturesUnordered::new(),
113        },
114        sender,
115    )
116}
117
/// Trait that creates a work future from a key and context.
///
/// A blanket implementation exists for any `Fn(K, C) -> impl Future`, so plain functions and
/// closures can be used as workers directly.
pub trait Work<K, C> {
    /// The future that is executed by the WorkQueue.
    type Future: Future;

    /// Create a new `Future` to be executed by the WorkQueue.
    ///
    /// `key` identifies the unit of work and `context` is the (possibly merged) context supplied
    /// by the callers that enqueued it.
    fn start(&self, key: K, context: C) -> Self::Future;
}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129    F: Fn(K, C) -> WF,
130    WF: Future,
131{
132    type Future = WF;
133
134    fn start(&self, key: K, context: C) -> Self::Future {
135        (self)(key, context)
136    }
137}
138
/// A work queue that processes a configurable number of tasks concurrently, deduplicating work
/// with the same key.
///
/// Items are yielded from the stream in the order that they are processed, which may differ from
/// the order that items are enqueued, depending on which concurrent tasks complete first.
///
/// The queue only makes progress while it is being polled as a `Stream`.
#[pin_project]
pub struct WorkQueue<W, K, C>
where
    W: Work<K, C>,
{
    /// The work callback function.
    work_fn: W,
    /// Maximum number of tasks to run concurrently.
    concurrency: usize,
    /// Metadata about pending and running work. Modified by the queue itself when running tasks
    /// and by [WorkSender] to add new tasks to the queue.
    tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,

    /// Receiving end of the queue. Yields the keys pushed by [WorkSender], in push order.
    #[pin]
    pending: mpsc::UnboundedReceiver<K>,
    /// Tasks currently being run. Will contain [0, concurrency] futures at any given time.
    #[pin]
    running: FuturesUnordered<RunningTask<K, W::Future>>,
}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167    W: Work<K, C>,
168    K: Clone + Eq + Hash,
169{
170    /// Converts this stream of K into a single future that resolves when the stream is
171    /// terminated.
172    pub fn into_future(self) -> impl Future<Output = ()> {
173        self.map(|_res| ()).collect::<()>()
174    }
175
176    /// Starts new work if under the concurrency limit and work is enqueued.
177    /// Returns:
178    /// * Poll::Ready(None) if the input work queue is empty and closed.
179    /// * Poll::Ready(Some(())) if new work was started.
180    /// * Poll::Pending if at the concurrency limit or no work is enqueued.
181    fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182        let mut this = self.project();
183
184        // Nothing to do if the stream of requests is EOF.
185        if this.pending.is_terminated() {
186            return Poll::Ready(None);
187        }
188
189        let mut found = false;
190        while this.running.len() < *this.concurrency {
191            match ready!(this.pending.as_mut().poll_next(cx)) {
192                None => break,
193                Some(key) => {
194                    found = true;
195
196                    // Transition the work info to the running state, claiming the context.
197                    let context = this
198                        .tasks
199                        .lock()
200                        .get_mut(&key)
201                        .expect("map entry to exist if in pending queue")
202                        .start();
203
204                    // WorkSender::push_entry guarantees that key will only be pushed into pending
205                    // if it created the entry in the map, so it is guaranteed here that multiple
206                    // instances of the same key will not be executed concurrently.
207                    let work = this.work_fn.start(key.clone(), context);
208                    let fut = futures::future::join(futures::future::ready(key), work);
209
210                    this.running.push(fut);
211                }
212            }
213        }
214        if found {
215            Poll::Ready(Some(()))
216        } else {
217            Poll::Pending
218        }
219    }
220
221    fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
222        let mut this = self.project();
223
224        match this.running.as_mut().poll_next(cx) {
225            Poll::Pending => Poll::Pending,
226            Poll::Ready(None) => {
227                // this.running is now terminated, but unlike the guarantees given by other
228                // FusedStream implementations, FuturesUnordered can continue to be polled (unless
229                // new work comes in, it will continue to return Poll::Ready(None)), and new
230                // futures can be pushed into it. Pushing new work on a terminated
231                // FuturesUnordered will cause is_terminated to return false, and polling the
232                // stream will start the task.
233                if this.pending.is_terminated() {
234                    Poll::Ready(None)
235                } else {
236                    Poll::Pending
237                }
238            }
239            Poll::Ready(Some((key, res))) => {
240                let mut tasks = this.tasks.lock();
241                let infos: &mut TaskVariants<_, _> =
242                    tasks.get_mut(&key).expect("key to exist in map if not resolved");
243
244                if let Some(next_context) = infos.done(res) {
245                    // start the next operation immediately
246                    let work = this.work_fn.start(key.clone(), next_context);
247                    let key_clone = key.clone();
248                    let fut = futures::future::join(futures::future::ready(key_clone), work);
249
250                    drop(tasks);
251                    this.running.push(fut);
252                } else {
253                    // last pending operation with this key
254                    tasks.remove(&key);
255                }
256
257                // Yield the key that was processed to the stream, indicating if processing that
258                // value was successful or not.
259                Poll::Ready(Some(key))
260            }
261        }
262    }
263}
264
265impl<W, K, C> WorkQueue<W, K, C>
266where
267    W: Work<K, C>,
268    <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
269    K: std::fmt::Debug + Send + 'static,
270    C: Send + 'static,
271{
272    /// Returns a callback to be given to `fuchsia_inspect::Node::record_lazy_child`.
273    /// Records the keys of the queue using their Debug format along with the number of
274    /// corresponding tasks that are running and pending.
275    pub fn record_lazy_inspect(
276        &self,
277    ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
278           + Send
279           + Sync
280           + 'static {
281        let tasks = Arc::downgrade(&self.tasks);
282        move || {
283            let tasks = tasks.clone();
284            async move {
285                let inspector = fuchsia_inspect::Inspector::default();
286                if let Some(tasks) = tasks.upgrade() {
287                    // Drop the lock before the inspect operations in case they are slow.
288                    let tasks = {
289                        tasks
290                            .lock()
291                            .iter()
292                            .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
293                            .collect::<Vec<_>>()
294                    };
295                    let root = inspector.root();
296                    for (k, (running, pending)) in tasks {
297                        root.record_child(k, |n| {
298                            n.record_uint("running", running as u64);
299                            n.record_uint("pending", pending as u64);
300                        })
301                    }
302                }
303                Ok(inspector)
304            }
305            .boxed()
306        }
307    }
308}
309
310impl<W, K, C> Stream for WorkQueue<W, K, C>
311where
312    W: Work<K, C>,
313    K: Clone + Eq + Hash,
314{
315    type Item = K;
316    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
317        match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
318            (Poll::Ready(None), Poll::Ready(None)) => {
319                // There input queues are empty and closed, and all running work has been
320                // completed. This work queue is now (or was already) terminated.
321                Poll::Ready(None)
322            }
323            (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
324                // The input queue made progress this iteration and a work item completed.
325                // find_work again to either start more work or register for a wakeup when work
326                // becomes available.
327                let _ = self.as_mut().find_work(cx);
328                Poll::Ready(Some(res))
329            }
330            (_not_ready_none, Poll::Ready(None)) => {
331                // Our active task queue is empty, but more work can still come in. Report this
332                // poll as pending.
333                Poll::Pending
334            }
335            (_, poll) => poll,
336        }
337    }
338}
339
// A running unit of work: the worker future joined with an already-ready future holding its
// key, so that a completed task resolves to a `(key, output)` pair.
type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
341
/// A clonable handle to the work queue.  When all clones of [WorkSender] are dropped, the queue
/// will process all remaining requests and terminate its output stream.
pub struct WorkSender<K, C, O> {
    // Sending end of the channel of keys waiting to be picked up by the queue.
    sender: mpsc::UnboundedSender<K>,
    // Weak reference to ensure that if the queue is dropped, the now unused sender end of the
    // completion callback will be dropped too, canceling the request.
    tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
}
350
351impl<K, C, O> Clone for WorkSender<K, C, O> {
352    fn clone(&self) -> Self {
353        Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
354    }
355}
356
357impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359        f.debug_struct("WorkSender").finish()
360    }
361}
362
363impl<K, C, O> WorkSender<K, C, O>
364where
365    K: Clone + Eq + Hash,
366    C: TryMerge,
367    O: Clone,
368{
369    /// Enqueue the given key to be processed by a worker, or attach to an existing request to
370    /// process this key.
371    pub fn push(&self, key: K, context: C) -> impl Future<Output = Result<O, Closed>> {
372        let tasks = match self.tasks.upgrade() {
373            Some(tasks) => tasks,
374            None => {
375                // Work queue no longer exists. Immediately cancel this request.
376                return make_canceled_receiver();
377            }
378        };
379        let mut tasks = tasks.lock();
380
381        Self::push_entry(&mut *tasks, &self.sender, key, context)
382    }
383
384    /// Enqueue all the given keys to be processed by a worker, merging them with existing known
385    /// tasks if possible, returning an iterator of the futures that will resolve to the results of
386    /// processing the keys.
387    ///
388    /// This method is similar to, but more efficient than, mapping an iterator to
389    /// `WorkSender::push`.
390    pub fn push_all(
391        &self,
392        entries: impl Iterator<Item = (K, C)>,
393    ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
394        let mut tasks = self.tasks.upgrade();
395        let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
396
397        entries
398            .map(move |(key, context)| {
399                if let Some(ref mut tasks) = tasks {
400                    Self::push_entry(&mut *tasks, &self.sender, key, context)
401                } else {
402                    // Work queue no longer exists. Immediately cancel this request.
403                    make_canceled_receiver()
404                }
405            })
406            .collect::<Vec<_>>()
407            .into_iter()
408    }
409
410    fn push_entry(
411        tasks: &mut HashMap<K, TaskVariants<C, O>>,
412        self_sender: &mpsc::UnboundedSender<K>,
413        key: K,
414        context: C,
415    ) -> Shared<TaskFuture<O>> {
416        use std::collections::hash_map::Entry;
417
418        match tasks.entry(key.clone()) {
419            Entry::Vacant(entry) => {
420                // No other variant of this task is running or pending. Reserve our
421                // spot in line and configure the task's metadata.
422                if let Ok(()) = self_sender.unbounded_send(key) {
423                    let (infos, fut) = TaskVariants::new(context);
424                    entry.insert(infos);
425                    fut
426                } else {
427                    // Work queue no longer exists. Immediately cancel this request.
428                    make_canceled_receiver()
429                }
430            }
431            Entry::Occupied(entry) => entry.into_mut().push(context),
432        }
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439    use futures::channel::oneshot;
440    use futures::executor::{block_on, LocalSpawner};
441    use futures::task::{LocalSpawnExt, SpawnExt};
442    use std::borrow::Borrow;
443    use std::fmt;
444
    // Pushes five requests across three distinct keys (with "a" pushed three times), drops the
    // sender, and checks that every request resolves successfully while each key is yielded by
    // the queue once.
    #[test]
    fn basic_usage() {
        async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
            Ok(())
        }

        let (processor, enqueue) = work_queue(3, do_work);

        let tasks = FuturesUnordered::new();

        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("b".into(), ()));
        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("c".into(), ()));

        // Dropping the only sender lets the processor stream terminate once work is done.
        drop(enqueue);

        block_on(async move {
            let (keys, res) = futures::future::join(
                processor.collect::<Vec<String>>(),
                tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
            )
            .await;
            assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
            assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
        });
    }
473
    // Checks that `WorkQueue::into_future` resolves once the sender is dropped and all pushed
    // work (two keys, concurrency of 1) has completed.
    #[test]
    fn into_future() {
        async fn nop(key: i32, _context: ()) -> i32 {
            key
        }
        let (processor, enqueue) = work_queue(1, nop);

        let res_fut =
            future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
        // Without this drop the queue's stream could never terminate.
        drop(enqueue);

        let res = block_on(res_fut);
        assert_eq!(res, ((), Ok(1), Ok(2)));
    }
488
    // Test context wrapping an integer; see its `TryMerge` impl for the merge rule.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct MergeEqual(pub(crate) i32);
491
492    impl TryMerge for MergeEqual {
493        fn try_merge(&mut self, other: Self) -> Result<(), Self> {
494            if self.0 == other.0 {
495                Ok(())
496            } else {
497                Err(other)
498            }
499        }
500    }
501
    // Requests pushed before or after the queue is dropped must all resolve to `Err(Closed)`,
    // both via `push` and via `push_all`.
    #[test]
    fn dropping_queue_fails_requests() {
        async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
            Ok(())
        }

        let (processor, enqueue) = work_queue(1, do_work);

        // Same key with mergeable (0/0) and non-mergeable (0/1) contexts.
        let fut_early_a = enqueue.push("a", MergeEqual(0));
        let fut_early_b = enqueue.push("a", MergeEqual(1));
        let fut_early_c = enqueue.push("a", MergeEqual(0));
        // The queue is dropped without ever being polled.
        drop(processor);
        let fut_late = enqueue.push("b", MergeEqual(0));

        block_on(async move {
            assert_eq!(fut_early_a.await, Err(Closed));
            assert_eq!(fut_early_b.await, Err(Closed));
            assert_eq!(fut_early_c.await, Err(Closed));
            assert_eq!(fut_late.await, Err(Closed));

            let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
            for fut in enqueue.push_all(requests.into_iter()) {
                assert_eq!(fut.await, Err(Closed));
            }
        });
    }
528
    // A task started by the test worker that is blocked until the test driver resolves it.
    #[derive(Debug)]
    struct TestRunningTask<C, O> {
        // Sending a value here completes the blocked worker future with that value.
        unblocker: oneshot::Sender<O>,
        // The context the worker was started with.
        context: C,
    }
534
    // Shared table of the tasks currently blocked inside the test worker, indexed by key.
    #[derive(Debug)]
    struct TestRunningTasks<K, C, O>
    where
        K: Eq + Hash,
    {
        tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
    }
542
543    impl<K, C, O> TestRunningTasks<K, C, O>
544    where
545        K: fmt::Debug + Eq + Hash + Sized + Clone,
546        C: fmt::Debug,
547        O: fmt::Debug,
548    {
549        fn new() -> Self {
550            Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
551        }
552
553        fn resolve<Q>(&self, key: &Q, res: O) -> C
554        where
555            Q: Eq + Hash + ?Sized,
556            K: Borrow<Q>,
557        {
558            let task =
559                self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
560            task.unblocker.send(res).unwrap();
561            task.context
562        }
563
564        fn peek(&self) -> Option<K> {
565            self.tasks.lock().keys().next().cloned()
566        }
567
568        fn keys(&self) -> Vec<K> {
569            self.tasks.lock().keys().cloned().collect()
570        }
571
572        fn assert_empty(&self) {
573            assert_eq!(
574                self.tasks.lock().keys().collect::<Vec<&K>>(),
575                Vec::<&K>::new(),
576                "expect queue to be empty"
577            );
578        }
579    }
580
    // Observations of the work queue's output stream, shared with the task that drains it.
    #[derive(Debug)]
    struct TestQueueResults<K> {
        // Keys yielded by the queue since the last `take`.
        done: Arc<Mutex<Vec<K>>>,
        // Set once the queue's stream has ended.
        terminated: Arc<Mutex<bool>>,
    }
586
587    impl<K> TestQueueResults<K> {
588        fn take(&self) -> Vec<K> {
589            std::mem::take(&mut *self.done.lock())
590        }
591
592        fn is_terminated(&self) -> bool {
593            *self.terminated.lock()
594        }
595    }
596
    // Runs the queue inside a future spawned with `spawn_with_handle` (from `SpawnExt`), using
    // unit key/context/output types.
    #[test]
    fn check_works_with_sendable_types() {
        struct TestWork;

        impl Work<(), ()> for TestWork {
            type Future = futures::future::Ready<()>;

            fn start(&self, _key: (), _context: ()) -> Self::Future {
                futures::future::ready(())
            }
        }

        let (processor, enqueue) = work_queue(3, TestWork);

        let tasks = FuturesUnordered::new();
        tasks.push(enqueue.push((), ()));

        // Allow the processor stream to terminate.
        drop(enqueue);

        let mut executor = futures::executor::LocalPool::new();
        let handle = executor
            .spawner()
            .spawn_with_handle(async move {
                let (keys, res) =
                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
                        .await;
                assert_eq!(keys, vec![()]);
                assert_eq!(res, vec![Ok(())]);
            })
            .expect("spawn to work");
        let () = executor.run_until(handle);
    }
629
    // Counterpart of the sendable test: every type involved wraps an `Rc`, and the queue is run
    // via `spawn_local_with_handle`.
    #[test]
    fn check_works_with_unsendable_types() {
        use std::rc::Rc;

        // Unfortunately `impl !Send for $Type` is unstable, so use Rc<()> to make sure WorkQueue
        // still works.
        struct TestWork(#[expect(dead_code)] Rc<()>);
        #[derive(Clone, Debug, PartialEq, Eq, Hash)]
        struct TestKey(Rc<()>);
        #[derive(PartialEq, Eq)]
        struct TestContext(Rc<()>);
        #[derive(Clone, Debug, PartialEq)]
        struct TestOutput(Rc<()>);

        impl Work<TestKey, TestContext> for TestWork {
            type Future = futures::future::Ready<TestOutput>;

            fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
                futures::future::ready(TestOutput(Rc::new(())))
            }
        }

        // Contexts always merge; the merged-in value is discarded.
        impl TryMerge for TestContext {
            fn try_merge(&mut self, _: Self) -> Result<(), Self> {
                Ok(())
            }
        }

        let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));

        let tasks = FuturesUnordered::new();
        tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));

        // Allow the processor stream to terminate.
        drop(enqueue);

        let mut executor = futures::executor::LocalPool::new();
        let handle = executor
            .spawner()
            .spawn_local_with_handle(async move {
                let (keys, res) =
                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
                        .await;
                assert_eq!(keys, vec![TestKey(Rc::new(()))]);
                assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
            })
            .expect("spawn to work");
        let () = executor.run_until(handle);
    }
678
    // Builds a work queue whose worker blocks every task until the test resolves it, and spawns
    // a local task that drains the queue's output stream.
    //
    // Returns the sender used to enqueue work, the table of currently blocked tasks, and the
    // record of keys yielded by the queue (plus whether its stream has terminated).
    fn spawn_test_work_queue<K, C, O>(
        spawner: LocalSpawner,
        concurrency: usize,
    ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
    where
        K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
        C: TryMerge + Send + fmt::Debug + 'static,
        O: Send + Clone + fmt::Debug + 'static,
    {
        let running = TestRunningTasks::<K, C, O>::new();
        let running_tasks = running.tasks.clone();
        let do_work = move |key: K, context: C| {
            // wait for the test driver to resolve this work item and return the result it
            // provides.
            let (sender, receiver) = oneshot::channel();
            // The insert must not replace an entry: that would mean the same key was started
            // twice concurrently.
            assert!(running_tasks
                .lock()
                .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
                .is_none());
            async move { receiver.await.unwrap() }
        };

        let (mut processor, enqueue) = work_queue(concurrency, do_work);
        let done = Arc::new(Mutex::new(Vec::new()));
        let terminated = Arc::new(Mutex::new(false));
        let results =
            TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };

        // Drain the queue in the background, recording each yielded key and the end of stream.
        spawner
            .spawn_local(async move {
                while let Some(res) = processor.next().await {
                    done.lock().push(res);
                }
                *terminated.lock() = true;
            })
            .expect("spawn to succeed");

        (enqueue, running, results)
    }
718
    // With a concurrency of 2 and three keys pushed, the first two are started right away and
    // the third only after a slot frees up.
    #[test]
    fn processes_known_work_before_stalling() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);

        let task_hello = enqueue.push("hello", ());
        let task_world = enqueue.push("world!", ());
        let task_test = enqueue.push("test", ());
        executor.run_until_stalled();
        // Nothing has been resolved yet, so nothing can have completed.
        assert_eq!(done.take(), Vec::<&str>::new());

        running.resolve("hello", 5);
        running.resolve("world!", 6);
        // "test" must not have been started while both slots were occupied.
        running.assert_empty();
        executor.run_until_stalled();
        assert_eq!(done.take(), vec!["hello", "world!"]);

        assert_eq!(executor.run_until(task_hello), Ok(5));
        assert_eq!(executor.run_until(task_world), Ok(6));

        running.resolve("test", 4);
        assert_eq!(executor.run_until(task_test), Ok(4));
        assert_eq!(done.take(), vec!["test"]);
    }
745
    // The queue must keep accepting work after it has gone idle, and must terminate once all
    // senders are dropped and the remaining work is finished.
    #[test]
    fn restarts_after_draining_input_queue() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);

        // Process a few tasks to completion through the queue.
        let task_a = enqueue.push("a", ());
        let task_b = enqueue.push("b", ());
        executor.run_until_stalled();
        running.resolve("a", ());
        running.resolve("b", ());
        assert_eq!(executor.run_until(task_a), Ok(()));
        assert_eq!(executor.run_until(task_b), Ok(()));
        assert_eq!(done.take(), vec!["a", "b"]);

        // Ensure the queue processes more tasks after its inner FuturesUnordered queue has
        // previously terminated.
        let task_c = enqueue.push("c", ());
        executor.run_until_stalled();
        running.resolve("c", ());
        assert_eq!(executor.run_until(task_c), Ok(()));
        assert_eq!(done.take(), vec!["c"]);

        // Also ensure the queue itself terminates once all send handles are dropped and all tasks
        // are complete.
        let task_a = enqueue.push("a", ());
        let task_d = enqueue.push("d", ());
        drop(enqueue);
        executor.run_until_stalled();
        // Work is still in flight, so the stream must not have ended yet.
        assert!(!done.is_terminated());
        assert_eq!(done.take(), Vec::<&str>::new());
        running.resolve("a", ());
        running.resolve("d", ());
        assert_eq!(executor.run_until(task_a), Ok(()));
        assert_eq!(executor.run_until(task_d), Ok(()));
        assert_eq!(done.take(), vec!["a", "d"]);
        assert!(done.is_terminated());
    }
785
    // `push_all` returns one future per entry, in entry order, and a duplicated key ("b") shares
    // the result of a single execution.
    #[test]
    fn push_all() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);

        let mut futs =
            enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
        // Nothing is started until the executor polls the queue.
        running.assert_empty();

        executor.run_until_stalled();
        running.resolve("a", 1);
        running.resolve("b", 2);
        running.assert_empty();
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));

        running.resolve("c", 3);
        running.assert_empty();
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
        // The second "b" entry resolves to the value of the single "b" execution.
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
        assert!(futs.next().is_none());

        assert_eq!(done.take(), vec!["a", "b", "c"]);
    }
812
    // Pushes 10000 distinct keys plus 5000 duplicates (the even-numbered ones) and resolves them
    // one at a time.
    #[test]
    fn handles_many_tasks() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);

        let mut tasks = FuturesUnordered::new();

        for i in 0..10000 {
            let key = format!("task_{i}");
            tasks.push(enqueue.push(key, ()));
        }

        // also queue up some duplicate tasks.
        let task_dups = enqueue
            .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
                let key = format!("task_{i}");
                (key, ())
            }))
            .collect::<FuturesUnordered<_>>();

        executor.run_until_stalled();

        // Resolve whichever task is currently running until none are left; each resolution must
        // complete exactly one pushed future and yield exactly that key from the queue.
        while let Some(key) = running.peek() {
            running.resolve(&key, ());
            assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
            assert_eq!(done.take(), vec![key]);
        }

        assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
    }
845
    // Deduplication uses the whole key: two keys that share `key` but differ in `options` are
    // separate tasks, while identical keys share one execution.
    #[test]
    fn dedups_compound_keys() {
        let mut executor = futures::executor::LocalPool::new();

        #[derive(Debug, Clone, PartialEq, Eq, Hash)]
        struct Params<'a> {
            key: &'a str,
            options: &'a [&'a str],
        }

        let (enqueue, running, done) =
            spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);

        let key_a = Params { key: "first", options: &[] };
        let key_b = Params { key: "first", options: &["unique"] };
        let task_a1 = enqueue.push(key_a.clone(), ());
        let task_b = enqueue.push(key_b.clone(), ());
        let task_a2 = enqueue.push(key_a.clone(), ());

        executor.run_until_stalled();

        running.resolve(&key_b, "first_unique");
        executor.run_until_stalled();
        assert_eq!(done.take(), vec![key_b]);
        assert_eq!(executor.run_until(task_b), Ok("first_unique"));

        // Both pushes of `key_a` are served by the single resolution below.
        running.resolve(&key_a, "first_no_options");
        executor.run_until_stalled();
        assert_eq!(done.take(), vec![key_a]);
        assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
        assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
    }
878
879    #[test]
880    fn merges_context_of_pending_tasks() {
881        let mut executor = futures::executor::LocalPool::new();
882
883        #[derive(Default, Debug, PartialEq, Eq)]
884        struct MyContext(String);
885
886        impl TryMerge for MyContext {
887            fn try_merge(&mut self, other: Self) -> Result<(), Self> {
888                self.0.push_str(&other.0);
889                Ok(())
890            }
891        }
892
893        let (enqueue, running, done) =
894            spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);
895
896        let task_a = enqueue.push("dup", MyContext("a".into()));
897        let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
898        let task_b = enqueue.push("dup", MyContext("b".into()));
899        executor.run_until_stalled();
900        let task_c1 = enqueue.push("dup", MyContext("c".into()));
901        executor.run_until_stalled();
902
903        // "c" not merged in since "dup" was already running with different context.
904        assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
905        assert_eq!(executor.run_until(task_a), Ok(()));
906        assert_eq!(executor.run_until(task_b), Ok(()));
907        assert_eq!(done.take(), vec!["dup"]);
908
909        // even though "unique" was added to the queue before "dup"/"c", "dup" is given priority
910        // since it was already running.
911        assert_eq!(running.keys(), vec!["dup"]);
912        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
913        assert_eq!(executor.run_until(task_c1), Ok(()));
914        assert_eq!(done.take(), vec!["dup"]);
915
916        assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
917        assert_eq!(executor.run_until(task_unique), Ok(()));
918        assert_eq!(done.take(), vec!["unique"]);
919        running.assert_empty();
920
921        // ensure re-running a previously completed item executes it again.
922        let task_c2 = enqueue.push("dup", MyContext("c".into()));
923        executor.run_until_stalled();
924        assert_eq!(running.keys(), vec!["dup"]);
925        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
926        assert_eq!(executor.run_until(task_c2), Ok(()));
927        assert_eq!(done.take(), vec!["dup"]);
928        running.assert_empty();
929    }
930
931    #[fuchsia::test]
932    async fn inspect() {
933        // Notify the test when the tasks are running.
934        let (sender, mut receiver) = futures::channel::mpsc::channel(0);
935        let do_work = move |_: String, _: MergeEqual| {
936            let mut sender = sender.clone();
937            async move {
938                let () = sender.send(()).await.unwrap();
939                let () = futures::future::pending().await;
940                Ok::<_, ()>(())
941            }
942        };
943        let (mut processor, enqueue) = work_queue(2, do_work);
944
945        let inspector = fuchsia_inspect::Inspector::default();
946        inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
947        fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
948            .detach();
949
950        // Inspect empty before queue is used.
951        diagnostics_assertions::assert_data_tree!(inspector, root: {
952            "queue": {}
953        });
954
955        // Inspect populated when queue non-empty.
956        let _a0 = enqueue.push("a".into(), MergeEqual(0));
957        let _b0 = enqueue.push("b".into(), MergeEqual(0));
958        let _a1 = enqueue.push("a".into(), MergeEqual(1));
959        let _c0 = enqueue.push("c".into(), MergeEqual(0));
960
961        let () = receiver.next().await.unwrap();
962        let () = receiver.next().await.unwrap();
963
964        diagnostics_assertions::assert_data_tree!(inspector, root: {
965            "queue": {
966                r#""a""#: {
967                    "running": 1u64,
968                    "pending": 1u64,
969                },
970                r#""b""#: {
971                    "running": 1u64,
972                    "pending": 0u64,
973                },
974                r#""c""#: {
975                    "running": 0u64,
976                    "pending": 1u64,
977                },
978            }
979        });
980    }
981}