1#![deny(missing_docs)]
6#![allow(clippy::type_complexity)]
7#![allow(clippy::let_unit_value)]
8
9use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::future::{BoxFuture, Shared};
14use futures::prelude::*;
15use futures::ready;
16use futures::stream::{FusedStream, FuturesUnordered};
17use pin_project::pin_project;
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use thiserror::Error;
24
25mod state;
26use state::{TaskFuture, TaskVariants, make_canceled_receiver};
27
28#[derive(Debug, PartialEq, Eq, Clone, Error)]
30#[error("The queue was dropped before processing this task")]
31pub struct Closed;
32
33pub trait TryMerge: Eq + Sized {
39 fn try_merge(&mut self, other: Self) -> Result<(), Self>;
45}
46
47impl TryMerge for () {
48 fn try_merge(&mut self, _: ()) -> Result<(), Self> {
49 Ok(())
50 }
51}
52
53pub fn work_queue<W, K, C>(
95 concurrency: usize,
96 work_fn: W,
97) -> (WorkQueue<W, K, C>, WorkSender<K, C, <W::Future as Future>::Output>)
98where
99 W: Work<K, C>,
100 K: Clone + Eq + Hash,
101 C: TryMerge,
102{
103 let tasks = Arc::new(Mutex::new(HashMap::new()));
104 let (sender, receiver) = mpsc::unbounded();
105 let sender = WorkSender { sender, tasks: Arc::downgrade(&tasks) };
106 (
107 WorkQueue {
108 work_fn,
109 concurrency,
110 pending: receiver,
111 tasks,
112 running: FuturesUnordered::new(),
113 },
114 sender,
115 )
116}
117
118pub trait Work<K, C> {
120 type Future: Future;
122
123 fn start(&self, key: K, context: C) -> Self::Future;
125}
126
127impl<F, K, C, WF> Work<K, C> for F
128where
129 F: Fn(K, C) -> WF,
130 WF: Future,
131{
132 type Future = WF;
133
134 fn start(&self, key: K, context: C) -> Self::Future {
135 (self)(key, context)
136 }
137}
138
139#[pin_project]
145pub struct WorkQueue<W, K, C>
146where
147 W: Work<K, C>,
148{
149 work_fn: W,
151 concurrency: usize,
153 tasks: Arc<Mutex<HashMap<K, TaskVariants<C, <W::Future as Future>::Output>>>>,
156
157 #[pin]
159 pending: mpsc::UnboundedReceiver<K>,
160 #[pin]
162 running: FuturesUnordered<RunningTask<K, W::Future>>,
163}
164
165impl<W, K, C> WorkQueue<W, K, C>
166where
167 W: Work<K, C>,
168 K: Clone + Eq + Hash,
169{
170 pub fn into_future(self) -> impl Future<Output = ()> {
173 self.map(|_res| ()).collect::<()>()
174 }
175
176 fn find_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
182 let mut this = self.project();
183
184 if this.pending.is_terminated() {
186 return Poll::Ready(None);
187 }
188
189 let mut found = false;
190 while this.running.len() < *this.concurrency {
191 match ready!(this.pending.as_mut().poll_next(cx)) {
192 None => break,
193 Some(key) => {
194 found = true;
195
196 let context = this
198 .tasks
199 .lock()
200 .get_mut(&key)
201 .expect("map entry to exist if in pending queue")
202 .start();
203
204 let work = this.work_fn.start(key.clone(), context);
208 let fut = futures::future::join(futures::future::ready(key), work);
209
210 this.running.push(fut);
211 }
212 }
213 }
214 if found { Poll::Ready(Some(())) } else { Poll::Pending }
215 }
216
217 fn do_work(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<K>> {
218 let mut this = self.project();
219
220 match this.running.as_mut().poll_next(cx) {
221 Poll::Pending => Poll::Pending,
222 Poll::Ready(None) => {
223 if this.pending.is_terminated() { Poll::Ready(None) } else { Poll::Pending }
230 }
231 Poll::Ready(Some((key, res))) => {
232 let mut tasks = this.tasks.lock();
233 let infos: &mut TaskVariants<_, _> =
234 tasks.get_mut(&key).expect("key to exist in map if not resolved");
235
236 if let Some(next_context) = infos.done(res) {
237 let work = this.work_fn.start(key.clone(), next_context);
239 let key_clone = key.clone();
240 let fut = futures::future::join(futures::future::ready(key_clone), work);
241
242 drop(tasks);
243 this.running.push(fut);
244 } else {
245 tasks.remove(&key);
247 }
248
249 Poll::Ready(Some(key))
252 }
253 }
254 }
255}
256
257impl<W, K, C> WorkQueue<W, K, C>
258where
259 W: Work<K, C>,
260 <<W as Work<K, C>>::Future as futures::Future>::Output: Send + Sync + 'static,
261 K: std::fmt::Debug + Send + 'static,
262 C: Send + 'static,
263{
264 pub fn record_lazy_inspect(
268 &self,
269 ) -> impl Fn() -> BoxFuture<'static, Result<fuchsia_inspect::Inspector, anyhow::Error>>
270 + Send
271 + Sync
272 + 'static {
273 let tasks = Arc::downgrade(&self.tasks);
274 move || {
275 let tasks = tasks.clone();
276 async move {
277 let inspector = fuchsia_inspect::Inspector::default();
278 if let Some(tasks) = tasks.upgrade() {
279 let tasks = {
281 tasks
282 .lock()
283 .iter()
284 .map(|(k, v)| (format!("{k:?}"), (v.running(), v.pending())))
285 .collect::<Vec<_>>()
286 };
287 let root = inspector.root();
288 for (k, (running, pending)) in tasks {
289 root.record_child(k, |n| {
290 n.record_uint("running", running as u64);
291 n.record_uint("pending", pending as u64);
292 })
293 }
294 }
295 Ok(inspector)
296 }
297 .boxed()
298 }
299 }
300}
301
302impl<W, K, C> Stream for WorkQueue<W, K, C>
303where
304 W: Work<K, C>,
305 K: Clone + Eq + Hash,
306{
307 type Item = K;
308 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
309 match (self.as_mut().find_work(cx), self.as_mut().do_work(cx)) {
310 (Poll::Ready(None), Poll::Ready(None)) => {
311 Poll::Ready(None)
314 }
315 (Poll::Ready(Some(())), Poll::Ready(Some(res))) => {
316 let _ = self.as_mut().find_work(cx);
320 Poll::Ready(Some(res))
321 }
322 (_not_ready_none, Poll::Ready(None)) => {
323 Poll::Pending
326 }
327 (_, poll) => poll,
328 }
329 }
330}
331
332type RunningTask<K, WF> = futures::future::Join<futures::future::Ready<K>, WF>;
333
334pub struct WorkSender<K, C, O> {
337 sender: mpsc::UnboundedSender<K>,
338 tasks: Weak<Mutex<HashMap<K, TaskVariants<C, O>>>>,
341}
342
343impl<K, C, O> Clone for WorkSender<K, C, O> {
344 fn clone(&self) -> Self {
345 Self { sender: self.sender.clone(), tasks: self.tasks.clone() }
346 }
347}
348
349impl<K, C, O> std::fmt::Debug for WorkSender<K, C, O> {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 f.debug_struct("WorkSender").finish()
352 }
353}
354
355impl<K, C, O> WorkSender<K, C, O>
356where
357 K: Clone + Eq + Hash,
358 C: TryMerge,
359 O: Clone,
360{
361 pub fn push(&self, key: K, context: C) -> impl Future<Output = Result<O, Closed>> {
364 let tasks = match self.tasks.upgrade() {
365 Some(tasks) => tasks,
366 None => {
367 return make_canceled_receiver();
369 }
370 };
371 let mut tasks = tasks.lock();
372
373 Self::push_entry(&mut *tasks, &self.sender, key, context)
374 }
375
376 pub fn push_all(
383 &self,
384 entries: impl Iterator<Item = (K, C)>,
385 ) -> impl Iterator<Item = impl Future<Output = Result<O, Closed>>> {
386 let mut tasks = self.tasks.upgrade();
387 let mut tasks = tasks.as_mut().map(|tasks| tasks.lock());
388
389 entries
390 .map(move |(key, context)| {
391 if let Some(ref mut tasks) = tasks {
392 Self::push_entry(&mut *tasks, &self.sender, key, context)
393 } else {
394 make_canceled_receiver()
396 }
397 })
398 .collect::<Vec<_>>()
399 .into_iter()
400 }
401
402 fn push_entry(
403 tasks: &mut HashMap<K, TaskVariants<C, O>>,
404 self_sender: &mpsc::UnboundedSender<K>,
405 key: K,
406 context: C,
407 ) -> Shared<TaskFuture<O>> {
408 use std::collections::hash_map::Entry;
409
410 match tasks.entry(key.clone()) {
411 Entry::Vacant(entry) => {
412 if let Ok(()) = self_sender.unbounded_send(key) {
415 let (infos, fut) = TaskVariants::new(context);
416 entry.insert(infos);
417 fut
418 } else {
419 make_canceled_receiver()
421 }
422 }
423 Entry::Occupied(entry) => entry.into_mut().push(context),
424 }
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431 use futures::channel::oneshot;
432 use futures::executor::{LocalSpawner, block_on};
433 use futures::task::{LocalSpawnExt, SpawnExt};
434 use std::borrow::Borrow;
435 use std::fmt;
436
437 #[test]
438 fn basic_usage() {
439 async fn do_work(_key: String, _context: ()) -> Result<(), ()> {
440 Ok(())
441 }
442
443 let (processor, enqueue) = work_queue(3, do_work);
444
445 let tasks = FuturesUnordered::new();
446
447 tasks.push(enqueue.push("a".into(), ()));
448 tasks.push(enqueue.push("a".into(), ()));
449 tasks.push(enqueue.push("b".into(), ()));
450 tasks.push(enqueue.push("a".into(), ()));
451 tasks.push(enqueue.push("c".into(), ()));
452
453 drop(enqueue);
454
455 block_on(async move {
456 let (keys, res) = futures::future::join(
457 processor.collect::<Vec<String>>(),
458 tasks.collect::<Vec<Result<Result<(), ()>, _>>>(),
459 )
460 .await;
461 assert_eq!(keys, vec!["a".to_string(), "b".into(), "c".into()]);
462 assert_eq!(res, vec![Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))]);
463 });
464 }
465
466 #[test]
467 fn into_future() {
468 async fn nop(key: i32, _context: ()) -> i32 {
469 key
470 }
471 let (processor, enqueue) = work_queue(1, nop);
472
473 let res_fut =
474 future::join3(processor.into_future(), enqueue.push(1, ()), enqueue.push(2, ()));
475 drop(enqueue);
476
477 let res = block_on(res_fut);
478 assert_eq!(res, ((), Ok(1), Ok(2)));
479 }
480
481 #[derive(Debug, PartialEq, Eq)]
482 pub(crate) struct MergeEqual(pub(crate) i32);
483
484 impl TryMerge for MergeEqual {
485 fn try_merge(&mut self, other: Self) -> Result<(), Self> {
486 if self.0 == other.0 { Ok(()) } else { Err(other) }
487 }
488 }
489
490 #[test]
491 fn dropping_queue_fails_requests() {
492 async fn do_work(_key: &str, _context: MergeEqual) -> Result<(), ()> {
493 Ok(())
494 }
495
496 let (processor, enqueue) = work_queue(1, do_work);
497
498 let fut_early_a = enqueue.push("a", MergeEqual(0));
499 let fut_early_b = enqueue.push("a", MergeEqual(1));
500 let fut_early_c = enqueue.push("a", MergeEqual(0));
501 drop(processor);
502 let fut_late = enqueue.push("b", MergeEqual(0));
503
504 block_on(async move {
505 assert_eq!(fut_early_a.await, Err(Closed));
506 assert_eq!(fut_early_b.await, Err(Closed));
507 assert_eq!(fut_early_c.await, Err(Closed));
508 assert_eq!(fut_late.await, Err(Closed));
509
510 let requests = vec![("1", MergeEqual(0)), ("2", MergeEqual(1)), ("1", MergeEqual(0))];
511 for fut in enqueue.push_all(requests.into_iter()) {
512 assert_eq!(fut.await, Err(Closed));
513 }
514 });
515 }
516
517 #[derive(Debug)]
518 struct TestRunningTask<C, O> {
519 unblocker: oneshot::Sender<O>,
520 context: C,
521 }
522
523 #[derive(Debug)]
524 struct TestRunningTasks<K, C, O>
525 where
526 K: Eq + Hash,
527 {
528 tasks: Arc<Mutex<HashMap<K, TestRunningTask<C, O>>>>,
529 }
530
531 impl<K, C, O> TestRunningTasks<K, C, O>
532 where
533 K: fmt::Debug + Eq + Hash + Sized + Clone,
534 C: fmt::Debug,
535 O: fmt::Debug,
536 {
537 fn new() -> Self {
538 Self { tasks: Arc::new(Mutex::new(HashMap::new())) }
539 }
540
541 fn resolve<Q>(&self, key: &Q, res: O) -> C
542 where
543 Q: Eq + Hash + ?Sized,
544 K: Borrow<Q>,
545 {
546 let task =
547 self.tasks.lock().remove(key.borrow()).expect("key to exist in running work");
548 task.unblocker.send(res).unwrap();
549 task.context
550 }
551
552 fn peek(&self) -> Option<K> {
553 self.tasks.lock().keys().next().cloned()
554 }
555
556 fn keys(&self) -> Vec<K> {
557 self.tasks.lock().keys().cloned().collect()
558 }
559
560 fn assert_empty(&self) {
561 assert_eq!(
562 self.tasks.lock().keys().collect::<Vec<&K>>(),
563 Vec::<&K>::new(),
564 "expect queue to be empty"
565 );
566 }
567 }
568
569 #[derive(Debug)]
570 struct TestQueueResults<K> {
571 done: Arc<Mutex<Vec<K>>>,
572 terminated: Arc<Mutex<bool>>,
573 }
574
575 impl<K> TestQueueResults<K> {
576 fn take(&self) -> Vec<K> {
577 std::mem::take(&mut *self.done.lock())
578 }
579
580 fn is_terminated(&self) -> bool {
581 *self.terminated.lock()
582 }
583 }
584
585 #[test]
586 fn check_works_with_sendable_types() {
587 struct TestWork;
588
589 impl Work<(), ()> for TestWork {
590 type Future = futures::future::Ready<()>;
591
592 fn start(&self, _key: (), _context: ()) -> Self::Future {
593 futures::future::ready(())
594 }
595 }
596
597 let (processor, enqueue) = work_queue(3, TestWork);
598
599 let tasks = FuturesUnordered::new();
600 tasks.push(enqueue.push((), ()));
601
602 drop(enqueue);
603
604 let mut executor = futures::executor::LocalPool::new();
605 let handle = executor
606 .spawner()
607 .spawn_with_handle(async move {
608 let (keys, res) =
609 futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
610 .await;
611 assert_eq!(keys, vec![()]);
612 assert_eq!(res, vec![Ok(())]);
613 })
614 .expect("spawn to work");
615 let () = executor.run_until(handle);
616 }
617
618 #[test]
619 fn check_works_with_unsendable_types() {
620 use std::rc::Rc;
621
622 struct TestWork(#[expect(dead_code)] Rc<()>);
625 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
626 struct TestKey(Rc<()>);
627 #[derive(PartialEq, Eq)]
628 struct TestContext(Rc<()>);
629 #[derive(Clone, Debug, PartialEq)]
630 struct TestOutput(Rc<()>);
631
632 impl Work<TestKey, TestContext> for TestWork {
633 type Future = futures::future::Ready<TestOutput>;
634
635 fn start(&self, _key: TestKey, _context: TestContext) -> Self::Future {
636 futures::future::ready(TestOutput(Rc::new(())))
637 }
638 }
639
640 impl TryMerge for TestContext {
641 fn try_merge(&mut self, _: Self) -> Result<(), Self> {
642 Ok(())
643 }
644 }
645
646 let (processor, enqueue) = work_queue(3, TestWork(Rc::new(())));
647
648 let tasks = FuturesUnordered::new();
649 tasks.push(enqueue.push(TestKey(Rc::new(())), TestContext(Rc::new(()))));
650
651 drop(enqueue);
652
653 let mut executor = futures::executor::LocalPool::new();
654 let handle = executor
655 .spawner()
656 .spawn_local_with_handle(async move {
657 let (keys, res) =
658 futures::future::join(processor.collect::<Vec<_>>(), tasks.collect::<Vec<_>>())
659 .await;
660 assert_eq!(keys, vec![TestKey(Rc::new(()))]);
661 assert_eq!(res, vec![Ok(TestOutput(Rc::new(())))]);
662 })
663 .expect("spawn to work");
664 let () = executor.run_until(handle);
665 }
666
667 fn spawn_test_work_queue<K, C, O>(
668 spawner: LocalSpawner,
669 concurrency: usize,
670 ) -> (WorkSender<K, C, O>, TestRunningTasks<K, C, O>, TestQueueResults<K>)
671 where
672 K: Send + Clone + fmt::Debug + Eq + Hash + 'static,
673 C: TryMerge + Send + fmt::Debug + 'static,
674 O: Send + Clone + fmt::Debug + 'static,
675 {
676 let running = TestRunningTasks::<K, C, O>::new();
677 let running_tasks = running.tasks.clone();
678 let do_work = move |key: K, context: C| {
679 let (sender, receiver) = oneshot::channel();
682 assert!(
683 running_tasks
684 .lock()
685 .insert(key, TestRunningTask::<C, O> { unblocker: sender, context })
686 .is_none()
687 );
688 async move { receiver.await.unwrap() }
689 };
690
691 let (mut processor, enqueue) = work_queue(concurrency, do_work);
692 let done = Arc::new(Mutex::new(Vec::new()));
693 let terminated = Arc::new(Mutex::new(false));
694 let results =
695 TestQueueResults { done: Arc::clone(&done), terminated: Arc::clone(&terminated) };
696
697 spawner
698 .spawn_local(async move {
699 while let Some(res) = processor.next().await {
700 done.lock().push(res);
701 }
702 *terminated.lock() = true;
703 })
704 .expect("spawn to succeed");
705
706 (enqueue, running, results)
707 }
708
709 #[test]
710 fn processes_known_work_before_stalling() {
711 let mut executor = futures::executor::LocalPool::new();
712
713 let (enqueue, running, done) =
714 spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
715
716 let task_hello = enqueue.push("hello", ());
717 let task_world = enqueue.push("world!", ());
718 let task_test = enqueue.push("test", ());
719 executor.run_until_stalled();
720 assert_eq!(done.take(), Vec::<&str>::new());
721
722 running.resolve("hello", 5);
723 running.resolve("world!", 6);
724 running.assert_empty();
725 executor.run_until_stalled();
726 assert_eq!(done.take(), vec!["hello", "world!"]);
727
728 assert_eq!(executor.run_until(task_hello), Ok(5));
729 assert_eq!(executor.run_until(task_world), Ok(6));
730
731 running.resolve("test", 4);
732 assert_eq!(executor.run_until(task_test), Ok(4));
733 assert_eq!(done.take(), vec!["test"]);
734 }
735
736 #[test]
737 fn restarts_after_draining_input_queue() {
738 let mut executor = futures::executor::LocalPool::new();
739
740 let (enqueue, running, done) = spawn_test_work_queue::<&str, (), ()>(executor.spawner(), 2);
741
742 let task_a = enqueue.push("a", ());
744 let task_b = enqueue.push("b", ());
745 executor.run_until_stalled();
746 running.resolve("a", ());
747 running.resolve("b", ());
748 assert_eq!(executor.run_until(task_a), Ok(()));
749 assert_eq!(executor.run_until(task_b), Ok(()));
750 assert_eq!(done.take(), vec!["a", "b"]);
751
752 let task_c = enqueue.push("c", ());
755 executor.run_until_stalled();
756 running.resolve("c", ());
757 assert_eq!(executor.run_until(task_c), Ok(()));
758 assert_eq!(done.take(), vec!["c"]);
759
760 let task_a = enqueue.push("a", ());
763 let task_d = enqueue.push("d", ());
764 drop(enqueue);
765 executor.run_until_stalled();
766 assert!(!done.is_terminated());
767 assert_eq!(done.take(), Vec::<&str>::new());
768 running.resolve("a", ());
769 running.resolve("d", ());
770 assert_eq!(executor.run_until(task_a), Ok(()));
771 assert_eq!(executor.run_until(task_d), Ok(()));
772 assert_eq!(done.take(), vec!["a", "d"]);
773 assert!(done.is_terminated());
774 }
775
776 #[test]
777 fn push_all() {
778 let mut executor = futures::executor::LocalPool::new();
779
780 let (enqueue, running, done) =
781 spawn_test_work_queue::<&str, (), usize>(executor.spawner(), 2);
782
783 let mut futs =
784 enqueue.push_all(vec![("a", ()), ("b", ()), ("c", ()), ("b", ())].into_iter());
785 running.assert_empty();
786
787 executor.run_until_stalled();
788 running.resolve("a", 1);
789 running.resolve("b", 2);
790 running.assert_empty();
791 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(1));
792 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
793
794 running.resolve("c", 3);
795 running.assert_empty();
796 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(3));
797 assert_eq!(executor.run_until(futs.next().unwrap()), Ok(2));
798 assert!(futs.next().is_none());
799
800 assert_eq!(done.take(), vec!["a", "b", "c"]);
801 }
802
803 #[test]
804 fn handles_many_tasks() {
805 let mut executor = futures::executor::LocalPool::new();
806
807 let (enqueue, running, done) =
808 spawn_test_work_queue::<String, (), ()>(executor.spawner(), 5);
809
810 let mut tasks = FuturesUnordered::new();
811
812 for i in 0..10000 {
813 let key = format!("task_{i}");
814 tasks.push(enqueue.push(key, ()));
815 }
816
817 let task_dups = enqueue
819 .push_all((0..10000).filter(|i| i % 2 == 0).map(|i| {
820 let key = format!("task_{i}");
821 (key, ())
822 }))
823 .collect::<FuturesUnordered<_>>();
824
825 executor.run_until_stalled();
826
827 while let Some(key) = running.peek() {
828 running.resolve(&key, ());
829 assert_eq!(executor.run_until(tasks.next()), Some(Ok(())));
830 assert_eq!(done.take(), vec![key]);
831 }
832
833 assert_eq!(executor.run_until(task_dups.collect::<Vec<_>>()), vec![Ok(()); 5000]);
834 }
835
836 #[test]
837 fn dedups_compound_keys() {
838 let mut executor = futures::executor::LocalPool::new();
839
840 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
841 struct Params<'a> {
842 key: &'a str,
843 options: &'a [&'a str],
844 }
845
846 let (enqueue, running, done) =
847 spawn_test_work_queue::<Params<'_>, (), &str>(executor.spawner(), 5);
848
849 let key_a = Params { key: "first", options: &[] };
850 let key_b = Params { key: "first", options: &["unique"] };
851 let task_a1 = enqueue.push(key_a.clone(), ());
852 let task_b = enqueue.push(key_b.clone(), ());
853 let task_a2 = enqueue.push(key_a.clone(), ());
854
855 executor.run_until_stalled();
856
857 running.resolve(&key_b, "first_unique");
858 executor.run_until_stalled();
859 assert_eq!(done.take(), vec![key_b]);
860 assert_eq!(executor.run_until(task_b), Ok("first_unique"));
861
862 running.resolve(&key_a, "first_no_options");
863 executor.run_until_stalled();
864 assert_eq!(done.take(), vec![key_a]);
865 assert_eq!(executor.run_until(task_a2), Ok("first_no_options"));
866 assert_eq!(executor.run_until(task_a1), Ok("first_no_options"));
867 }
868
869 #[test]
870 fn merges_context_of_pending_tasks() {
871 let mut executor = futures::executor::LocalPool::new();
872
873 #[derive(Default, Debug, PartialEq, Eq)]
874 struct MyContext(String);
875
876 impl TryMerge for MyContext {
877 fn try_merge(&mut self, other: Self) -> Result<(), Self> {
878 self.0.push_str(&other.0);
879 Ok(())
880 }
881 }
882
883 let (enqueue, running, done) =
884 spawn_test_work_queue::<&str, MyContext, ()>(executor.spawner(), 1);
885
886 let task_a = enqueue.push("dup", MyContext("a".into()));
887 let task_unique = enqueue.push("unique", MyContext("not-deduped".into()));
888 let task_b = enqueue.push("dup", MyContext("b".into()));
889 executor.run_until_stalled();
890 let task_c1 = enqueue.push("dup", MyContext("c".into()));
891 executor.run_until_stalled();
892
893 assert_eq!(running.resolve("dup", ()), MyContext("ab".into()));
895 assert_eq!(executor.run_until(task_a), Ok(()));
896 assert_eq!(executor.run_until(task_b), Ok(()));
897 assert_eq!(done.take(), vec!["dup"]);
898
899 assert_eq!(running.keys(), vec!["dup"]);
902 assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
903 assert_eq!(executor.run_until(task_c1), Ok(()));
904 assert_eq!(done.take(), vec!["dup"]);
905
906 assert_eq!(running.resolve("unique", ()), MyContext("not-deduped".into()));
907 assert_eq!(executor.run_until(task_unique), Ok(()));
908 assert_eq!(done.take(), vec!["unique"]);
909 running.assert_empty();
910
911 let task_c2 = enqueue.push("dup", MyContext("c".into()));
913 executor.run_until_stalled();
914 assert_eq!(running.keys(), vec!["dup"]);
915 assert_eq!(running.resolve("dup", ()), MyContext("c".into()));
916 assert_eq!(executor.run_until(task_c2), Ok(()));
917 assert_eq!(done.take(), vec!["dup"]);
918 running.assert_empty();
919 }
920
921 #[fuchsia::test]
922 async fn inspect() {
923 let (sender, mut receiver) = futures::channel::mpsc::channel(0);
925 let do_work = move |_: String, _: MergeEqual| {
926 let mut sender = sender.clone();
927 async move {
928 let () = sender.send(()).await.unwrap();
929 let () = futures::future::pending().await;
930 Ok::<_, ()>(())
931 }
932 };
933 let (mut processor, enqueue) = work_queue(2, do_work);
934
935 let inspector = fuchsia_inspect::Inspector::default();
936 inspector.root().record_lazy_child("queue", processor.record_lazy_inspect());
937 fuchsia_async::Task::spawn(async move { while (processor.next().await).is_some() {} })
938 .detach();
939
940 diagnostics_assertions::assert_data_tree!(inspector, root: {
942 "queue": {}
943 });
944
945 let _a0 = enqueue.push("a".into(), MergeEqual(0));
947 let _b0 = enqueue.push("b".into(), MergeEqual(0));
948 let _a1 = enqueue.push("a".into(), MergeEqual(1));
949 let _c0 = enqueue.push("c".into(), MergeEqual(0));
950
951 let () = receiver.next().await.unwrap();
952 let () = receiver.next().await.unwrap();
953
954 diagnostics_assertions::assert_data_tree!(inspector, root: {
955 "queue": {
956 r#""a""#: {
957 "running": 1u64,
958 "pending": 1u64,
959 },
960 r#""b""#: {
961 "running": 1u64,
962 "pending": 0u64,
963 },
964 r#""c""#: {
965 "running": 0u64,
966 "pending": 1u64,
967 },
968 }
969 });
970 }
971}