//! A keyed, de-duplicating work queue.
//!
//! [`work_queue`] creates a [`WorkQueue`] / [`WorkSender`] pair. Callers push `(key, context)`
//! pairs through the sender and get back a future for the result; the queue, when polled as a
//! stream, runs a bounded number of work futures at once and makes sure a given key is tracked
//! by a single map entry at a time.

#![deny(missing_docs)]
// The shared task map nests several generic types
// (`Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>`).
#![allow(clippy::type_complexity)]
// `let () = ...` is used on purpose to pin an expression's type to unit.
#![allow(clippy::let_unit_value)]
8
9use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{TaskFuture, TaskVariants, make_canceled_receiver};
27
28#[derive(Debug, PartialEq, Eq, Clone, Error)]
30#[error("The queue was dropped before processing this task")]
31pub struct Closed;
32
/// Contexts that may be combined when several requests target the same key.
///
/// Every context type pushed through a `WorkSender` must implement this trait.
pub trait TryMerge: Eq + Sized {
    /// Attempts to fold `other` into `self`.
    ///
    /// Returns `Ok(())` when `other` was absorbed into `self`. Returns `Err(other)`, handing the
    /// value back to the caller, when the two contexts cannot be combined.
    fn try_merge(&mut self, other: Self) -> Result<(), Self>;
}
46
47impl TryMerge for () {
48 fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49 Ok(())
50 }
51}
52
53pub fn work_queue<W, K, C>(
95 concurrency: usize,
96 work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99 W: Work<K, C>,
100 K: Clone + Eq + Hash,
101 C: TryMerge,
102{
103 let tasks = Arc::new(Mutex::new(HashMap::new()));
104 let (sender, receiver) = mpsc::unbounded();
105 let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106 (
107 WorkQueue {
108 work_fn,
109 concurrency,
110 pending: receiver,
111 tasks,
112 running: FuturesUnordered::new(),
113 },
114 sender,
115 )
116}
117
/// Describes how to turn a `(key, context)` pair into a unit of asynchronous work.
///
/// Any `Fn(K, C) -> impl Future` implements this trait through a blanket impl.
pub trait Work<K, C> {
    /// The future that performs the work; its output is the task's result.
    type Future: Future;

    /// Starts work for `key` using `context` and returns the future that completes it.
    fn start(&self, key: K, context: C) -> Self::Future;
}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129 F: Fn(K, C) -> WF,
130 WF: Future,
131{
132 type Future = WF;
133
134 fn start(&self, key: K, context: C) -> Self::Future {
135 (self)(key, context)
136 }
137}
138
139#[pin_project]
145pub struct WorkQueue<W, K, C>
146where
147 W: Work<K, C>,
148{
149 work_fn: W,
151 concurrency: usize,
153 tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,
156
157 #[pin]
159 pending: mpsc::UnboundedReceiver<K>,
160 #[pin]
162 running: FuturesUnordered<RunningTask<K, W::Future>>,
163}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167 W: Work<K, C>,
168 K: Clone + Eq + Hash,
169{
170 pub fn into_future(self) -> impl Future<Output = ()> {
173 self.map(|_res| ()).collect::<()>()
174 }
175
176 fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182 let mut this = self.project();
183
184 if this.pending.is_terminated() {
186 return Poll::Ready(None);
187 }
188
189 let mut found = false;
190 while this.running.len() < *this.concurrency {
191 match ready!(this.pending.as_mut().poll_next(cx)) {
192 None => break,
193 Some(key) => {
194 found = true;
195
196 let context = this
198 .tasks
199 .lock()
200 .get_mut(&key)
201 .expect("map entry to exist if in pending queue")
202 .start();
203
204 let work = this.work_fn.start(key.clone(), context);
208 let fut = futures::future::join(futures::future::ready(key), work);
209
210 this.running.push(fut);
211 }
212 }
213 }
214 if found { Poll::Ready(Some(())) } else { Poll::Pending }
215 }
216
217 fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
218 let mut this = self.project();
219
220 match this.running.as_mut().poll_next(cx) {
221 Poll::Pending => Poll::Pending,
222 Poll::Ready(None) => {
223 if this.pending.is_terminated() { Poll::Ready(None) } else { Poll::Pending }
230 }
231 Poll::Ready(Some((key, res))) => {
232 let mut tasks = this.tasks.lock();
233 let infos: &mut TaskVariants<_, _> =
234 tasks.get_mut(&key).expect("key to exist in map if not resolved");
235
236 if let Some(next_context) = infos.done(res) {
237 let work = this.work_fn.start(key.clone(), next_context);
239 let key_clone = key.clone();
240 let fut = futures::future::join(futures::future::ready(key_clone), work);
241
242 drop(tasks);
243 this.running.push(fut);
244 } else {
245 tasks.remove(&key);
247 }
248
249 Poll::Ready(Some(key))
252 }
253 }
254 }
255}
256
257impl<W, K, C> WorkQueue<W, K, C>
258where
259 W: Work<K, C>,
260 <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
261 K: std::fmt::Debug + Send + 'static,
262 C: Send + 'static,
263{
264 pub fn record_lazy_inspect(
268 &self,
269 ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
270 + Send
271 + Sync
272 + 'static {
273 let tasks = Arc::downgrade(&self.tasks);
274 move || {
275 let tasks = tasks.clone();
276 async move {
277 let inspector = fuchsia_inspect::Inspector::default();
278 if let Some(tasks) = tasks.upgrade() {
279 let tasks = {
281 tasks
282 .lock()
283 .iter()
284 .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
285 .collect::<Vec<_>>()
286 };
287 let root = inspector.root();
288 for (k, (running, pending)) in tasks {
289 root.record_child(k, |n| {
290 n.record_uint("running", running as u64);
291 n.record_uint("pending", pending as u64);
292 })
293 }
294 }
295 Ok(inspector)
296 }
297 .boxed()
298 }
299 }
300}
301
302impl<W, K, C> Stream for WorkQueue<W, K, C>
303where
304 W: Work<K, C>,
305 K: Clone + Eq + Hash,
306{
307 type Item = K;
308 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309 match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
310 (Poll::Ready(None), Poll::Ready(None)) => {
311 Poll::Ready(None)
314 }
315 (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
316 let _ = self.as_mut().find_work(cx);
320 Poll::Ready(Some(res))
321 }
322 (_not_ready_none, Poll::Ready(None)) => {
323 Poll::Pending
326 }
327 (_, poll) => poll,
328 }
329 }
330}
331
/// A running work future joined with an already-ready future carrying its key, so the key is
/// returned together with the work's output when the task completes.
type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
333
334pub struct WorkSender<K, C, O> {
337 sender: mpsc::UnboundedSender<K>,
338 tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
341}
342
343impl<K, C, O> Clone for WorkSender<K, C, O> {
344 fn clone(&self) -> Self {
345 Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
346 }
347}
348
349impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 f.debug_struct("WorkSender").finish()
352 }
353}
354
355impl<K, C, O> WorkSender<K, C, O>
356where
357 K: Clone + Eq + Hash,
358 C: TryMerge,
359 O: Clone,
360{
361 pub fn push(
364 &self,
365 key: K,
366 context: C,
367 ) -> impl Future<Output = Result<O, Closed>> + use<K, C, O> {
368 let tasks = match self.tasks.upgrade() {
369 Some(tasks) => tasks,
370 None => {
371 return make_canceled_receiver();
373 }
374 };
375 let mut tasks = tasks.lock();
376
377 Self::push_entry(&mut *tasks, &self.sender, key, context)
378 }
379
380 pub fn push_all(
387 &self,
388 entries: impl Iterator<Item = (K, C)>,
389 ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
390 let mut tasks = self.tasks.upgrade();
391 let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
392
393 entries
394 .map(move |(key, context)| {
395 if let Some(ref mut tasks) = tasks {
396 Self::push_entry(&mut *tasks, &self.sender, key, context)
397 } else {
398 make_canceled_receiver()
400 }
401 })
402 .collect::<Vec<_>>()
403 .into_iter()
404 }
405
406 fn push_entry(
407 tasks: &mut HashMap<K, TaskVariants<C, O>>,
408 self_sender: &mpsc::UnboundedSender<K>,
409 key: K,
410 context: C,
411 ) -> Shared<TaskFuture<O>> {
412 use std::collections::hash_map::Entry;
413
414 match tasks.entry(key.clone()) {
415 Entry::Vacant(entry) => {
416 if let Ok(()) = self_sender.unbounded_send(key) {
419 let (infos, fut) = TaskVariants::new(context);
420 entry.insert(infos);
421 fut
422 } else {
423 make_canceled_receiver()
425 }
426 }
427 Entry::Occupied(entry) => entry.into_mut().push(context),
428 }
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use futures::channel::oneshot;
436 use futures::executor::{LocalSpawner, block_on};
437 use futures::task::{LocalSpawnExt, SpawnExt};
438 use std::borrow::Borrow;
439 use std::fmt;
440
    /// Smoke test: duplicate pushes for a key are processed once, while every pushed future
    /// still resolves with the work's result.
    #[test]
    fn basic_usage() {
        async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
            Ok(())
        }

        let (processor, enqueue) = work_queue(3, do_work);

        let tasks = FuturesUnordered::new();

        // "a" is requested three times; "b" and "c" once each.
        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("b".into(), ()));
        tasks.push(enqueue.push("a".into(), ()));
        tasks.push(enqueue.push("c".into(), ()));

        // Drop the only sender so the processor stream can end once the work is done.
        drop(enqueue);

        block_on(async move {
            let (keys, res) = futures::future::join(
                processor.collect::<Vec<String>>(),
                tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
            )
            .await;
            // Each key is yielded once by the processor, but all five requests get a result.
            assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
            assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
        });
    }
469
    /// `WorkQueue::into_future` drives the queue to completion and resolves to `()` while the
    /// pushed futures receive their outputs.
    #[test]
    fn into_future() {
        async fn nop(key: i32, _context: ()) -> i32 {
            key
        }
        let (processor, enqueue) = work_queue(1, nop);

        let res_fut =
            future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
        // Without dropping the sender the processor future would never finish.
        drop(enqueue);

        let res = block_on(res_fut);
        assert_eq!(res, ((), Ok(1), Ok(2)));
    }
484
485 #[derive(Debug, PartialEq, Eq)]
486 pub(crate) struct MergeEqual(pub(crate) i32);
487
488 impl TryMerge for MergeEqual {
489 fn try_merge(&mut self, other: Self) -> Result<(), Self> {
490 if self.0 == other.0 { Ok(()) } else { Err(other) }
491 }
492 }
493
    /// Requests pushed before or after the `WorkQueue` is dropped all resolve to `Err(Closed)`.
    #[test]
    fn dropping_queue_fails_requests() {
        async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
            Ok(())
        }

        let (processor, enqueue) = work_queue(1, do_work);

        // Three requests for the same key, one of them with a non-mergeable context, are
        // pushed while the queue still exists; the queue is never polled.
        let fut_early_a = enqueue.push("a", MergeEqual(0));
        let fut_early_b = enqueue.push("a", MergeEqual(1));
        let fut_early_c = enqueue.push("a", MergeEqual(0));
        drop(processor);
        // This request is pushed after the queue is gone.
        let fut_late = enqueue.push("b", MergeEqual(0));

        block_on(async move {
            assert_eq!(fut_early_a.await, Err(Closed));
            assert_eq!(fut_early_b.await, Err(Closed));
            assert_eq!(fut_early_c.await, Err(Closed));
            assert_eq!(fut_late.await, Err(Closed));

            // `push_all` behaves the same way once the queue is gone.
            let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
            for fut in enqueue.push_all(requests.into_iter()) {
                assert_eq!(fut.await, Err(Closed));
            }
        });
    }
520
    /// A work item started by the test work function that has not been resolved yet.
    #[derive(Debug)]
    struct TestRunningTask<C, O> {
        /// Sending on this channel completes the work future with the given output.
        unblocker: oneshot::Sender<O>,
        /// The context the work function was started with.
        context: C,
    }
526
527 #[derive(Debug)]
528 struct TestRunningTasks<K, C, O>
529 where
530 K: Eq + Hash,
531 {
532 tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
533 }
534
535 impl<K, C, O> TestRunningTasks<K, C, O>
536 where
537 K: fmt::Debug + Eq + Hash + Sized + Clone,
538 C: fmt::Debug,
539 O: fmt::Debug,
540 {
541 fn new() -> Self {
542 Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
543 }
544
545 fn resolve<Q>(&self, key: &Q, res: O) -> C
546 where
547 Q: Eq + Hash + ?Sized,
548 K: Borrow<Q>,
549 {
550 let task =
551 self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
552 task.unblocker.send(res).unwrap();
553 task.context
554 }
555
556 fn peek(&self) -> Option<K> {
557 self.tasks.lock().keys().next().cloned()
558 }
559
560 fn keys(&self) -> Vec<K> {
561 self.tasks.lock().keys().cloned().collect()
562 }
563
564 fn assert_empty(&self) {
565 assert_eq!(
566 self.tasks.lock().keys().collect::<Vec<&K>>(),
567 Vec::<&K>::new(),
568 "expect queue to be empty"
569 );
570 }
571 }
572
573 #[derive(Debug)]
574 struct TestQueueResults<K> {
575 done: Arc<Mutex<Vec<K>>>,
576 terminated: Arc<Mutex<bool>>,
577 }
578
579 impl<K> TestQueueResults<K> {
580 fn take(&self) -> Vec<K> {
581 std::mem::take(&mut *self.done.lock())
582 }
583
584 fn is_terminated(&self) -> bool {
585 *self.terminated.lock()
586 }
587 }
588
    /// With `Send` key/context/output types, the queue and the pushed futures can be moved
    /// into a future spawned through `spawn_with_handle`, which requires `Send`.
    #[test]
    fn check_works_with_sendable_types() {
        struct TestWork;

        impl Work<(), ()> for TestWork {
            type Future = futures::future::Ready<()>;

            fn start(&self, _key: (), _context: ()) -> Self::Future {
                futures::future::ready(())
            }
        }

        let (processor, enqueue) = work_queue(3, TestWork);

        let tasks = FuturesUnordered::new();
        tasks.push(enqueue.push((), ()));

        // Close the sending side so `processor.collect()` can finish.
        drop(enqueue);

        let mut executor = futures::executor::LocalPool::new();
        let handle = executor
            .spawner()
            .spawn_with_handle(async move {
                let (keys, res) =
                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
                        .await;
                assert_eq!(keys, vec![()]);
                assert_eq!(res, vec![Ok(())]);
            })
            .expect("spawn to work");
        let () = executor.run_until(handle);
    }
621
    /// The queue does not require `Send`: work, key, context and output all wrap an `Rc` here,
    /// and everything runs through `spawn_local_with_handle`.
    #[test]
    fn check_works_with_unsendable_types() {
        use std::rc::Rc;

        // Every type below holds an `Rc`, which makes it `!Send`.
        struct TestWork(#[expect(dead_code)] Rc<()>);
        #[derive(Clone, Debug, PartialEq, Eq, Hash)]
        struct TestKey(Rc<()>);
        #[derive(PartialEq, Eq)]
        struct TestContext(Rc<()>);
        #[derive(Clone, Debug, PartialEq)]
        struct TestOutput(Rc<()>);

        impl Work<TestKey, TestContext> for TestWork {
            type Future = futures::future::Ready<TestOutput>;

            fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
                futures::future::ready(TestOutput(Rc::new(())))
            }
        }

        impl TryMerge for TestContext {
            fn try_merge(&mut self, _: Self) -> Result<(), Self> {
                Ok(())
            }
        }

        let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));

        let tasks = FuturesUnordered::new();
        tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));

        // Close the sending side so `processor.collect()` can finish.
        drop(enqueue);

        let mut executor = futures::executor::LocalPool::new();
        let handle = executor
            .spawner()
            .spawn_local_with_handle(async move {
                let (keys, res) =
                    futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
                        .await;
                assert_eq!(keys, vec![TestKey(Rc::new(()))]);
                assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
            })
            .expect("spawn to work");
        let () = executor.run_until(handle);
    }
670
    /// Builds a work queue whose work items block until the test resolves them, and spawns a
    /// local task on `spawner` that polls the queue.
    ///
    /// Returns the sender, the registry of running (blocked) work items, and a handle to the
    /// keys the queue has yielded plus its termination flag.
    fn spawn_test_work_queue<K, C, O>(
        spawner: LocalSpawner,
        concurrency: usize,
    ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
    where
        K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
        C: TryMerge + Send + fmt::Debug + 'static,
        O: Send + Clone + fmt::Debug + 'static,
    {
        let running = TestRunningTasks::<K, C, O>::new();
        let running_tasks = running.tasks.clone();
        let do_work = move |key: K, context: C| {
            // Each work item waits on a oneshot channel that the test completes via
            // `TestRunningTasks::resolve`.
            let (sender, receiver) = oneshot::channel();
            // A key must never be started while it is still registered as running.
            assert!(
                running_tasks
                    .lock()
                    .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
                    .is_none()
            );
            async move { receiver.await.unwrap() }
        };

        let (mut processor, enqueue) = work_queue(concurrency, do_work);
        let done = Arc::new(Mutex::new(Vec::new()));
        let terminated = Arc::new(Mutex::new(false));
        let results =
            TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };

        // Poll the queue in the background, recording every yielded key and the end of the
        // stream.
        spawner
            .spawn_local(async move {
                while let Some(res) = processor.next().await {
                    done.lock().push(res);
                }
                *terminated.lock() = true;
            })
            .expect("spawn to succeed");

        (enqueue, running, results)
    }
712
    /// With a concurrency of 2 and three queued keys, the third key is started once the first
    /// two have completed.
    #[test]
    fn processes_known_work_before_stalling() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);

        let task_hello = enqueue.push("hello", ());
        let task_world = enqueue.push("world!", ());
        let task_test = enqueue.push("test", ());
        // Let the queue start work; nothing can finish until the test resolves it.
        executor.run_until_stalled();
        assert_eq!(done.take(), Vec::<&str>::new());

        // Exactly the first two keys are running (the concurrency limit).
        running.resolve("hello", 5);
        running.resolve("world!", 6);
        running.assert_empty();
        executor.run_until_stalled();
        assert_eq!(done.take(), vec!["hello", "world!"]);

        assert_eq!(executor.run_until(task_hello), Ok(5));
        assert_eq!(executor.run_until(task_world), Ok(6));

        // "test" must have been started by now, otherwise `resolve` would panic.
        running.resolve("test", 4);
        assert_eq!(executor.run_until(task_test), Ok(4));
        assert_eq!(done.take(), vec!["test"]);
    }
739
    /// The queue keeps accepting work after it has gone idle, and only terminates after the
    /// sender is dropped and the remaining work has completed.
    #[test]
    fn restarts_after_draining_input_queue() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);

        // First batch: fills both concurrency slots.
        let task_a = enqueue.push("a", ());
        let task_b = enqueue.push("b", ());
        executor.run_until_stalled();
        running.resolve("a", ());
        running.resolve("b", ());
        assert_eq!(executor.run_until(task_a), Ok(()));
        assert_eq!(executor.run_until(task_b), Ok(()));
        assert_eq!(done.take(), vec!["a", "b"]);

        // The queue is idle now; a new key must still be picked up.
        let task_c = enqueue.push("c", ());
        executor.run_until_stalled();
        running.resolve("c", ());
        assert_eq!(executor.run_until(task_c), Ok(()));
        assert_eq!(done.take(), vec!["c"]);

        // Reuse a previously completed key, then close the sending side. The queue must not
        // terminate while work is still outstanding.
        let task_a = enqueue.push("a", ());
        let task_d = enqueue.push("d", ());
        drop(enqueue);
        executor.run_until_stalled();
        assert!(!done.is_terminated());
        assert_eq!(done.take(), Vec::<&str>::new());
        running.resolve("a", ());
        running.resolve("d", ());
        assert_eq!(executor.run_until(task_a), Ok(()));
        assert_eq!(executor.run_until(task_d), Ok(()));
        assert_eq!(done.take(), vec!["a", "d"]);
        assert!(done.is_terminated());
    }
779
    /// `push_all` enqueues a whole batch, returns one future per entry in input order, and
    /// de-duplicates repeated keys within the batch.
    #[test]
    fn push_all() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);

        let mut futs =
            enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
        // Nothing starts until the executor polls the queue.
        running.assert_empty();

        executor.run_until_stalled();
        running.resolve("a", 1);
        running.resolve("b", 2);
        running.assert_empty();
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));

        running.resolve("c", 3);
        running.assert_empty();
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
        // The second "b" entry shares the result of the first one; "b" was not run again.
        assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
        assert!(futs.next().is_none());

        assert_eq!(done.take(), vec!["a", "b", "c"]);
    }
806
    /// Stress test with 10000 distinct keys plus 5000 duplicate requests at a concurrency
    /// of 5.
    #[test]
    fn handles_many_tasks() {
        let mut executor = futures::executor::LocalPool::new();

        let (enqueue, running, done) =
            spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);

        let mut tasks = FuturesUnordered::new();

        for i in 0..10000 {
            let key = format!("task_{i}");
            tasks.push(enqueue.push(key, ()));
        }

        // Request every even-numbered key a second time, before any work has been started.
        let task_dups = enqueue
            .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
                let key = format!("task_{i}");
                (key, ())
            }))
            .collect::<FuturesUnordered<_>>();

        executor.run_until_stalled();

        // Resolve one running task at a time; each resolution must complete exactly one
        // original request and make the queue yield exactly that key.
        while let Some(key) = running.peek() {
            running.resolve(&key, ());
            assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
            assert_eq!(done.take(), vec![key]);
        }

        // Every duplicate request was completed as well.
        assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
    }
839
    /// De-duplication uses the whole key: keys that differ in any field are separate tasks.
    #[test]
    fn dedups_compound_keys() {
        let mut executor = futures::executor::LocalPool::new();

        #[derive(Debug, Clone, PartialEq, Eq, Hash)]
        struct Params<'a> {
            key: &'a str,
            options: &'a [&'a str],
        }

        let (enqueue, running, done) =
            spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);

        // Same `key` field, different `options`: two distinct keys.
        let key_a = Params { key: "first", options: &[] };
        let key_b = Params { key: "first", options: &["unique"] };
        let task_a1 = enqueue.push(key_a.clone(), ());
        let task_b = enqueue.push(key_b.clone(), ());
        let task_a2 = enqueue.push(key_a.clone(), ());

        executor.run_until_stalled();

        running.resolve(&key_b, "first_unique");
        executor.run_until_stalled();
        assert_eq!(done.take(), vec![key_b]);
        assert_eq!(executor.run_until(task_b), Ok("first_unique"));

        // Both requests for `key_a` are served by a single run.
        running.resolve(&key_a, "first_no_options");
        executor.run_until_stalled();
        assert_eq!(done.take(), vec![key_a]);
        assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
        assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
    }
872
    /// Contexts of requests for a key that has not started yet are merged; a request that
    /// arrives while the key is running gets its own, later run.
    #[test]
    fn merges_context_of_pending_tasks() {
        let mut executor = futures::executor::LocalPool::new();

        #[derive(Default, Debug, PartialEq, Eq)]
        struct MyContext(String);

        // Merging always succeeds and concatenates the strings.
        impl TryMerge for MyContext {
            fn try_merge(&mut self, other: Self) -> Result<(), Self> {
                self.0.push_str(&other.0);
                Ok(())
            }
        }

        // A concurrency of 1 keeps "unique" queued behind "dup".
        let (enqueue, running, done) =
            spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);

        let task_a = enqueue.push("dup", MyContext("a".into()));
        let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
        let task_b = enqueue.push("dup", MyContext("b".into()));
        executor.run_until_stalled();
        // "dup" is running by now, so this request cannot join the running context.
        let task_c1 = enqueue.push("dup", MyContext("c".into()));
        executor.run_until_stalled();

        // "a" and "b" were merged before the work started; "c" was not part of this run.
        assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
        assert_eq!(executor.run_until(task_a), Ok(()));
        assert_eq!(executor.run_until(task_b), Ok(()));
        assert_eq!(done.take(), vec!["dup"]);

        // The follow-up run for "dup" was started right away, ahead of "unique".
        assert_eq!(running.keys(), vec!["dup"]);
        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
        assert_eq!(executor.run_until(task_c1), Ok(()));
        assert_eq!(done.take(), vec!["dup"]);

        assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
        assert_eq!(executor.run_until(task_unique), Ok(()));
        assert_eq!(done.take(), vec!["unique"]);
        running.assert_empty();

        // After all work for "dup" has completed, a new request starts with a fresh context.
        let task_c2 = enqueue.push("dup", MyContext("c".into()));
        executor.run_until_stalled();
        assert_eq!(running.keys(), vec!["dup"]);
        assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
        assert_eq!(executor.run_until(task_c2), Ok(()));
        assert_eq!(done.take(), vec!["dup"]);
        running.assert_empty();
    }
924
    /// The lazy inspect node reports, per key, the `running` and `pending` values of the
    /// queue's task state.
    #[fuchsia::test]
    async fn inspect() {
        let (sender, mut receiver) = futures::channel::mpsc::channel(0);
        // Each work item signals on the channel once it has started and then never completes,
        // so the queue's state stays fixed while the test inspects it.
        let do_work = move |_: String, _: MergeEqual| {
            let mut sender = sender.clone();
            async move {
                let () = sender.send(()).await.unwrap();
                let () = futures::future::pending().await;
                Ok::<_, ()>(())
            }
        };
        let (mut processor, enqueue) = work_queue(2, do_work);

        let inspector = fuchsia_inspect::Inspector::default();
        inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
        fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
            .detach();

        // No keys have been pushed yet, so the lazy node is empty.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            "queue": {}
        });

        // The second request for "a" carries a context that `MergeEqual` refuses to merge
        // with the first one.
        let _a0 = enqueue.push("a".into(), MergeEqual(0));
        let _b0 = enqueue.push("b".into(), MergeEqual(0));
        let _a1 = enqueue.push("a".into(), MergeEqual(1));
        let _c0 = enqueue.push("c".into(), MergeEqual(0));

        // Wait until two work items (the concurrency limit) have reported that they started.
        let () = receiver.next().await.unwrap();
        let () = receiver.next().await.unwrap();

        // Expected snapshot: "a" and "b" are running, the extra "a" request is pending, and
        // "c" has not been started.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            "queue": {
                r#""a""#: {
                    "running": 1u64,
                    "pending": 1u64,
                },
                r#""b""#: {
                    "running": 1u64,
                    "pending": 0u64,
                },
                r#""c""#: {
                    "running": 0u64,
                    "pending": 1u64,
                },
            }
        });
    }
975}