work_queue/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6#![allow(clippy::type_complexity)]
7#![allow(clippy::let_unit_value)]
8
9//! Concurrent work queue helpers
10
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{TaskFuture, TaskVariants, make_canceled_receiver};
27
28/// Error type indicating a task failed because the queue was dropped before completing the task.
29#[derive(Debug, PartialEq, Eq, Clone, Error)]
30#[error("The queue was dropped before processing this task")]
31pub struct Closed;
32
/// Describes how the contexts of two work requests for the same key are combined.
///
/// For all `a` and `b`, implementations must guarantee that:
/// * if `a == b`, then `a.try_merge(b)` returns `Ok(())` and leaves `a` unmodified; and
/// * if `a.try_merge(b)` returns `Err(b)`, then `a` is unmodified and the returned `b` is the
///   value that was passed in, also unmodified.
pub trait TryMerge: Eq + Sized {
    /// Attempts to merge `other` into `self`.
    ///
    /// Returns `Ok(())` when `other` was fully absorbed into `self`. When the two instances
    /// cannot be merged, returns `Err(other)` and leaves `self` unchanged.
    fn try_merge(&mut self, other: Self) -> Result<(), Self>;
}
46
47impl TryMerge for () {
48    fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49        Ok(())
50    }
51}
52
53/// Creates an unbounded queue of work tasks that will execute up to `concurrency` `worker`s at once.
54///
55/// # Examples
56///
57/// ```
58/// # use queue::*;
59/// # use futures::prelude::*;
60///
61/// #[derive(Debug, Clone)]
62/// enum DownloadError {}
63///
64/// async fn download_file(url: String, _context: ()) -> Result<Vec<u8>, DownloadError> {
65///     // ...
66/// #     Ok(url.bytes().collect())
67/// }
68///
69/// let mut executor = futures::executor::LocalPool::new();
70/// executor.run_until(async move {
71///     let (mut processor, sender) = work_queue(2, download_file);
72///     let mut join_handles = vec![];
73///     for crate_name in vec!["rand", "lazy_static", "serde", "regex"] {
74///         let fut = sender.push(format!("https://crates.io/api/v1/crates/{}", crate_name), ());
75///         join_handles.push((crate_name, fut));
76///     }
77///
78///     // The queue stream won't terminate until all sender clones are dropped.
79///     drop(sender);
80///
81///     while let Some(key) = processor.next().await {
82///         println!("Finished processing {}", key);
83///     }
84///
85///     for (crate_name, fut) in join_handles {
86///         let res = fut
87///             .await
88///             .expect("queue to execute the task")
89///             .expect("downloads can't fail, right?");
90///         println!("Contents of {}: {:?}", crate_name, res);
91///     }
92/// });
93/// ```
94pub fn work_queue<W, K, C>(
95    concurrency: usize,
96    work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99    W: Work<K, C>,
100    K: Clone + Eq + Hash,
101    C: TryMerge,
102{
103    let tasks = Arc::new(Mutex::new(HashMap::new()));
104    let (sender, receiver) = mpsc::unbounded();
105    let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106    (
107        WorkQueue {
108            work_fn,
109            concurrency,
110            pending: receiver,
111            tasks,
112            running: FuturesUnordered::new(),
113        },
114        sender,
115    )
116}
117
/// Factory for the future that performs the work for a given key and context.
pub trait Work<K, C> {
    /// The type of future the WorkQueue executes for each task.
    type Future: Future;

    /// Builds the `Future` that the WorkQueue will run for `key` with the given `context`.
    fn start(&self, key: K, context: C) -> Self::Future;
}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129    F: Fn(K, C) -> WF,
130    WF: Future,
131{
132    type Future = WF;
133
134    fn start(&self, key: K, context: C) -> Self::Future {
135        (self)(key, context)
136    }
137}
138
139/// A work queue that processes a configurable number of tasks concurrently, deduplicating work
140/// with the same key.
141///
142/// Items are yielded from the stream in the order that they are processed, which may differ from
143/// the order that items are enqueued, depending on which concurrent tasks complete first.
144#[pin_project]
145pub struct WorkQueue<W, K, C>
146where
147    W: Work<K, C>,
148{
149    /// The work callback function.
150    work_fn: W,
151    /// Maximum number of tasks to run concurrently.
152    concurrency: usize,
153    /// Metadata about pending and running work. Modified by the queue itself when running tasks
154    /// and by [WorkSender] to add new tasks to the queue.
155    tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,
156
157    /// Receiving end of the queue.
158    #[pin]
159    pending: mpsc::UnboundedReceiver<K>,
160    /// Tasks currently being run. Will contain [0, concurrency] futures at any given time.
161    #[pin]
162    running: FuturesUnordered<RunningTask<K, W::Future>>,
163}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167    W: Work<K, C>,
168    K: Clone + Eq + Hash,
169{
170    /// Converts this stream of K into a single future that resolves when the stream is
171    /// terminated.
172    pub fn into_future(self) -> impl Future<Output = ()> {
173        self.map(|_res| ()).collect::<()>()
174    }
175
176    /// Starts new work if under the concurrency limit and work is enqueued.
177    /// Returns:
178    /// * Poll::Ready(None) if the input work queue is empty and closed.
179    /// * Poll::Ready(Some(())) if new work was started.
180    /// * Poll::Pending if at the concurrency limit or no work is enqueued.
181    fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182        let mut this = self.project();
183
184        // Nothing to do if the stream of requests is EOF.
185        if this.pending.is_terminated() {
186            return Poll::Ready(None);
187        }
188
189        let mut found = false;
190        while this.running.len() < *this.concurrency {
191            match ready!(this.pending.as_mut().poll_next(cx)) {
192                None => break,
193                Some(key) => {
194                    found = true;
195
196                    // Transition the work info to the running state, claiming the context.
197                    let context = this
198                        .tasks
199                        .lock()
200                        .get_mut(&key)
201                        .expect("map entry to exist if in pending queue")
202                        .start();
203
204                    // WorkSender::push_entry guarantees that key will only be pushed into pending
205                    // if it created the entry in the map, so it is guaranteed here that multiple
206                    // instances of the same key will not be executed concurrently.
207                    let work = this.work_fn.start(key.clone(), context);
208                    let fut = futures::future::join(futures::future::ready(key), work);
209
210                    this.running.push(fut);
211                }
212            }
213        }
214        if found { Poll::Ready(Some(())) } else { Poll::Pending }
215    }
216
217    fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
218        let mut this = self.project();
219
220        match this.running.as_mut().poll_next(cx) {
221            Poll::Pending => Poll::Pending,
222            Poll::Ready(None) => {
223                // this.running is now terminated, but unlike the guarantees given by other
224                // FusedStream implementations, FuturesUnordered can continue to be polled (unless
225                // new work comes in, it will continue to return Poll::Ready(None)), and new
226                // futures can be pushed into it. Pushing new work on a terminated
227                // FuturesUnordered will cause is_terminated to return false, and polling the
228                // stream will start the task.
229                if this.pending.is_terminated() { Poll::Ready(None) } else { Poll::Pending }
230            }
231            Poll::Ready(Some((key, res))) => {
232                let mut tasks = this.tasks.lock();
233                let infos: &mut TaskVariants<_, _> =
234                    tasks.get_mut(&key).expect("key to exist in map if not resolved");
235
236                if let Some(next_context) = infos.done(res) {
237                    // start the next operation immediately
238                    let work = this.work_fn.start(key.clone(), next_context);
239                    let key_clone = key.clone();
240                    let fut = futures::future::join(futures::future::ready(key_clone), work);
241
242                    drop(tasks);
243                    this.running.push(fut);
244                } else {
245                    // last pending operation with this key
246                    tasks.remove(&key);
247                }
248
249                // Yield the key that was processed to the stream, indicating if processing that
250                // value was successful or not.
251                Poll::Ready(Some(key))
252            }
253        }
254    }
255}
256
257impl<W, K, C> WorkQueue<W, K, C>
258where
259    W: Work<K, C>,
260    <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
261    K: std::fmt::Debug + Send + 'static,
262    C: Send + 'static,
263{
264    /// Returns a callback to be given to `fuchsia_inspect::Node::record_lazy_child`.
265    /// Records the keys of the queue using their Debug format along with the number of
266    /// corresponding tasks that are running and pending.
267    pub fn record_lazy_inspect(
268        &self,
269    ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
270    + Send
271    + Sync
272    + 'static {
273        let tasks = Arc::downgrade(&self.tasks);
274        move || {
275            let tasks = tasks.clone();
276            async move {
277                let inspector = fuchsia_inspect::Inspector::default();
278                if let Some(tasks) = tasks.upgrade() {
279                    // Drop the lock before the inspect operations in case they are slow.
280                    let tasks = {
281                        tasks
282                            .lock()
283                            .iter()
284                            .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
285                            .collect::<Vec<_>>()
286                    };
287                    let root = inspector.root();
288                    for (k, (running, pending)) in tasks {
289                        root.record_child(k, |n| {
290                            n.record_uint("running", running as u64);
291                            n.record_uint("pending", pending as u64);
292                        })
293                    }
294                }
295                Ok(inspector)
296            }
297            .boxed()
298        }
299    }
300}
301
302impl<W, K, C> Stream for WorkQueue<W, K, C>
303where
304    W: Work<K, C>,
305    K: Clone + Eq + Hash,
306{
307    type Item = K;
308    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309        match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
310            (Poll::Ready(None), Poll::Ready(None)) => {
311                // There input queues are empty and closed, and all running work has been
312                // completed. This work queue is now (or was already) terminated.
313                Poll::Ready(None)
314            }
315            (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
316                // The input queue made progress this iteration and a work item completed.
317                // find_work again to either start more work or register for a wakeup when work
318                // becomes available.
319                let _ = self.as_mut().find_work(cx);
320                Poll::Ready(Some(res))
321            }
322            (_not_ready_none, Poll::Ready(None)) => {
323                // Our active task queue is empty, but more work can still come in. Report this
324                // poll as pending.
325                Poll::Pending
326            }
327            (_, poll) => poll,
328        }
329    }
330}
331
332type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
333
334/// A clonable handle to the work queue.  When all clones of [WorkSender] are dropped, the queue
335/// will process all remaining requests and terminate its output stream.
336pub struct WorkSender<K, C, O> {
337    sender: mpsc::UnboundedSender<K>,
338    // Weak reference to ensure that if the queue is dropped, the now unused sender end of the
339    // completion callback will be dropped too, canceling the request.
340    tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
341}
342
343impl<K, C, O> Clone for WorkSender<K, C, O> {
344    fn clone(&self) -> Self {
345        Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
346    }
347}
348
349impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        f.debug_struct("WorkSender").finish()
352    }
353}
354
355impl<K, C, O> WorkSender<K, C, O>
356where
357    K: Clone + Eq + Hash,
358    C: TryMerge,
359    O: Clone,
360{
361    /// Enqueue the given key to be processed by a worker, or attach to an existing request to
362    /// process this key.
363    pub fn push(&self, key: K, context: C) -> impl Future<Output = Result<O, Closed>> {
364        let tasks = match self.tasks.upgrade() {
365            Some(tasks) => tasks,
366            None => {
367                // Work queue no longer exists. Immediately cancel this request.
368                return make_canceled_receiver();
369            }
370        };
371        let mut tasks = tasks.lock();
372
373        Self::push_entry(&mut *tasks, &self.sender, key, context)
374    }
375
376    /// Enqueue all the given keys to be processed by a worker, merging them with existing known
377    /// tasks if possible, returning an iterator of the futures that will resolve to the results of
378    /// processing the keys.
379    ///
380    /// This method is similar to, but more efficient than, mapping an iterator to
381    /// `WorkSender::push`.
382    pub fn push_all(
383        &self,
384        entries: impl Iterator<Item = (K, C)>,
385    ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
386        let mut tasks = self.tasks.upgrade();
387        let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
388
389        entries
390            .map(move |(key, context)| {
391                if let Some(ref mut tasks) = tasks {
392                    Self::push_entry(&mut *tasks, &self.sender, key, context)
393                } else {
394                    // Work queue no longer exists. Immediately cancel this request.
395                    make_canceled_receiver()
396                }
397            })
398            .collect::<Vec<_>>()
399            .into_iter()
400    }
401
402    fn push_entry(
403        tasks: &mut HashMap<K, TaskVariants<C, O>>,
404        self_sender: &mpsc::UnboundedSender<K>,
405        key: K,
406        context: C,
407    ) -> Shared<TaskFuture<O>> {
408        use std::collections::hash_map::Entry;
409
410        match tasks.entry(key.clone()) {
411            Entry::Vacant(entry) => {
412                // No other variant of this task is running or pending. Reserve our
413                // spot in line and configure the task's metadata.
414                if let Ok(()) = self_sender.unbounded_send(key) {
415                    let (infos, fut) = TaskVariants::new(context);
416                    entry.insert(infos);
417                    fut
418                } else {
419                    // Work queue no longer exists. Immediately cancel this request.
420                    make_canceled_receiver()
421                }
422            }
423            Entry::Occupied(entry) => entry.into_mut().push(context),
424        }
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use futures::channel::oneshot;
432    use futures::executor::{LocalSpawner, block_on};
433    use futures::task::{LocalSpawnExt, SpawnExt};
434    use std::borrow::Borrow;
435    use std::fmt;
436
437    #[test]
438    fn basic_usage() {
439        async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
440            Ok(())
441        }
442
443        let (processor, enqueue) = work_queue(3, do_work);
444
445        let tasks = FuturesUnordered::new();
446
447        tasks.push(enqueue.push("a".into(), ()));
448        tasks.push(enqueue.push("a".into(), ()));
449        tasks.push(enqueue.push("b".into(), ()));
450        tasks.push(enqueue.push("a".into(), ()));
451        tasks.push(enqueue.push("c".into(), ()));
452
453        drop(enqueue);
454
455        block_on(async move {
456            let (keys, res) = futures::future::join(
457                processor.collect::<Vec<String>>(),
458                tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
459            )
460            .await;
461            assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
462            assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
463        });
464    }
465
466    #[test]
467    fn into_future() {
468        async fn nop(key: i32, _context: ()) -> i32 {
469            key
470        }
471        let (processor, enqueue) = work_queue(1, nop);
472
473        let res_fut =
474            future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
475        drop(enqueue);
476
477        let res = block_on(res_fut);
478        assert_eq!(res, ((), Ok(1), Ok(2)));
479    }
480
481    #[derive(Debug, PartialEq, Eq)]
482    pub(crate) struct MergeEqual(pub(crate) i32);
483
484    impl TryMerge for MergeEqual {
485        fn try_merge(&mut self, other: Self) -> Result<(), Self> {
486            if self.0 == other.0 { Ok(()) } else { Err(other) }
487        }
488    }
489
490    #[test]
491    fn dropping_queue_fails_requests() {
492        async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
493            Ok(())
494        }
495
496        let (processor, enqueue) = work_queue(1, do_work);
497
498        let fut_early_a = enqueue.push("a", MergeEqual(0));
499        let fut_early_b = enqueue.push("a", MergeEqual(1));
500        let fut_early_c = enqueue.push("a", MergeEqual(0));
501        drop(processor);
502        let fut_late = enqueue.push("b", MergeEqual(0));
503
504        block_on(async move {
505            assert_eq!(fut_early_a.await, Err(Closed));
506            assert_eq!(fut_early_b.await, Err(Closed));
507            assert_eq!(fut_early_c.await, Err(Closed));
508            assert_eq!(fut_late.await, Err(Closed));
509
510            let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
511            for fut in enqueue.push_all(requests.into_iter()) {
512                assert_eq!(fut.await, Err(Closed));
513            }
514        });
515    }
516
517    #[derive(Debug)]
518    struct TestRunningTask<C, O> {
519        unblocker: oneshot::Sender<O>,
520        context: C,
521    }
522
523    #[derive(Debug)]
524    struct TestRunningTasks<K, C, O>
525    where
526        K: Eq + Hash,
527    {
528        tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
529    }
530
531    impl<K, C, O> TestRunningTasks<K, C, O>
532    where
533        K: fmt::Debug + Eq + Hash + Sized + Clone,
534        C: fmt::Debug,
535        O: fmt::Debug,
536    {
537        fn new() -> Self {
538            Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
539        }
540
541        fn resolve<Q>(&self, key: &Q, res: O) -> C
542        where
543            Q: Eq + Hash + ?Sized,
544            K: Borrow<Q>,
545        {
546            let task =
547                self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
548            task.unblocker.send(res).unwrap();
549            task.context
550        }
551
552        fn peek(&self) -> Option<K> {
553            self.tasks.lock().keys().next().cloned()
554        }
555
556        fn keys(&self) -> Vec<K> {
557            self.tasks.lock().keys().cloned().collect()
558        }
559
560        fn assert_empty(&self) {
561            assert_eq!(
562                self.tasks.lock().keys().collect::<Vec<&K>>(),
563                Vec::<&K>::new(),
564                "expect queue to be empty"
565            );
566        }
567    }
568
569    #[derive(Debug)]
570    struct TestQueueResults<K> {
571        done: Arc<Mutex<Vec<K>>>,
572        terminated: Arc<Mutex<bool>>,
573    }
574
575    impl<K> TestQueueResults<K> {
576        fn take(&self) -> Vec<K> {
577            std::mem::take(&mut *self.done.lock())
578        }
579
580        fn is_terminated(&self) -> bool {
581            *self.terminated.lock()
582        }
583    }
584
585    #[test]
586    fn check_works_with_sendable_types() {
587        struct TestWork;
588
589        impl Work<(), ()> for TestWork {
590            type Future = futures::future::Ready<()>;
591
592            fn start(&self, _key: (), _context: ()) -> Self::Future {
593                futures::future::ready(())
594            }
595        }
596
597        let (processor, enqueue) = work_queue(3, TestWork);
598
599        let tasks = FuturesUnordered::new();
600        tasks.push(enqueue.push((), ()));
601
602        drop(enqueue);
603
604        let mut executor = futures::executor::LocalPool::new();
605        let handle = executor
606            .spawner()
607            .spawn_with_handle(async move {
608                let (keys, res) =
609                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
610                        .await;
611                assert_eq!(keys, vec![()]);
612                assert_eq!(res, vec![Ok(())]);
613            })
614            .expect("spawn to work");
615        let () = executor.run_until(handle);
616    }
617
618    #[test]
619    fn check_works_with_unsendable_types() {
620        use std::rc::Rc;
621
622        // Unfortunately `impl !Send for $Type` is unstable, so use Rc<()> to make sure WorkQueue
623        // still works.
624        struct TestWork(#[expect(dead_code)] Rc<()>);
625        #[derive(Clone, Debug, PartialEq, Eq, Hash)]
626        struct TestKey(Rc<()>);
627        #[derive(PartialEq, Eq)]
628        struct TestContext(Rc<()>);
629        #[derive(Clone, Debug, PartialEq)]
630        struct TestOutput(Rc<()>);
631
632        impl Work<TestKey, TestContext> for TestWork {
633            type Future = futures::future::Ready<TestOutput>;
634
635            fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
636                futures::future::ready(TestOutput(Rc::new(())))
637            }
638        }
639
640        impl TryMerge for TestContext {
641            fn try_merge(&mut self, _: Self) -> Result<(), Self> {
642                Ok(())
643            }
644        }
645
646        let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));
647
648        let tasks = FuturesUnordered::new();
649        tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));
650
651        drop(enqueue);
652
653        let mut executor = futures::executor::LocalPool::new();
654        let handle = executor
655            .spawner()
656            .spawn_local_with_handle(async move {
657                let (keys, res) =
658                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
659                        .await;
660                assert_eq!(keys, vec![TestKey(Rc::new(()))]);
661                assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
662            })
663            .expect("spawn to work");
664        let () = executor.run_until(handle);
665    }
666
667    fn spawn_test_work_queue<K, C, O>(
668        spawner: LocalSpawner,
669        concurrency: usize,
670    ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
671    where
672        K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
673        C: TryMerge + Send + fmt::Debug + 'static,
674        O: Send + Clone + fmt::Debug + 'static,
675    {
676        let running = TestRunningTasks::<K, C, O>::new();
677        let running_tasks = running.tasks.clone();
678        let do_work = move |key: K, context: C| {
679            // wait for the test driver to resolve this work item and return the result it
680            // provides.
681            let (sender, receiver) = oneshot::channel();
682            assert!(
683                running_tasks
684                    .lock()
685                    .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
686                    .is_none()
687            );
688            async move { receiver.await.unwrap() }
689        };
690
691        let (mut processor, enqueue) = work_queue(concurrency, do_work);
692        let done = Arc::new(Mutex::new(Vec::new()));
693        let terminated = Arc::new(Mutex::new(false));
694        let results =
695            TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };
696
697        spawner
698            .spawn_local(async move {
699                while let Some(res) = processor.next().await {
700                    done.lock().push(res);
701                }
702                *terminated.lock() = true;
703            })
704            .expect("spawn to succeed");
705
706        (enqueue, running, results)
707    }
708
709    #[test]
710    fn processes_known_work_before_stalling() {
711        let mut executor = futures::executor::LocalPool::new();
712
713        let (enqueue, running, done) =
714            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
715
716        let task_hello = enqueue.push("hello", ());
717        let task_world = enqueue.push("world!", ());
718        let task_test = enqueue.push("test", ());
719        executor.run_until_stalled();
720        assert_eq!(done.take(), Vec::<&str>::new());
721
722        running.resolve("hello", 5);
723        running.resolve("world!", 6);
724        running.assert_empty();
725        executor.run_until_stalled();
726        assert_eq!(done.take(), vec!["hello", "world!"]);
727
728        assert_eq!(executor.run_until(task_hello), Ok(5));
729        assert_eq!(executor.run_until(task_world), Ok(6));
730
731        running.resolve("test", 4);
732        assert_eq!(executor.run_until(task_test), Ok(4));
733        assert_eq!(done.take(), vec!["test"]);
734    }
735
736    #[test]
737    fn restarts_after_draining_input_queue() {
738        let mut executor = futures::executor::LocalPool::new();
739
740        let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);
741
742        // Process a few tasks to completion through the queue.
743        let task_a = enqueue.push("a", ());
744        let task_b = enqueue.push("b", ());
745        executor.run_until_stalled();
746        running.resolve("a", ());
747        running.resolve("b", ());
748        assert_eq!(executor.run_until(task_a), Ok(()));
749        assert_eq!(executor.run_until(task_b), Ok(()));
750        assert_eq!(done.take(), vec!["a", "b"]);
751
752        // Ensure the queue processes more tasks after its inner FuturesUnordered queue has
753        // previously terminated.
754        let task_c = enqueue.push("c", ());
755        executor.run_until_stalled();
756        running.resolve("c", ());
757        assert_eq!(executor.run_until(task_c), Ok(()));
758        assert_eq!(done.take(), vec!["c"]);
759
760        // Also ensure the queue itself terminates once all send handles are dropped and all tasks
761        // are complete.
762        let task_a = enqueue.push("a", ());
763        let task_d = enqueue.push("d", ());
764        drop(enqueue);
765        executor.run_until_stalled();
766        assert!(!done.is_terminated());
767        assert_eq!(done.take(), Vec::<&str>::new());
768        running.resolve("a", ());
769        running.resolve("d", ());
770        assert_eq!(executor.run_until(task_a), Ok(()));
771        assert_eq!(executor.run_until(task_d), Ok(()));
772        assert_eq!(done.take(), vec!["a", "d"]);
773        assert!(done.is_terminated());
774    }
775
776    #[test]
777    fn push_all() {
778        let mut executor = futures::executor::LocalPool::new();
779
780        let (enqueue, running, done) =
781            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
782
783        let mut futs =
784            enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
785        running.assert_empty();
786
787        executor.run_until_stalled();
788        running.resolve("a", 1);
789        running.resolve("b", 2);
790        running.assert_empty();
791        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
792        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
793
794        running.resolve("c", 3);
795        running.assert_empty();
796        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
797        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
798        assert!(futs.next().is_none());
799
800        assert_eq!(done.take(), vec!["a", "b", "c"]);
801    }
802
803    #[test]
804    fn handles_many_tasks() {
805        let mut executor = futures::executor::LocalPool::new();
806
807        let (enqueue, running, done) =
808            spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);
809
810        let mut tasks = FuturesUnordered::new();
811
812        for i in 0..10000 {
813            let key = format!("task_{i}");
814            tasks.push(enqueue.push(key, ()));
815        }
816
817        // also queue up some duplicate tasks.
818        let task_dups = enqueue
819            .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
820                let key = format!("task_{i}");
821                (key, ())
822            }))
823            .collect::<FuturesUnordered<_>>();
824
825        executor.run_until_stalled();
826
827        while let Some(key) = running.peek() {
828            running.resolve(&key, ());
829            assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
830            assert_eq!(done.take(), vec![key]);
831        }
832
833        assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
834    }
835
836    #[test]
837    fn dedups_compound_keys() {
838        let mut executor = futures::executor::LocalPool::new();
839
840        #[derive(Debug, Clone, PartialEq, Eq, Hash)]
841        struct Params<'a> {
842            key: &'a str,
843            options: &'a [&'a str],
844        }
845
846        let (enqueue, running, done) =
847            spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);
848
849        let key_a = Params { key: "first", options: &[] };
850        let key_b = Params { key: "first", options: &["unique"] };
851        let task_a1 = enqueue.push(key_a.clone(), ());
852        let task_b = enqueue.push(key_b.clone(), ());
853        let task_a2 = enqueue.push(key_a.clone(), ());
854
855        executor.run_until_stalled();
856
857        running.resolve(&key_b, "first_unique");
858        executor.run_until_stalled();
859        assert_eq!(done.take(), vec![key_b]);
860        assert_eq!(executor.run_until(task_b), Ok("first_unique"));
861
862        running.resolve(&key_a, "first_no_options");
863        executor.run_until_stalled();
864        assert_eq!(done.take(), vec![key_a]);
865        assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
866        assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
867    }
868
869    #[test]
870    fn merges_context_of_pending_tasks() {
871        let mut executor = futures::executor::LocalPool::new();
872
873        #[derive(Default, Debug, PartialEq, Eq)]
874        struct MyContext(String);
875
876        impl TryMerge for MyContext {
877            fn try_merge(&mut self, other: Self) -> Result<(), Self> {
878                self.0.push_str(&other.0);
879                Ok(())
880            }
881        }
882
883        let (enqueue, running, done) =
884            spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);
885
886        let task_a = enqueue.push("dup", MyContext("a".into()));
887        let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
888        let task_b = enqueue.push("dup", MyContext("b".into()));
889        executor.run_until_stalled();
890        let task_c1 = enqueue.push("dup", MyContext("c".into()));
891        executor.run_until_stalled();
892
893        // "c" not merged in since "dup" was already running with different context.
894        assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
895        assert_eq!(executor.run_until(task_a), Ok(()));
896        assert_eq!(executor.run_until(task_b), Ok(()));
897        assert_eq!(done.take(), vec!["dup"]);
898
899        // even though "unique" was added to the queue before "dup"/"c", "dup" is given priority
900        // since it was already running.
901        assert_eq!(running.keys(), vec!["dup"]);
902        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
903        assert_eq!(executor.run_until(task_c1), Ok(()));
904        assert_eq!(done.take(), vec!["dup"]);
905
906        assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
907        assert_eq!(executor.run_until(task_unique), Ok(()));
908        assert_eq!(done.take(), vec!["unique"]);
909        running.assert_empty();
910
911        // ensure re-running a previously completed item executes it again.
912        let task_c2 = enqueue.push("dup", MyContext("c".into()));
913        executor.run_until_stalled();
914        assert_eq!(running.keys(), vec!["dup"]);
915        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
916        assert_eq!(executor.run_until(task_c2), Ok(()));
917        assert_eq!(done.take(), vec!["dup"]);
918        running.assert_empty();
919    }
920
921    #[fuchsia::test]
922    async fn inspect() {
923        // Notify the test when the tasks are running.
924        let (sender, mut receiver) = futures::channel::mpsc::channel(0);
925        let do_work = move |_: String, _: MergeEqual| {
926            let mut sender = sender.clone();
927            async move {
928                let () = sender.send(()).await.unwrap();
929                let () = futures::future::pending().await;
930                Ok::<_, ()>(())
931            }
932        };
933        let (mut processor, enqueue) = work_queue(2, do_work);
934
935        let inspector = fuchsia_inspect::Inspector::default();
936        inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
937        fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
938            .detach();
939
940        // Inspect empty before queue is used.
941        diagnostics_assertions::assert_data_tree!(inspector, root: {
942            "queue": {}
943        });
944
945        // Inspect populated when queue non-empty.
946        let _a0 = enqueue.push("a".into(), MergeEqual(0));
947        let _b0 = enqueue.push("b".into(), MergeEqual(0));
948        let _a1 = enqueue.push("a".into(), MergeEqual(1));
949        let _c0 = enqueue.push("c".into(), MergeEqual(0));
950
951        let () = receiver.next().await.unwrap();
952        let () = receiver.next().await.unwrap();
953
954        diagnostics_assertions::assert_data_tree!(inspector, root: {
955            "queue": {
956                r#""a""#: {
957                    "running": 1u64,
958                    "pending": 1u64,
959                },
960                r#""b""#: {
961                    "running": 1u64,
962                    "pending": 0u64,
963                },
964                r#""c""#: {
965                    "running": 0u64,
966                    "pending": 1u64,
967                },
968            }
969        });
970    }
971}