1#![deny(missing_docs)]
6#![allow(clippy::type_complexity)]
7#![allow(clippy::let_unit_value)]
8
9use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{make_canceled_receiver, TaskFuture, TaskVariants};
27
28#[derive(Debug, PartialEq, Eq, Clone, Error)]
30#[error("The queue was dropped before processing this task")]
31pub struct Closed;
32
33pub trait TryMerge: Eq + Sized {
39 fn try_merge(&mut self, other: Self) -> Result<(), Self>;
45}
46
47impl TryMerge for () {
48 fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49 Ok(())
50 }
51}
52
53pub fn work_queue<W, K, C>(
95 concurrency: usize,
96 work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99 W: Work<K, C>,
100 K: Clone + Eq + Hash,
101 C: TryMerge,
102{
103 let tasks = Arc::new(Mutex::new(HashMap::new()));
104 let (sender, receiver) = mpsc::unbounded();
105 let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106 (
107 WorkQueue {
108 work_fn,
109 concurrency,
110 pending: receiver,
111 tasks,
112 running: FuturesUnordered::new(),
113 },
114 sender,
115 )
116}
117
118pub trait Work<K, C> {
120 type Future: Future;
122
123 fn start(&self, key: K, context: C) -> Self::Future;
125}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129 F: Fn(K, C) -> WF,
130 WF: Future,
131{
132 type Future = WF;
133
134 fn start(&self, key: K, context: C) -> Self::Future {
135 (self)(key, context)
136 }
137}
138
139#[pin_project]
145pub struct WorkQueue<W, K, C>
146where
147 W: Work<K, C>,
148{
149 work_fn: W,
151 concurrency: usize,
153 tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,
156
157 #[pin]
159 pending: mpsc::UnboundedReceiver<K>,
160 #[pin]
162 running: FuturesUnordered<RunningTask<K, W::Future>>,
163}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167 W: Work<K, C>,
168 K: Clone + Eq + Hash,
169{
170 pub fn into_future(self) -> impl Future<Output = ()> {
173 self.map(|_res| ()).collect::<()>()
174 }
175
176 fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182 let mut this = self.project();
183
184 if this.pending.is_terminated() {
186 return Poll::Ready(None);
187 }
188
189 let mut found = false;
190 while this.running.len() < *this.concurrency {
191 match ready!(this.pending.as_mut().poll_next(cx)) {
192 None => break,
193 Some(key) => {
194 found = true;
195
196 let context = this
198 .tasks
199 .lock()
200 .get_mut(&key)
201 .expect("map entry to exist if in pending queue")
202 .start();
203
204 let work = this.work_fn.start(key.clone(), context);
208 let fut = futures::future::join(futures::future::ready(key), work);
209
210 this.running.push(fut);
211 }
212 }
213 }
214 if found {
215 Poll::Ready(Some(()))
216 } else {
217 Poll::Pending
218 }
219 }
220
221 fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
222 let mut this = self.project();
223
224 match this.running.as_mut().poll_next(cx) {
225 Poll::Pending => Poll::Pending,
226 Poll::Ready(None) => {
227 if this.pending.is_terminated() {
234 Poll::Ready(None)
235 } else {
236 Poll::Pending
237 }
238 }
239 Poll::Ready(Some((key, res))) => {
240 let mut tasks = this.tasks.lock();
241 let infos: &mut TaskVariants<_, _> =
242 tasks.get_mut(&key).expect("key to exist in map if not resolved");
243
244 if let Some(next_context) = infos.done(res) {
245 let work = this.work_fn.start(key.clone(), next_context);
247 let key_clone = key.clone();
248 let fut = futures::future::join(futures::future::ready(key_clone), work);
249
250 drop(tasks);
251 this.running.push(fut);
252 } else {
253 tasks.remove(&key);
255 }
256
257 Poll::Ready(Some(key))
260 }
261 }
262 }
263}
264
265impl<W, K, C> WorkQueue<W, K, C>
266where
267 W: Work<K, C>,
268 <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
269 K: std::fmt::Debug + Send + 'static,
270 C: Send + 'static,
271{
272 pub fn record_lazy_inspect(
276 &self,
277 ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
278 + Send
279 + Sync
280 + 'static {
281 let tasks = Arc::downgrade(&self.tasks);
282 move || {
283 let tasks = tasks.clone();
284 async move {
285 let inspector = fuchsia_inspect::Inspector::default();
286 if let Some(tasks) = tasks.upgrade() {
287 let tasks = {
289 tasks
290 .lock()
291 .iter()
292 .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
293 .collect::<Vec<_>>()
294 };
295 let root = inspector.root();
296 for (k, (running, pending)) in tasks {
297 root.record_child(k, |n| {
298 n.record_uint("running", running as u64);
299 n.record_uint("pending", pending as u64);
300 })
301 }
302 }
303 Ok(inspector)
304 }
305 .boxed()
306 }
307 }
308}
309
310impl<W, K, C> Stream for WorkQueue<W, K, C>
311where
312 W: Work<K, C>,
313 K: Clone + Eq + Hash,
314{
315 type Item = K;
316 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
317 match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
318 (Poll::Ready(None), Poll::Ready(None)) => {
319 Poll::Ready(None)
322 }
323 (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
324 let _ = self.as_mut().find_work(cx);
328 Poll::Ready(Some(res))
329 }
330 (_not_ready_none, Poll::Ready(None)) => {
331 Poll::Pending
334 }
335 (_, poll) => poll,
336 }
337 }
338}
339
340type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
341
342pub struct WorkSender<K, C, O> {
345 sender: mpsc::UnboundedSender<K>,
346 tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
349}
350
351impl<K, C, O> Clone for WorkSender<K, C, O> {
352 fn clone(&self) -> Self {
353 Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
354 }
355}
356
357impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359 f.debug_struct("WorkSender").finish()
360 }
361}
362
363impl<K, C, O> WorkSender<K, C, O>
364where
365 K: Clone + Eq + Hash,
366 C: TryMerge,
367 O: Clone,
368{
369 pub fn push(&self, key: K, context: C) -> impl Future<Output = Result<O, Closed>> {
372 let tasks = match self.tasks.upgrade() {
373 Some(tasks) => tasks,
374 None => {
375 return make_canceled_receiver();
377 }
378 };
379 let mut tasks = tasks.lock();
380
381 Self::push_entry(&mut *tasks, &self.sender, key, context)
382 }
383
384 pub fn push_all(
391 &self,
392 entries: impl Iterator<Item = (K, C)>,
393 ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
394 let mut tasks = self.tasks.upgrade();
395 let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
396
397 entries
398 .map(move |(key, context)| {
399 if let Some(ref mut tasks) = tasks {
400 Self::push_entry(&mut *tasks, &self.sender, key, context)
401 } else {
402 make_canceled_receiver()
404 }
405 })
406 .collect::<Vec<_>>()
407 .into_iter()
408 }
409
410 fn push_entry(
411 tasks: &mut HashMap<K, TaskVariants<C, O>>,
412 self_sender: &mpsc::UnboundedSender<K>,
413 key: K,
414 context: C,
415 ) -> Shared<TaskFuture<O>> {
416 use std::collections::hash_map::Entry;
417
418 match tasks.entry(key.clone()) {
419 Entry::Vacant(entry) => {
420 if let Ok(()) = self_sender.unbounded_send(key) {
423 let (infos, fut) = TaskVariants::new(context);
424 entry.insert(infos);
425 fut
426 } else {
427 make_canceled_receiver()
429 }
430 }
431 Entry::Occupied(entry) => entry.into_mut().push(context),
432 }
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use super::*;
439 use futures::channel::oneshot;
440 use futures::executor::{block_on, LocalSpawner};
441 use futures::task::{LocalSpawnExt, SpawnExt};
442 use std::borrow::Borrow;
443 use std::fmt;
444
445 #[test]
446 fn basic_usage() {
447 async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
448 Ok(())
449 }
450
451 let (processor, enqueue) = work_queue(3, do_work);
452
453 let tasks = FuturesUnordered::new();
454
455 tasks.push(enqueue.push("a".into(), ()));
456 tasks.push(enqueue.push("a".into(), ()));
457 tasks.push(enqueue.push("b".into(), ()));
458 tasks.push(enqueue.push("a".into(), ()));
459 tasks.push(enqueue.push("c".into(), ()));
460
461 drop(enqueue);
462
463 block_on(async move {
464 let (keys, res) = futures::future::join(
465 processor.collect::<Vec<String>>(),
466 tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
467 )
468 .await;
469 assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
470 assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
471 });
472 }
473
474 #[test]
475 fn into_future() {
476 async fn nop(key: i32, _context: ()) -> i32 {
477 key
478 }
479 let (processor, enqueue) = work_queue(1, nop);
480
481 let res_fut =
482 future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
483 drop(enqueue);
484
485 let res = block_on(res_fut);
486 assert_eq!(res, ((), Ok(1), Ok(2)));
487 }
488
489 #[derive(Debug, PartialEq, Eq)]
490 pub(crate) struct MergeEqual(pub(crate) i32);
491
492 impl TryMerge for MergeEqual {
493 fn try_merge(&mut self, other: Self) -> Result<(), Self> {
494 if self.0 == other.0 {
495 Ok(())
496 } else {
497 Err(other)
498 }
499 }
500 }
501
502 #[test]
503 fn dropping_queue_fails_requests() {
504 async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
505 Ok(())
506 }
507
508 let (processor, enqueue) = work_queue(1, do_work);
509
510 let fut_early_a = enqueue.push("a", MergeEqual(0));
511 let fut_early_b = enqueue.push("a", MergeEqual(1));
512 let fut_early_c = enqueue.push("a", MergeEqual(0));
513 drop(processor);
514 let fut_late = enqueue.push("b", MergeEqual(0));
515
516 block_on(async move {
517 assert_eq!(fut_early_a.await, Err(Closed));
518 assert_eq!(fut_early_b.await, Err(Closed));
519 assert_eq!(fut_early_c.await, Err(Closed));
520 assert_eq!(fut_late.await, Err(Closed));
521
522 let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
523 for fut in enqueue.push_all(requests.into_iter()) {
524 assert_eq!(fut.await, Err(Closed));
525 }
526 });
527 }
528
529 #[derive(Debug)]
530 struct TestRunningTask<C, O> {
531 unblocker: oneshot::Sender<O>,
532 context: C,
533 }
534
535 #[derive(Debug)]
536 struct TestRunningTasks<K, C, O>
537 where
538 K: Eq + Hash,
539 {
540 tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
541 }
542
543 impl<K, C, O> TestRunningTasks<K, C, O>
544 where
545 K: fmt::Debug + Eq + Hash + Sized + Clone,
546 C: fmt::Debug,
547 O: fmt::Debug,
548 {
549 fn new() -> Self {
550 Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
551 }
552
553 fn resolve<Q>(&self, key: &Q, res: O) -> C
554 where
555 Q: Eq + Hash + ?Sized,
556 K: Borrow<Q>,
557 {
558 let task =
559 self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
560 task.unblocker.send(res).unwrap();
561 task.context
562 }
563
564 fn peek(&self) -> Option<K> {
565 self.tasks.lock().keys().next().cloned()
566 }
567
568 fn keys(&self) -> Vec<K> {
569 self.tasks.lock().keys().cloned().collect()
570 }
571
572 fn assert_empty(&self) {
573 assert_eq!(
574 self.tasks.lock().keys().collect::<Vec<&K>>(),
575 Vec::<&K>::new(),
576 "expect queue to be empty"
577 );
578 }
579 }
580
581 #[derive(Debug)]
582 struct TestQueueResults<K> {
583 done: Arc<Mutex<Vec<K>>>,
584 terminated: Arc<Mutex<bool>>,
585 }
586
587 impl<K> TestQueueResults<K> {
588 fn take(&self) -> Vec<K> {
589 std::mem::take(&mut *self.done.lock())
590 }
591
592 fn is_terminated(&self) -> bool {
593 *self.terminated.lock()
594 }
595 }
596
597 #[test]
598 fn check_works_with_sendable_types() {
599 struct TestWork;
600
601 impl Work<(), ()> for TestWork {
602 type Future = futures::future::Ready<()>;
603
604 fn start(&self, _key: (), _context: ()) -> Self::Future {
605 futures::future::ready(())
606 }
607 }
608
609 let (processor, enqueue) = work_queue(3, TestWork);
610
611 let tasks = FuturesUnordered::new();
612 tasks.push(enqueue.push((), ()));
613
614 drop(enqueue);
615
616 let mut executor = futures::executor::LocalPool::new();
617 let handle = executor
618 .spawner()
619 .spawn_with_handle(async move {
620 let (keys, res) =
621 futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
622 .await;
623 assert_eq!(keys, vec![()]);
624 assert_eq!(res, vec![Ok(())]);
625 })
626 .expect("spawn to work");
627 let () = executor.run_until(handle);
628 }
629
630 #[test]
631 fn check_works_with_unsendable_types() {
632 use std::rc::Rc;
633
634 struct TestWork(#[expect(dead_code)] Rc<()>);
637 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
638 struct TestKey(Rc<()>);
639 #[derive(PartialEq, Eq)]
640 struct TestContext(Rc<()>);
641 #[derive(Clone, Debug, PartialEq)]
642 struct TestOutput(Rc<()>);
643
644 impl Work<TestKey, TestContext> for TestWork {
645 type Future = futures::future::Ready<TestOutput>;
646
647 fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
648 futures::future::ready(TestOutput(Rc::new(())))
649 }
650 }
651
652 impl TryMerge for TestContext {
653 fn try_merge(&mut self, _: Self) -> Result<(), Self> {
654 Ok(())
655 }
656 }
657
658 let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));
659
660 let tasks = FuturesUnordered::new();
661 tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));
662
663 drop(enqueue);
664
665 let mut executor = futures::executor::LocalPool::new();
666 let handle = executor
667 .spawner()
668 .spawn_local_with_handle(async move {
669 let (keys, res) =
670 futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
671 .await;
672 assert_eq!(keys, vec![TestKey(Rc::new(()))]);
673 assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
674 })
675 .expect("spawn to work");
676 let () = executor.run_until(handle);
677 }
678
679 fn spawn_test_work_queue<K, C, O>(
680 spawner: LocalSpawner,
681 concurrency: usize,
682 ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
683 where
684 K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
685 C: TryMerge + Send + fmt::Debug + 'static,
686 O: Send + Clone + fmt::Debug + 'static,
687 {
688 let running = TestRunningTasks::<K, C, O>::new();
689 let running_tasks = running.tasks.clone();
690 let do_work = move |key: K, context: C| {
691 let (sender, receiver) = oneshot::channel();
694 assert!(running_tasks
695 .lock()
696 .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
697 .is_none());
698 async move { receiver.await.unwrap() }
699 };
700
701 let (mut processor, enqueue) = work_queue(concurrency, do_work);
702 let done = Arc::new(Mutex::new(Vec::new()));
703 let terminated = Arc::new(Mutex::new(false));
704 let results =
705 TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };
706
707 spawner
708 .spawn_local(async move {
709 while let Some(res) = processor.next().await {
710 done.lock().push(res);
711 }
712 *terminated.lock() = true;
713 })
714 .expect("spawn to succeed");
715
716 (enqueue, running, results)
717 }
718
719 #[test]
720 fn processes_known_work_before_stalling() {
721 let mut executor = futures::executor::LocalPool::new();
722
723 let (enqueue, running, done) =
724 spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
725
726 let task_hello = enqueue.push("hello", ());
727 let task_world = enqueue.push("world!", ());
728 let task_test = enqueue.push("test", ());
729 executor.run_until_stalled();
730 assert_eq!(done.take(), Vec::<&str>::new());
731
732 running.resolve("hello", 5);
733 running.resolve("world!", 6);
734 running.assert_empty();
735 executor.run_until_stalled();
736 assert_eq!(done.take(), vec!["hello", "world!"]);
737
738 assert_eq!(executor.run_until(task_hello), Ok(5));
739 assert_eq!(executor.run_until(task_world), Ok(6));
740
741 running.resolve("test", 4);
742 assert_eq!(executor.run_until(task_test), Ok(4));
743 assert_eq!(done.take(), vec!["test"]);
744 }
745
746 #[test]
747 fn restarts_after_draining_input_queue() {
748 let mut executor = futures::executor::LocalPool::new();
749
750 let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);
751
752 let task_a = enqueue.push("a", ());
754 let task_b = enqueue.push("b", ());
755 executor.run_until_stalled();
756 running.resolve("a", ());
757 running.resolve("b", ());
758 assert_eq!(executor.run_until(task_a), Ok(()));
759 assert_eq!(executor.run_until(task_b), Ok(()));
760 assert_eq!(done.take(), vec!["a", "b"]);
761
762 let task_c = enqueue.push("c", ());
765 executor.run_until_stalled();
766 running.resolve("c", ());
767 assert_eq!(executor.run_until(task_c), Ok(()));
768 assert_eq!(done.take(), vec!["c"]);
769
770 let task_a = enqueue.push("a", ());
773 let task_d = enqueue.push("d", ());
774 drop(enqueue);
775 executor.run_until_stalled();
776 assert!(!done.is_terminated());
777 assert_eq!(done.take(), Vec::<&str>::new());
778 running.resolve("a", ());
779 running.resolve("d", ());
780 assert_eq!(executor.run_until(task_a), Ok(()));
781 assert_eq!(executor.run_until(task_d), Ok(()));
782 assert_eq!(done.take(), vec!["a", "d"]);
783 assert!(done.is_terminated());
784 }
785
786 #[test]
787 fn push_all() {
788 let mut executor = futures::executor::LocalPool::new();
789
790 let (enqueue, running, done) =
791 spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
792
793 let mut futs =
794 enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
795 running.assert_empty();
796
797 executor.run_until_stalled();
798 running.resolve("a", 1);
799 running.resolve("b", 2);
800 running.assert_empty();
801 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
802 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
803
804 running.resolve("c", 3);
805 running.assert_empty();
806 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
807 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
808 assert!(futs.next().is_none());
809
810 assert_eq!(done.take(), vec!["a", "b", "c"]);
811 }
812
813 #[test]
814 fn handles_many_tasks() {
815 let mut executor = futures::executor::LocalPool::new();
816
817 let (enqueue, running, done) =
818 spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);
819
820 let mut tasks = FuturesUnordered::new();
821
822 for i in 0..10000 {
823 let key = format!("task_{i}");
824 tasks.push(enqueue.push(key, ()));
825 }
826
827 let task_dups = enqueue
829 .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
830 let key = format!("task_{i}");
831 (key, ())
832 }))
833 .collect::<FuturesUnordered<_>>();
834
835 executor.run_until_stalled();
836
837 while let Some(key) = running.peek() {
838 running.resolve(&key, ());
839 assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
840 assert_eq!(done.take(), vec![key]);
841 }
842
843 assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
844 }
845
846 #[test]
847 fn dedups_compound_keys() {
848 let mut executor = futures::executor::LocalPool::new();
849
850 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
851 struct Params<'a> {
852 key: &'a str,
853 options: &'a [&'a str],
854 }
855
856 let (enqueue, running, done) =
857 spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);
858
859 let key_a = Params { key: "first", options: &[] };
860 let key_b = Params { key: "first", options: &["unique"] };
861 let task_a1 = enqueue.push(key_a.clone(), ());
862 let task_b = enqueue.push(key_b.clone(), ());
863 let task_a2 = enqueue.push(key_a.clone(), ());
864
865 executor.run_until_stalled();
866
867 running.resolve(&key_b, "first_unique");
868 executor.run_until_stalled();
869 assert_eq!(done.take(), vec![key_b]);
870 assert_eq!(executor.run_until(task_b), Ok("first_unique"));
871
872 running.resolve(&key_a, "first_no_options");
873 executor.run_until_stalled();
874 assert_eq!(done.take(), vec![key_a]);
875 assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
876 assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
877 }
878
879 #[test]
880 fn merges_context_of_pending_tasks() {
881 let mut executor = futures::executor::LocalPool::new();
882
883 #[derive(Default, Debug, PartialEq, Eq)]
884 struct MyContext(String);
885
886 impl TryMerge for MyContext {
887 fn try_merge(&mut self, other: Self) -> Result<(), Self> {
888 self.0.push_str(&other.0);
889 Ok(())
890 }
891 }
892
893 let (enqueue, running, done) =
894 spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);
895
896 let task_a = enqueue.push("dup", MyContext("a".into()));
897 let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
898 let task_b = enqueue.push("dup", MyContext("b".into()));
899 executor.run_until_stalled();
900 let task_c1 = enqueue.push("dup", MyContext("c".into()));
901 executor.run_until_stalled();
902
903 assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
905 assert_eq!(executor.run_until(task_a), Ok(()));
906 assert_eq!(executor.run_until(task_b), Ok(()));
907 assert_eq!(done.take(), vec!["dup"]);
908
909 assert_eq!(running.keys(), vec!["dup"]);
912 assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
913 assert_eq!(executor.run_until(task_c1), Ok(()));
914 assert_eq!(done.take(), vec!["dup"]);
915
916 assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
917 assert_eq!(executor.run_until(task_unique), Ok(()));
918 assert_eq!(done.take(), vec!["unique"]);
919 running.assert_empty();
920
921 let task_c2 = enqueue.push("dup", MyContext("c".into()));
923 executor.run_until_stalled();
924 assert_eq!(running.keys(), vec!["dup"]);
925 assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
926 assert_eq!(executor.run_until(task_c2), Ok(()));
927 assert_eq!(done.take(), vec!["dup"]);
928 running.assert_empty();
929 }
930
931 #[fuchsia::test]
932 async fn inspect() {
933 let (sender, mut receiver) = futures::channel::mpsc::channel(0);
935 let do_work = move |_: String, _: MergeEqual| {
936 let mut sender = sender.clone();
937 async move {
938 let () = sender.send(()).await.unwrap();
939 let () = futures::future::pending().await;
940 Ok::<_, ()>(())
941 }
942 };
943 let (mut processor, enqueue) = work_queue(2, do_work);
944
945 let inspector = fuchsia_inspect::Inspector::default();
946 inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
947 fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
948 .detach();
949
950 diagnostics_assertions::assert_data_tree!(inspector, root: {
952 "queue": {}
953 });
954
955 let _a0 = enqueue.push("a".into(), MergeEqual(0));
957 let _b0 = enqueue.push("b".into(), MergeEqual(0));
958 let _a1 = enqueue.push("a".into(), MergeEqual(1));
959 let _c0 = enqueue.push("c".into(), MergeEqual(0));
960
961 let () = receiver.next().await.unwrap();
962 let () = receiver.next().await.unwrap();
963
964 diagnostics_assertions::assert_data_tree!(inspector, root: {
965 "queue": {
966 r#""a""#: {
967 "running": 1u64,
968 "pending": 1u64,
969 },
970 r#""b""#: {
971 "running": 1u64,
972 "pending": 0u64,
973 },
974 r#""c""#: {
975 "running": 0u64,
976 "pending": 1u64,
977 },
978 }
979 });
980 }
981}