work_queue/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6#![allow(clippy::type_complexity)]
7#![allow(clippy::let_unit_value)]
8
9//! Concurrent work queue helpers
10
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{TaskFuture, TaskVariants, make_canceled_receiver};
27
28/// Error type indicating a task failed because the queue was dropped before completing the task.
29#[derive(Debug, PartialEq, Eq, Clone, Error)]
30#[error("The queue was dropped before processing this task")]
31pub struct Closed;
32
33/// Trait for merging context for work tasks with the same key.
34///
35/// Implementations must satisfy (for all `a` and `b`):
36/// * `a == b` implies `a.try_merge(b) == Ok(()) && a` was not modified; and
37/// * `a.try_merge(b) == Err(b)` implies `a` was not modified and `b` was returned unmodified.
38pub trait TryMerge: Eq + Sized {
39    /// Attempts to try_merge `other` into `self`, returning `other` if such an operation is not
40    /// possible.
41    ///
42    /// Implementations should return `Ok(())` if other was fully merged into `self`, or return
43    /// `Err(other)`, leaving `self` unchanged if the instances could not be merged.
44    fn try_merge(&mut self, other: Self) -> Result<(), Self>;
45}
46
47impl TryMerge for () {
48    fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49        Ok(())
50    }
51}
52
53/// Creates an unbounded queue of work tasks that will execute up to `concurrency` `worker`s at once.
54///
55/// # Examples
56///
57/// ```
58/// # use queue::*;
59/// # use futures::prelude::*;
60///
61/// #[derive(Debug, Clone)]
62/// enum DownloadError {}
63///
64/// async fn download_file(url: String, _context: ()) -> Result<Vec<u8>, DownloadError> {
65///     // ...
66/// #     Ok(url.bytes().collect())
67/// }
68///
69/// let mut executor = futures::executor::LocalPool::new();
70/// executor.run_until(async move {
71///     let (mut processor, sender) = work_queue(2, download_file);
72///     let mut join_handles = vec![];
73///     for crate_name in vec!["rand", "lazy_static", "serde", "regex"] {
74///         let fut = sender.push(format!("https://crates.io/api/v1/crates/{}", crate_name), ());
75///         join_handles.push((crate_name, fut));
76///     }
77///
78///     // The queue stream won't terminate until all sender clones are dropped.
79///     drop(sender);
80///
81///     while let Some(key) = processor.next().await {
82///         println!("Finished processing {}", key);
83///     }
84///
85///     for (crate_name, fut) in join_handles {
86///         let res = fut
87///             .await
88///             .expect("queue to execute the task")
89///             .expect("downloads can't fail, right?");
90///         println!("Contents of {}: {:?}", crate_name, res);
91///     }
92/// });
93/// ```
94pub fn work_queue<W, K, C>(
95    concurrency: usize,
96    work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99    W: Work<K, C>,
100    K: Clone + Eq + Hash,
101    C: TryMerge,
102{
103    let tasks = Arc::new(Mutex::new(HashMap::new()));
104    let (sender, receiver) = mpsc::unbounded();
105    let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106    (
107        WorkQueue {
108            work_fn,
109            concurrency,
110            pending: receiver,
111            tasks,
112            running: FuturesUnordered::new(),
113        },
114        sender,
115    )
116}
117
118/// Trait that creates a work future from a key and context.
119pub trait Work<K, C> {
120    /// The future that is executed by the WorkQueue.
121    type Future: Future;
122
123    /// Create a new `Future` to be executed by the WorkQueue.
124    fn start(&self, key: K, context: C) -> Self::Future;
125}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129    F: Fn(K, C) -> WF,
130    WF: Future,
131{
132    type Future = WF;
133
134    fn start(&self, key: K, context: C) -> Self::Future {
135        (self)(key, context)
136    }
137}
138
139/// A work queue that processes a configurable number of tasks concurrently, deduplicating work
140/// with the same key.
141///
142/// Items are yielded from the stream in the order that they are processed, which may differ from
143/// the order that items are enqueued, depending on which concurrent tasks complete first.
144#[pin_project]
145pub struct WorkQueue<W, K, C>
146where
147    W: Work<K, C>,
148{
149    /// The work callback function.
150    work_fn: W,
151    /// Maximum number of tasks to run concurrently.
152    concurrency: usize,
153    /// Metadata about pending and running work. Modified by the queue itself when running tasks
154    /// and by [WorkSender] to add new tasks to the queue.
155    tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,
156
157    /// Receiving end of the queue.
158    #[pin]
159    pending: mpsc::UnboundedReceiver<K>,
160    /// Tasks currently being run. Will contain [0, concurrency] futures at any given time.
161    #[pin]
162    running: FuturesUnordered<RunningTask<K, W::Future>>,
163}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167    W: Work<K, C>,
168    K: Clone + Eq + Hash,
169{
170    /// Converts this stream of K into a single future that resolves when the stream is
171    /// terminated.
172    pub fn into_future(self) -> impl Future<Output = ()> {
173        self.map(|_res| ()).collect::<()>()
174    }
175
176    /// Starts new work if under the concurrency limit and work is enqueued.
177    /// Returns:
178    /// * Poll::Ready(None) if the input work queue is empty and closed.
179    /// * Poll::Ready(Some(())) if new work was started.
180    /// * Poll::Pending if at the concurrency limit or no work is enqueued.
181    fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182        let mut this = self.project();
183
184        // Nothing to do if the stream of requests is EOF.
185        if this.pending.is_terminated() {
186            return Poll::Ready(None);
187        }
188
189        let mut found = false;
190        while this.running.len() < *this.concurrency {
191            match ready!(this.pending.as_mut().poll_next(cx)) {
192                None => break,
193                Some(key) => {
194                    found = true;
195
196                    // Transition the work info to the running state, claiming the context.
197                    let context = this
198                        .tasks
199                        .lock()
200                        .get_mut(&key)
201                        .expect("map entry to exist if in pending queue")
202                        .start();
203
204                    // WorkSender::push_entry guarantees that key will only be pushed into pending
205                    // if it created the entry in the map, so it is guaranteed here that multiple
206                    // instances of the same key will not be executed concurrently.
207                    let work = this.work_fn.start(key.clone(), context);
208                    let fut = futures::future::join(futures::future::ready(key), work);
209
210                    this.running.push(fut);
211                }
212            }
213        }
214        if found { Poll::Ready(Some(())) } else { Poll::Pending }
215    }
216
217    fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
218        let mut this = self.project();
219
220        match this.running.as_mut().poll_next(cx) {
221            Poll::Pending => Poll::Pending,
222            Poll::Ready(None) => {
223                // this.running is now terminated, but unlike the guarantees given by other
224                // FusedStream implementations, FuturesUnordered can continue to be polled (unless
225                // new work comes in, it will continue to return Poll::Ready(None)), and new
226                // futures can be pushed into it. Pushing new work on a terminated
227                // FuturesUnordered will cause is_terminated to return false, and polling the
228                // stream will start the task.
229                if this.pending.is_terminated() { Poll::Ready(None) } else { Poll::Pending }
230            }
231            Poll::Ready(Some((key, res))) => {
232                let mut tasks = this.tasks.lock();
233                let infos: &mut TaskVariants<_, _> =
234                    tasks.get_mut(&key).expect("key to exist in map if not resolved");
235
236                if let Some(next_context) = infos.done(res) {
237                    // start the next operation immediately
238                    let work = this.work_fn.start(key.clone(), next_context);
239                    let key_clone = key.clone();
240                    let fut = futures::future::join(futures::future::ready(key_clone), work);
241
242                    drop(tasks);
243                    this.running.push(fut);
244                } else {
245                    // last pending operation with this key
246                    tasks.remove(&key);
247                }
248
249                // Yield the key that was processed to the stream, indicating if processing that
250                // value was successful or not.
251                Poll::Ready(Some(key))
252            }
253        }
254    }
255}
256
257impl<W, K, C> WorkQueue<W, K, C>
258where
259    W: Work<K, C>,
260    <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
261    K: std::fmt::Debug + Send + 'static,
262    C: Send + 'static,
263{
264    /// Returns a callback to be given to `fuchsia_inspect::Node::record_lazy_child`.
265    /// Records the keys of the queue using their Debug format along with the number of
266    /// corresponding tasks that are running and pending.
267    pub fn record_lazy_inspect(
268        &self,
269    ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
270    + Send
271    + Sync
272    + 'static {
273        let tasks = Arc::downgrade(&self.tasks);
274        move || {
275            let tasks = tasks.clone();
276            async move {
277                let inspector = fuchsia_inspect::Inspector::default();
278                if let Some(tasks) = tasks.upgrade() {
279                    // Drop the lock before the inspect operations in case they are slow.
280                    let tasks = {
281                        tasks
282                            .lock()
283                            .iter()
284                            .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
285                            .collect::<Vec<_>>()
286                    };
287                    let root = inspector.root();
288                    for (k, (running, pending)) in tasks {
289                        root.record_child(k, |n| {
290                            n.record_uint("running", running as u64);
291                            n.record_uint("pending", pending as u64);
292                        })
293                    }
294                }
295                Ok(inspector)
296            }
297            .boxed()
298        }
299    }
300}
301
302impl<W, K, C> Stream for WorkQueue<W, K, C>
303where
304    W: Work<K, C>,
305    K: Clone + Eq + Hash,
306{
307    type Item = K;
308    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309        match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
310            (Poll::Ready(None), Poll::Ready(None)) => {
311                // There input queues are empty and closed, and all running work has been
312                // completed. This work queue is now (or was already) terminated.
313                Poll::Ready(None)
314            }
315            (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
316                // The input queue made progress this iteration and a work item completed.
317                // find_work again to either start more work or register for a wakeup when work
318                // becomes available.
319                let _ = self.as_mut().find_work(cx);
320                Poll::Ready(Some(res))
321            }
322            (_not_ready_none, Poll::Ready(None)) => {
323                // Our active task queue is empty, but more work can still come in. Report this
324                // poll as pending.
325                Poll::Pending
326            }
327            (_, poll) => poll,
328        }
329    }
330}
331
332type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
333
334/// A clonable handle to the work queue.  When all clones of [WorkSender] are dropped, the queue
335/// will process all remaining requests and terminate its output stream.
336pub struct WorkSender<K, C, O> {
337    sender: mpsc::UnboundedSender<K>,
338    // Weak reference to ensure that if the queue is dropped, the now unused sender end of the
339    // completion callback will be dropped too, canceling the request.
340    tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
341}
342
343impl<K, C, O> Clone for WorkSender<K, C, O> {
344    fn clone(&self) -> Self {
345        Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
346    }
347}
348
349impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        f.debug_struct("WorkSender").finish()
352    }
353}
354
355impl<K, C, O> WorkSender<K, C, O>
356where
357    K: Clone + Eq + Hash,
358    C: TryMerge,
359    O: Clone,
360{
361    /// Enqueue the given key to be processed by a worker, or attach to an existing request to
362    /// process this key.
363    pub fn push(
364        &self,
365        key: K,
366        context: C,
367    ) -> impl Future<Output = Result<O, Closed>> + use<K, C, O> {
368        let tasks = match self.tasks.upgrade() {
369            Some(tasks) => tasks,
370            None => {
371                // Work queue no longer exists. Immediately cancel this request.
372                return make_canceled_receiver();
373            }
374        };
375        let mut tasks = tasks.lock();
376
377        Self::push_entry(&mut *tasks, &self.sender, key, context)
378    }
379
380    /// Enqueue all the given keys to be processed by a worker, merging them with existing known
381    /// tasks if possible, returning an iterator of the futures that will resolve to the results of
382    /// processing the keys.
383    ///
384    /// This method is similar to, but more efficient than, mapping an iterator to
385    /// `WorkSender::push`.
386    pub fn push_all(
387        &self,
388        entries: impl Iterator<Item = (K, C)>,
389    ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
390        let mut tasks = self.tasks.upgrade();
391        let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
392
393        entries
394            .map(move |(key, context)| {
395                if let Some(ref mut tasks) = tasks {
396                    Self::push_entry(&mut *tasks, &self.sender, key, context)
397                } else {
398                    // Work queue no longer exists. Immediately cancel this request.
399                    make_canceled_receiver()
400                }
401            })
402            .collect::<Vec<_>>()
403            .into_iter()
404    }
405
406    fn push_entry(
407        tasks: &mut HashMap<K, TaskVariants<C, O>>,
408        self_sender: &mpsc::UnboundedSender<K>,
409        key: K,
410        context: C,
411    ) -> Shared<TaskFuture<O>> {
412        use std::collections::hash_map::Entry;
413
414        match tasks.entry(key.clone()) {
415            Entry::Vacant(entry) => {
416                // No other variant of this task is running or pending. Reserve our
417                // spot in line and configure the task's metadata.
418                if let Ok(()) = self_sender.unbounded_send(key) {
419                    let (infos, fut) = TaskVariants::new(context);
420                    entry.insert(infos);
421                    fut
422                } else {
423                    // Work queue no longer exists. Immediately cancel this request.
424                    make_canceled_receiver()
425                }
426            }
427            Entry::Occupied(entry) => entry.into_mut().push(context),
428        }
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use super::*;
435    use futures::channel::oneshot;
436    use futures::executor::{LocalSpawner, block_on};
437    use futures::task::{LocalSpawnExt, SpawnExt};
438    use std::borrow::Borrow;
439    use std::fmt;
440
441    #[test]
442    fn basic_usage() {
443        async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
444            Ok(())
445        }
446
447        let (processor, enqueue) = work_queue(3, do_work);
448
449        let tasks = FuturesUnordered::new();
450
451        tasks.push(enqueue.push("a".into(), ()));
452        tasks.push(enqueue.push("a".into(), ()));
453        tasks.push(enqueue.push("b".into(), ()));
454        tasks.push(enqueue.push("a".into(), ()));
455        tasks.push(enqueue.push("c".into(), ()));
456
457        drop(enqueue);
458
459        block_on(async move {
460            let (keys, res) = futures::future::join(
461                processor.collect::<Vec<String>>(),
462                tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
463            )
464            .await;
465            assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
466            assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
467        });
468    }
469
470    #[test]
471    fn into_future() {
472        async fn nop(key: i32, _context: ()) -> i32 {
473            key
474        }
475        let (processor, enqueue) = work_queue(1, nop);
476
477        let res_fut =
478            future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
479        drop(enqueue);
480
481        let res = block_on(res_fut);
482        assert_eq!(res, ((), Ok(1), Ok(2)));
483    }
484
485    #[derive(Debug, PartialEq, Eq)]
486    pub(crate) struct MergeEqual(pub(crate) i32);
487
488    impl TryMerge for MergeEqual {
489        fn try_merge(&mut self, other: Self) -> Result<(), Self> {
490            if self.0 == other.0 { Ok(()) } else { Err(other) }
491        }
492    }
493
494    #[test]
495    fn dropping_queue_fails_requests() {
496        async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
497            Ok(())
498        }
499
500        let (processor, enqueue) = work_queue(1, do_work);
501
502        let fut_early_a = enqueue.push("a", MergeEqual(0));
503        let fut_early_b = enqueue.push("a", MergeEqual(1));
504        let fut_early_c = enqueue.push("a", MergeEqual(0));
505        drop(processor);
506        let fut_late = enqueue.push("b", MergeEqual(0));
507
508        block_on(async move {
509            assert_eq!(fut_early_a.await, Err(Closed));
510            assert_eq!(fut_early_b.await, Err(Closed));
511            assert_eq!(fut_early_c.await, Err(Closed));
512            assert_eq!(fut_late.await, Err(Closed));
513
514            let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
515            for fut in enqueue.push_all(requests.into_iter()) {
516                assert_eq!(fut.await, Err(Closed));
517            }
518        });
519    }
520
521    #[derive(Debug)]
522    struct TestRunningTask<C, O> {
523        unblocker: oneshot::Sender<O>,
524        context: C,
525    }
526
527    #[derive(Debug)]
528    struct TestRunningTasks<K, C, O>
529    where
530        K: Eq + Hash,
531    {
532        tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
533    }
534
535    impl<K, C, O> TestRunningTasks<K, C, O>
536    where
537        K: fmt::Debug + Eq + Hash + Sized + Clone,
538        C: fmt::Debug,
539        O: fmt::Debug,
540    {
541        fn new() -> Self {
542            Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
543        }
544
545        fn resolve<Q>(&self, key: &Q, res: O) -> C
546        where
547            Q: Eq + Hash + ?Sized,
548            K: Borrow<Q>,
549        {
550            let task =
551                self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
552            task.unblocker.send(res).unwrap();
553            task.context
554        }
555
556        fn peek(&self) -> Option<K> {
557            self.tasks.lock().keys().next().cloned()
558        }
559
560        fn keys(&self) -> Vec<K> {
561            self.tasks.lock().keys().cloned().collect()
562        }
563
564        fn assert_empty(&self) {
565            assert_eq!(
566                self.tasks.lock().keys().collect::<Vec<&K>>(),
567                Vec::<&K>::new(),
568                "expect queue to be empty"
569            );
570        }
571    }
572
573    #[derive(Debug)]
574    struct TestQueueResults<K> {
575        done: Arc<Mutex<Vec<K>>>,
576        terminated: Arc<Mutex<bool>>,
577    }
578
579    impl<K> TestQueueResults<K> {
580        fn take(&self) -> Vec<K> {
581            std::mem::take(&mut *self.done.lock())
582        }
583
584        fn is_terminated(&self) -> bool {
585            *self.terminated.lock()
586        }
587    }
588
589    #[test]
590    fn check_works_with_sendable_types() {
591        struct TestWork;
592
593        impl Work<(), ()> for TestWork {
594            type Future = futures::future::Ready<()>;
595
596            fn start(&self, _key: (), _context: ()) -> Self::Future {
597                futures::future::ready(())
598            }
599        }
600
601        let (processor, enqueue) = work_queue(3, TestWork);
602
603        let tasks = FuturesUnordered::new();
604        tasks.push(enqueue.push((), ()));
605
606        drop(enqueue);
607
608        let mut executor = futures::executor::LocalPool::new();
609        let handle = executor
610            .spawner()
611            .spawn_with_handle(async move {
612                let (keys, res) =
613                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
614                        .await;
615                assert_eq!(keys, vec![()]);
616                assert_eq!(res, vec![Ok(())]);
617            })
618            .expect("spawn to work");
619        let () = executor.run_until(handle);
620    }
621
622    #[test]
623    fn check_works_with_unsendable_types() {
624        use std::rc::Rc;
625
626        // Unfortunately `impl !Send for $Type` is unstable, so use Rc<()> to make sure WorkQueue
627        // still works.
628        struct TestWork(#[expect(dead_code)] Rc<()>);
629        #[derive(Clone, Debug, PartialEq, Eq, Hash)]
630        struct TestKey(Rc<()>);
631        #[derive(PartialEq, Eq)]
632        struct TestContext(Rc<()>);
633        #[derive(Clone, Debug, PartialEq)]
634        struct TestOutput(Rc<()>);
635
636        impl Work<TestKey, TestContext> for TestWork {
637            type Future = futures::future::Ready<TestOutput>;
638
639            fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
640                futures::future::ready(TestOutput(Rc::new(())))
641            }
642        }
643
644        impl TryMerge for TestContext {
645            fn try_merge(&mut self, _: Self) -> Result<(), Self> {
646                Ok(())
647            }
648        }
649
650        let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));
651
652        let tasks = FuturesUnordered::new();
653        tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));
654
655        drop(enqueue);
656
657        let mut executor = futures::executor::LocalPool::new();
658        let handle = executor
659            .spawner()
660            .spawn_local_with_handle(async move {
661                let (keys, res) =
662                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
663                        .await;
664                assert_eq!(keys, vec![TestKey(Rc::new(()))]);
665                assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
666            })
667            .expect("spawn to work");
668        let () = executor.run_until(handle);
669    }
670
671    fn spawn_test_work_queue<K, C, O>(
672        spawner: LocalSpawner,
673        concurrency: usize,
674    ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
675    where
676        K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
677        C: TryMerge + Send + fmt::Debug + 'static,
678        O: Send + Clone + fmt::Debug + 'static,
679    {
680        let running = TestRunningTasks::<K, C, O>::new();
681        let running_tasks = running.tasks.clone();
682        let do_work = move |key: K, context: C| {
683            // wait for the test driver to resolve this work item and return the result it
684            // provides.
685            let (sender, receiver) = oneshot::channel();
686            assert!(
687                running_tasks
688                    .lock()
689                    .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
690                    .is_none()
691            );
692            async move { receiver.await.unwrap() }
693        };
694
695        let (mut processor, enqueue) = work_queue(concurrency, do_work);
696        let done = Arc::new(Mutex::new(Vec::new()));
697        let terminated = Arc::new(Mutex::new(false));
698        let results =
699            TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };
700
701        spawner
702            .spawn_local(async move {
703                while let Some(res) = processor.next().await {
704                    done.lock().push(res);
705                }
706                *terminated.lock() = true;
707            })
708            .expect("spawn to succeed");
709
710        (enqueue, running, results)
711    }
712
713    #[test]
714    fn processes_known_work_before_stalling() {
715        let mut executor = futures::executor::LocalPool::new();
716
717        let (enqueue, running, done) =
718            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
719
720        let task_hello = enqueue.push("hello", ());
721        let task_world = enqueue.push("world!", ());
722        let task_test = enqueue.push("test", ());
723        executor.run_until_stalled();
724        assert_eq!(done.take(), Vec::<&str>::new());
725
726        running.resolve("hello", 5);
727        running.resolve("world!", 6);
728        running.assert_empty();
729        executor.run_until_stalled();
730        assert_eq!(done.take(), vec!["hello", "world!"]);
731
732        assert_eq!(executor.run_until(task_hello), Ok(5));
733        assert_eq!(executor.run_until(task_world), Ok(6));
734
735        running.resolve("test", 4);
736        assert_eq!(executor.run_until(task_test), Ok(4));
737        assert_eq!(done.take(), vec!["test"]);
738    }
739
740    #[test]
741    fn restarts_after_draining_input_queue() {
742        let mut executor = futures::executor::LocalPool::new();
743
744        let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);
745
746        // Process a few tasks to completion through the queue.
747        let task_a = enqueue.push("a", ());
748        let task_b = enqueue.push("b", ());
749        executor.run_until_stalled();
750        running.resolve("a", ());
751        running.resolve("b", ());
752        assert_eq!(executor.run_until(task_a), Ok(()));
753        assert_eq!(executor.run_until(task_b), Ok(()));
754        assert_eq!(done.take(), vec!["a", "b"]);
755
756        // Ensure the queue processes more tasks after its inner FuturesUnordered queue has
757        // previously terminated.
758        let task_c = enqueue.push("c", ());
759        executor.run_until_stalled();
760        running.resolve("c", ());
761        assert_eq!(executor.run_until(task_c), Ok(()));
762        assert_eq!(done.take(), vec!["c"]);
763
764        // Also ensure the queue itself terminates once all send handles are dropped and all tasks
765        // are complete.
766        let task_a = enqueue.push("a", ());
767        let task_d = enqueue.push("d", ());
768        drop(enqueue);
769        executor.run_until_stalled();
770        assert!(!done.is_terminated());
771        assert_eq!(done.take(), Vec::<&str>::new());
772        running.resolve("a", ());
773        running.resolve("d", ());
774        assert_eq!(executor.run_until(task_a), Ok(()));
775        assert_eq!(executor.run_until(task_d), Ok(()));
776        assert_eq!(done.take(), vec!["a", "d"]);
777        assert!(done.is_terminated());
778    }
779
780    #[test]
781    fn push_all() {
782        let mut executor = futures::executor::LocalPool::new();
783
784        let (enqueue, running, done) =
785            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
786
787        let mut futs =
788            enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
789        running.assert_empty();
790
791        executor.run_until_stalled();
792        running.resolve("a", 1);
793        running.resolve("b", 2);
794        running.assert_empty();
795        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
796        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
797
798        running.resolve("c", 3);
799        running.assert_empty();
800        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
801        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
802        assert!(futs.next().is_none());
803
804        assert_eq!(done.take(), vec!["a", "b", "c"]);
805    }
806
807    #[test]
808    fn handles_many_tasks() {
809        let mut executor = futures::executor::LocalPool::new();
810
811        let (enqueue, running, done) =
812            spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);
813
814        let mut tasks = FuturesUnordered::new();
815
816        for i in 0..10000 {
817            let key = format!("task_{i}");
818            tasks.push(enqueue.push(key, ()));
819        }
820
821        // also queue up some duplicate tasks.
822        let task_dups = enqueue
823            .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
824                let key = format!("task_{i}");
825                (key, ())
826            }))
827            .collect::<FuturesUnordered<_>>();
828
829        executor.run_until_stalled();
830
831        while let Some(key) = running.peek() {
832            running.resolve(&key, ());
833            assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
834            assert_eq!(done.take(), vec![key]);
835        }
836
837        assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
838    }
839
840    #[test]
841    fn dedups_compound_keys() {
842        let mut executor = futures::executor::LocalPool::new();
843
844        #[derive(Debug, Clone, PartialEq, Eq, Hash)]
845        struct Params<'a> {
846            key: &'a str,
847            options: &'a [&'a str],
848        }
849
850        let (enqueue, running, done) =
851            spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);
852
853        let key_a = Params { key: "first", options: &[] };
854        let key_b = Params { key: "first", options: &["unique"] };
855        let task_a1 = enqueue.push(key_a.clone(), ());
856        let task_b = enqueue.push(key_b.clone(), ());
857        let task_a2 = enqueue.push(key_a.clone(), ());
858
859        executor.run_until_stalled();
860
861        running.resolve(&key_b, "first_unique");
862        executor.run_until_stalled();
863        assert_eq!(done.take(), vec![key_b]);
864        assert_eq!(executor.run_until(task_b), Ok("first_unique"));
865
866        running.resolve(&key_a, "first_no_options");
867        executor.run_until_stalled();
868        assert_eq!(done.take(), vec![key_a]);
869        assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
870        assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
871    }
872
873    #[test]
874    fn merges_context_of_pending_tasks() {
875        let mut executor = futures::executor::LocalPool::new();
876
877        #[derive(Default, Debug, PartialEq, Eq)]
878        struct MyContext(String);
879
880        impl TryMerge for MyContext {
881            fn try_merge(&mut self, other: Self) -> Result<(), Self> {
882                self.0.push_str(&other.0);
883                Ok(())
884            }
885        }
886
887        let (enqueue, running, done) =
888            spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);
889
890        let task_a = enqueue.push("dup", MyContext("a".into()));
891        let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
892        let task_b = enqueue.push("dup", MyContext("b".into()));
893        executor.run_until_stalled();
894        let task_c1 = enqueue.push("dup", MyContext("c".into()));
895        executor.run_until_stalled();
896
897        // "c" not merged in since "dup" was already running with different context.
898        assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
899        assert_eq!(executor.run_until(task_a), Ok(()));
900        assert_eq!(executor.run_until(task_b), Ok(()));
901        assert_eq!(done.take(), vec!["dup"]);
902
903        // even though "unique" was added to the queue before "dup"/"c", "dup" is given priority
904        // since it was already running.
905        assert_eq!(running.keys(), vec!["dup"]);
906        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
907        assert_eq!(executor.run_until(task_c1), Ok(()));
908        assert_eq!(done.take(), vec!["dup"]);
909
910        assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
911        assert_eq!(executor.run_until(task_unique), Ok(()));
912        assert_eq!(done.take(), vec!["unique"]);
913        running.assert_empty();
914
915        // ensure re-running a previously completed item executes it again.
916        let task_c2 = enqueue.push("dup", MyContext("c".into()));
917        executor.run_until_stalled();
918        assert_eq!(running.keys(), vec!["dup"]);
919        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
920        assert_eq!(executor.run_until(task_c2), Ok(()));
921        assert_eq!(done.take(), vec!["dup"]);
922        running.assert_empty();
923    }
924
925    #[fuchsia::test]
926    async fn inspect() {
927        // Notify the test when the tasks are running.
928        let (sender, mut receiver) = futures::channel::mpsc::channel(0);
929        let do_work = move |_: String, _: MergeEqual| {
930            let mut sender = sender.clone();
931            async move {
932                let () = sender.send(()).await.unwrap();
933                let () = futures::future::pending().await;
934                Ok::<_, ()>(())
935            }
936        };
937        let (mut processor, enqueue) = work_queue(2, do_work);
938
939        let inspector = fuchsia_inspect::Inspector::default();
940        inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
941        fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
942            .detach();
943
944        // Inspect empty before queue is used.
945        diagnostics_assertions::assert_data_tree!(inspector, root: {
946            "queue": {}
947        });
948
949        // Inspect populated when queue non-empty.
950        let _a0 = enqueue.push("a".into(), MergeEqual(0));
951        let _b0 = enqueue.push("b".into(), MergeEqual(0));
952        let _a1 = enqueue.push("a".into(), MergeEqual(1));
953        let _c0 = enqueue.push("c".into(), MergeEqual(0));
954
955        let () = receiver.next().await.unwrap();
956        let () = receiver.next().await.unwrap();
957
958        diagnostics_assertions::assert_data_tree!(inspector, root: {
959            "queue": {
960                r#""a""#: {
961                    "running": 1u64,
962                    "pending": 1u64,
963                },
964                r#""b""#: {
965                    "running": 1u64,
966                    "pending": 0u64,
967                },
968                r#""c""#: {
969                    "running": 0u64,
970                    "pending": 1u64,
971                },
972            }
973        });
974    }
975}