fdf_core/dispatcher.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Safe bindings for the driver runtime dispatcher stable ABI

use fdf_sys::*;

use core::cell::RefCell;
use core::ffi;
use core::marker::PhantomData;
use core::mem::ManuallyDrop;
use core::ptr::{NonNull, null_mut};
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{Arc, Weak};

use zx::Status;

use crate::shutdown_observer::ShutdownObserver;

pub use fdf_sys::fdf_dispatcher_t;
pub use libasync::{
    AfterDeadline, AsyncDispatcher, AsyncDispatcherRef, JoinHandle, OnDispatcher, Task,
};

/// A marker trait for a function type that can be used as a shutdown observer for [`Dispatcher`].
pub trait ShutdownObserverFn: FnOnce(DispatcherRef<'_>) + Send + 'static {}
impl<T> ShutdownObserverFn for T where T: FnOnce(DispatcherRef<'_>) + Send + 'static {}

/// A builder for [`Dispatcher`]s
#[derive(Default)]
pub struct DispatcherBuilder {
    #[doc(hidden)]
    pub options: u32,
    #[doc(hidden)]
    pub name: String,
    #[doc(hidden)]
    pub scheduler_role: String,
    #[doc(hidden)]
    pub shutdown_observer: Option<Box<dyn ShutdownObserverFn>>,
}

impl DispatcherBuilder {
    /// See `FDF_DISPATCHER_OPTION_UNSYNCHRONIZED` in the C API
    pub(crate) const UNSYNCHRONIZED: u32 = fdf_sys::FDF_DISPATCHER_OPTION_UNSYNCHRONIZED;
    /// See `FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS` in the C API
    pub(crate) const ALLOW_THREAD_BLOCKING: u32 = fdf_sys::FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS;

    /// Creates a new [`DispatcherBuilder`] that can be used to configure a new dispatcher.
    /// For more information on the threading-related flags for the dispatcher, see
    /// https://fuchsia.dev/fuchsia-src/concepts/drivers/driver-dispatcher-and-threads
    pub fn new() -> Self {
        Self::default()
    }

    /// Allows callbacks set on this dispatcher to run in parallel. May not be combined with
    /// [`Self::allow_thread_blocking`].
    ///
    /// See https://fuchsia.dev/fuchsia-src/concepts/drivers/driver-dispatcher-and-threads
    /// for more information on the threading model of driver dispatchers.
    pub fn unsynchronized(mut self) -> Self {
        assert!(
            !self.allows_thread_blocking(),
            "you may not create an unsynchronized dispatcher that allows synchronous calls"
        );
        self.options |= Self::UNSYNCHRONIZED;
        self
    }

    /// Whether or not this is an unsynchronized dispatcher
    pub fn is_unsynchronized(&self) -> bool {
        (self.options & Self::UNSYNCHRONIZED) == Self::UNSYNCHRONIZED
    }

    /// Allows this dispatcher to call blocking functions. As a consequence, it may not share
    /// zircon threads with other drivers. May not be combined with [`Self::unsynchronized`].
    ///
    /// See https://fuchsia.dev/fuchsia-src/concepts/drivers/driver-dispatcher-and-threads
    /// for more information on the threading model of driver dispatchers.
    pub fn allow_thread_blocking(mut self) -> Self {
        assert!(
            !self.is_unsynchronized(),
            "you may not create an unsynchronized dispatcher that allows synchronous calls"
        );
        self.options |= Self::ALLOW_THREAD_BLOCKING;
        self
    }

    /// Whether or not this dispatcher allows synchronous calls
    pub fn allows_thread_blocking(&self) -> bool {
        (self.options & Self::ALLOW_THREAD_BLOCKING) == Self::ALLOW_THREAD_BLOCKING
    }

    /// A descriptive name for this dispatcher that is used in debug output and process
    /// lists.
    pub fn name(mut self, name: &str) -> Self {
        self.name = name.to_string();
        self
    }

    /// A hint string for the runtime that may or may not affect the priority at which work
    /// scheduled by this dispatcher is handled. It may or may not affect whether other drivers
    /// can share zircon threads with this dispatcher.
    pub fn scheduler_role(mut self, role: &str) -> Self {
        self.scheduler_role = role.to_string();
        self
    }

    /// A callback to be called after the dispatcher has completed asynchronous shutdown.
    pub fn shutdown_observer<F: ShutdownObserverFn>(mut self, shutdown_observer: F) -> Self {
        self.shutdown_observer = Some(Box::new(shutdown_observer));
        self
    }

    /// Create the dispatcher as configured by this object. This must be called from a
    /// thread managed by the driver runtime. The dispatcher returned is owned by the caller,
    /// and will initiate asynchronous shutdown when the object is dropped unless
    /// [`Dispatcher::release`] is called on it to convert it into an unowned [`DispatcherRef`].
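    ///
    /// A minimal usage sketch (hedged): this assumes it is run from a thread managed by the
    /// driver runtime, and only uses the builder methods defined on this type.
    ///
    /// ```ignore
    /// fn make_dispatcher() -> Result<Dispatcher, Status> {
    ///     DispatcherBuilder::new()
    ///         .name("example-dispatcher")
    ///         .allow_thread_blocking()
    ///         .shutdown_observer(|_dispatcher| println!("dispatcher finished shutting down"))
    ///         .create()
    /// }
    /// ```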
    pub fn create(self) -> Result<Dispatcher, Status> {
        let mut out_dispatcher = null_mut();
        let options = self.options;
        let name = self.name.as_ptr() as *mut ffi::c_char;
        let name_len = self.name.len();
        let scheduler_role = self.scheduler_role.as_ptr() as *mut ffi::c_char;
        let scheduler_role_len = self.scheduler_role.len();
        let observer =
            ShutdownObserver::new(self.shutdown_observer.unwrap_or_else(|| Box::new(|_| {})))
                .into_ptr();
        // SAFETY: all arguments point to memory that will be available for the duration
        // of the call, except `observer`, which will be available until it is deallocated
        // by the dispatcher exit handler.
        Status::ok(unsafe {
            fdf_dispatcher_create(
                options,
                name,
                name_len,
                scheduler_role,
                scheduler_role_len,
                observer,
                &mut out_dispatcher,
            )
        })?;
        // SAFETY: `out_dispatcher` is valid by construction if `fdf_dispatcher_create` returns
        // ZX_OK.
        Ok(Dispatcher(unsafe { NonNull::new_unchecked(out_dispatcher) }))
    }

    /// As with [`Self::create`], this creates a new dispatcher as configured by this object, but
    /// instead of returning an owned reference it immediately releases the reference to be
    /// managed by the driver runtime.
    pub fn create_released(self) -> Result<DispatcherRef<'static>, Status> {
        self.create().map(Dispatcher::release)
    }
}

/// An owned handle for a dispatcher managed by the driver runtime.
#[derive(Debug)]
pub struct Dispatcher(pub(crate) NonNull<fdf_dispatcher_t>);

// SAFETY: The api of fdf_dispatcher_t is thread safe.
unsafe impl Send for Dispatcher {}
unsafe impl Sync for Dispatcher {}
thread_local! {
    pub(crate) static OVERRIDE_DISPATCHER: RefCell<Option<NonNull<fdf_dispatcher_t>>> = const { RefCell::new(None) };
}

impl Dispatcher {
    /// Creates a dispatcher from a raw handle.
    ///
    /// # Safety
    ///
    /// Caller is responsible for ensuring that the given handle is valid and
    /// not owned by any other wrapper that will free it at an arbitrary
    /// time.
    pub unsafe fn from_raw(handle: NonNull<fdf_dispatcher_t>) -> Self {
        Self(handle)
    }

    fn get_raw_flags(&self) -> u32 {
        // SAFETY: the inner fdf_dispatcher_t is valid by construction
        unsafe { fdf_dispatcher_get_options(self.0.as_ptr()) }
    }

    /// Whether this dispatcher's tasks and futures can run on multiple threads at the same time.
    pub fn is_unsynchronized(&self) -> bool {
        (self.get_raw_flags() & DispatcherBuilder::UNSYNCHRONIZED) != 0
    }

    /// Whether this dispatcher is allowed to call blocking functions or not
    pub fn allows_thread_blocking(&self) -> bool {
        (self.get_raw_flags() & DispatcherBuilder::ALLOW_THREAD_BLOCKING) != 0
    }

    /// Whether this is the dispatcher the current thread is running on
    pub fn is_current_dispatcher(&self) -> bool {
        // SAFETY: we don't do anything with the dispatcher pointer, and NULL is returned if this
        // isn't a dispatcher-managed thread.
        self.0.as_ptr() == unsafe { fdf_dispatcher_get_current_dispatcher() }
    }

    /// Releases ownership over this dispatcher and returns a [`DispatcherRef`]
    /// that can be used to access it. The reference has a `'static` lifetime because the
    /// dispatcher will exist for as long as the current driver is loaded, and the driver
    /// runtime will shut it down when the driver is unloaded.
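    ///
    /// A minimal sketch (hedged; assumes the dispatcher was created via [`DispatcherBuilder`]
    /// on a driver-runtime-managed thread):
    ///
    /// ```ignore
    /// let dispatcher = DispatcherBuilder::new().name("released").create().expect("create");
    /// // After `release`, dropping the returned ref does not shut the dispatcher down.
    /// let _released: DispatcherRef<'static> = dispatcher.release();
    /// ```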
    pub fn release(self) -> DispatcherRef<'static> {
        DispatcherRef(ManuallyDrop::new(self), PhantomData)
    }

    /// Returns a [`DispatcherRef`] that references this dispatcher with a lifetime constrained by
    /// `self`.
    pub fn as_dispatcher_ref(&self) -> DispatcherRef<'_> {
        DispatcherRef(ManuallyDrop::new(Dispatcher(self.0)), PhantomData)
    }
}

impl AsyncDispatcher for Dispatcher {
    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
        let async_dispatcher =
            NonNull::new(unsafe { fdf_dispatcher_get_async_dispatcher(self.0.as_ptr()) })
                .expect("No async dispatcher on driver dispatcher");
        unsafe { AsyncDispatcherRef::from_raw(async_dispatcher) }
    }
}

impl Drop for Dispatcher {
    fn drop(&mut self) {
        // SAFETY: we only ever provide an owned `Dispatcher` to one owner, so when
        // that one is dropped we can invoke the shutdown of the dispatcher
        unsafe { fdf_dispatcher_shutdown_async(self.0.as_mut()) }
    }
}

/// An owned reference to a driver runtime dispatcher that auto-releases when dropped. This
/// combines the advantages of an `Arc<Dispatcher>` and of a `DispatcherRef<'static>` created by
/// [`Dispatcher::release`]:
///
/// - You can vend [`Weak`]-like pointers to it that will not cause memory access errors if used
///   after the dispatcher has shut down, like an [`Arc`].
/// - You can tie its terminal lifetime to that of the driver itself.
///
/// This is particularly useful in tests.
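///
/// A minimal lifecycle sketch (hedged): `dispatcher` is assumed to be a [`Dispatcher`] created
/// elsewhere, and `on_dispatcher` comes from the re-exported [`OnDispatcher`] trait.
///
/// ```ignore
/// let auto = AutoReleaseDispatcher::from(dispatcher);
/// let weak = auto.downgrade();
/// // While `auto` is alive, the weak handle resolves to a live dispatcher.
/// weak.on_dispatcher(|dispatcher| assert!(dispatcher.is_some()));
/// drop(auto);
/// // After drop, the weak handle resolves to `None` rather than a dangling pointer.
/// weak.on_dispatcher(|dispatcher| assert!(dispatcher.is_none()));
/// ```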
#[derive(Debug)]
pub struct AutoReleaseDispatcher(Arc<AtomicPtr<fdf_dispatcher>>);

impl AutoReleaseDispatcher {
    /// Returns a weakened reference to this dispatcher. This weak reference will only be valid so
    /// long as the [`AutoReleaseDispatcher`] object that spawned it is alive, after which it will
    /// no longer be usable to spawn tasks on.
    pub fn downgrade(&self) -> WeakDispatcher {
        WeakDispatcher::from(self)
    }
}

impl AsyncDispatcher for AutoReleaseDispatcher {
    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
        let dispatcher = NonNull::new(self.0.load(Ordering::Relaxed))
            .expect("tried to obtain async dispatcher after drop");
        // SAFETY: the validity of this dispatcher is ensured by use of NonNull above and this
        // object's exclusive ownership over the dispatcher while it's alive.
        unsafe {
            AsyncDispatcherRef::from_raw(
                NonNull::new(fdf_dispatcher_get_async_dispatcher(dispatcher.as_ptr())).unwrap(),
            )
        }
    }
}

impl From<Dispatcher> for AutoReleaseDispatcher {
    fn from(dispatcher: Dispatcher) -> Self {
        let dispatcher_ptr = dispatcher.release().0.0.as_ptr();
        Self(Arc::new(AtomicPtr::new(dispatcher_ptr)))
    }
}

impl Drop for AutoReleaseDispatcher {
    fn drop(&mut self) {
        // Store nullptr into the atomic so that any future attempts to obtain a strong reference
        // through a WeakDispatcher will not successfully upgrade.
        self.0.store(null_mut(), Ordering::Relaxed);
        // We want to allow any outstanding `on_dispatcher` calls to finish before returning
        // from drop, so we loop until our own Arc is the only remaining strong reference, after
        // which any future attempt to call `on_dispatcher` on a `WeakDispatcher` will fail.
        while Arc::strong_count(&self.0) > 1 {
            // This sleep is kind of gross, but it should happen extremely rarely and
            // `on_dispatcher` calls should not be performing any blocking work.
            std::thread::sleep(std::time::Duration::from_nanos(100))
        }
    }
}

/// An unowned but reference-counted reference to a dispatcher. This usually comes from an
/// [`AutoReleaseDispatcher`].
///
/// The advantage of using this instead of [`Weak`] directly is that it controls the lifetime
/// of any given strong reference to the dispatcher, since the only way to access that strong
/// reference is through [`OnDispatcher::on_dispatcher`]. This makes it much easier to be sure
/// that you aren't leaving any dangling strong references to the dispatcher object around.
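///
/// A minimal access sketch (hedged; `weak` is assumed to come from
/// [`AutoReleaseDispatcher::downgrade`]):
///
/// ```ignore
/// weak.on_dispatcher(|dispatcher| match dispatcher {
///     // The strong reference only lives for the duration of this closure.
///     Some(_dispatcher_ref) => println!("dispatcher is alive"),
///     None => println!("dispatcher has already been released"),
/// });
/// ```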
#[derive(Clone, Debug)]
pub struct WeakDispatcher(Weak<AtomicPtr<fdf_dispatcher>>);

impl From<&AutoReleaseDispatcher> for WeakDispatcher {
    fn from(value: &AutoReleaseDispatcher) -> Self {
        Self(Arc::downgrade(&value.0))
    }
}

impl OnDispatcher for WeakDispatcher {
    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
        let Some(dispatcher_ptr) = self.0.upgrade() else {
            return f(None);
        };
        let Some(dispatcher) = NonNull::new(dispatcher_ptr.load(Ordering::Relaxed)) else {
            return f(None);
        };
        // SAFETY: As long as we hold the strong reference in dispatcher_ptr, the
        // AutoReleaseDispatcher will not allow its drop to finish and the dispatcher should still
        // be valid.
        f(Some(unsafe { DispatcherRef::from_raw(dispatcher) }.as_async_dispatcher_ref()))
    }
}

/// An unowned reference to a driver runtime dispatcher such as is produced by calling
/// [`Dispatcher::release`]. When this object goes out of scope it won't shut down the dispatcher,
/// leaving that up to the driver runtime or another owner.
#[derive(Debug)]
pub struct DispatcherRef<'a>(ManuallyDrop<Dispatcher>, PhantomData<&'a Dispatcher>);

impl<'a> DispatcherRef<'a> {
    /// Creates a dispatcher ref from a raw handle.
    ///
    /// # Safety
    ///
    /// Caller is responsible for ensuring that the given handle is valid for
    /// the lifetime `'a`.
    pub unsafe fn from_raw(handle: NonNull<fdf_dispatcher_t>) -> Self {
        // SAFETY: Caller promises the handle is valid.
        Self(ManuallyDrop::new(unsafe { Dispatcher::from_raw(handle) }), PhantomData)
    }

    /// Creates a dispatcher ref from an [`AsyncDispatcherRef`].
    ///
    /// # Panics
    ///
    /// This will panic if the [`AsyncDispatcherRef`] was not created from a driver dispatcher
    /// in the first place.
    pub fn from_async_dispatcher(dispatcher: AsyncDispatcherRef<'a>) -> Self {
        let handle = NonNull::new(unsafe {
            fdf_dispatcher_downcast_async_dispatcher(dispatcher.inner().as_ptr())
        })
        .unwrap();
        unsafe { Self::from_raw(handle) }
    }

    /// Gets the raw handle from this dispatcher ref.
    ///
    /// # Safety
    ///
    /// Caller is responsible for ensuring that the dispatcher handle is used safely.
    pub unsafe fn as_raw(&mut self) -> *mut fdf_dispatcher_t {
        unsafe { self.0.0.as_mut() }
    }
}

impl<'a> AsyncDispatcher for DispatcherRef<'a> {
    fn as_async_dispatcher_ref(&self) -> AsyncDispatcherRef<'_> {
        self.0.as_async_dispatcher_ref()
    }
}

impl<'a> Clone for DispatcherRef<'a> {
    fn clone(&self) -> Self {
        Self(ManuallyDrop::new(Dispatcher(self.0.0)), PhantomData)
    }
}

impl<'a> core::ops::Deref for DispatcherRef<'a> {
    type Target = Dispatcher;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<'a> core::ops::DerefMut for DispatcherRef<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<'a> OnDispatcher for DispatcherRef<'a> {
    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
        f(Some(self.as_async_dispatcher_ref()))
    }
}

/// A placeholder for the currently active dispatcher. Use [`OnDispatcher::on_dispatcher`] to
/// access it when needed.
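///
/// A minimal sketch (hedged): this assumes the closure runs on a thread managed by a driver
/// dispatcher; otherwise it is handed `None`.
///
/// ```ignore
/// CurrentDispatcher.on_dispatcher(|dispatcher| {
///     // `dispatcher` is only valid for the duration of this closure.
///     assert!(dispatcher.is_some(), "not running on a driver dispatcher");
/// });
/// ```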
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct CurrentDispatcher;

impl OnDispatcher for CurrentDispatcher {
    fn on_dispatcher<R>(&self, f: impl FnOnce(Option<AsyncDispatcherRef<'_>>) -> R) -> R {
        let dispatcher = OVERRIDE_DISPATCHER
            .with(|global| *global.borrow())
            .or_else(|| {
                // SAFETY: NonNull::new will null-check that we have a current dispatcher.
                NonNull::new(unsafe { fdf_dispatcher_get_current_dispatcher() })
            })
            .map(|dispatcher| {
                // SAFETY: We constrain the lifetime of the `AsyncDispatcherRef` we provide to
                // the function below to the span of the current function. Since we are running
                // on the dispatcher, or another dispatcher that is bound to the same lifetime
                // (through override_dispatcher), we can be sure that the dispatcher will not be
                // shut down before that function completes.
                let async_dispatcher = NonNull::new(unsafe {
                    fdf_dispatcher_get_async_dispatcher(dispatcher.as_ptr())
                })
                .expect("No async dispatcher on driver dispatcher");
                unsafe { AsyncDispatcherRef::from_raw(async_dispatcher) }
            });
        f(dispatcher)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{Arc, Once, Weak, mpsc};

    use futures::channel::mpsc as async_mpsc;
    use futures::{SinkExt, StreamExt};
    use zx::sys::ZX_OK;

    use core::ffi::{c_char, c_void};
    use core::ptr::null_mut;

    static GLOBAL_DRIVER_ENV: Once = Once::new();

    pub fn ensure_driver_env() {
        GLOBAL_DRIVER_ENV.call_once(|| {
            // SAFETY: calling fdf_env_start, which does not have any soundness
            // concerns for rust code, and this is only used in tests.
            unsafe {
                assert_eq!(fdf_env_start(0), ZX_OK);
            }
        });
    }

    pub fn with_raw_dispatcher<T>(name: &str, p: impl for<'a> FnOnce(Weak<Dispatcher>) -> T) -> T {
        with_raw_dispatcher_flags(name, DispatcherBuilder::ALLOW_THREAD_BLOCKING, p)
    }

    pub(crate) fn with_raw_dispatcher_flags<T>(
        name: &str,
        flags: u32,
        p: impl for<'a> FnOnce(Weak<Dispatcher>) -> T,
    ) -> T {
        ensure_driver_env();

        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let mut dispatcher = null_mut();
        let mut observer = ShutdownObserver::new(move |dispatcher| {
            // SAFETY: we verify that the dispatcher has no tasks left queued in it,
            // just because this is testing code.
            assert!(!unsafe { fdf_env_dispatcher_has_queued_tasks(dispatcher.0.0.as_ptr()) });
            shutdown_tx.send(()).unwrap();
        })
        .into_ptr();
        let driver_ptr = &mut observer as *mut _ as *mut c_void;
        // SAFETY: The pointers we pass to this function are all stable for the
        // duration of this function, and are not available to copy or clone to
        // client code (only through a ref to the non-`Clone` `Dispatcher`
        // wrapper).
        let res = unsafe {
            fdf_env_dispatcher_create_with_owner(
                driver_ptr,
                flags,
                name.as_ptr() as *const c_char,
                name.len(),
                "".as_ptr() as *const c_char,
                0_usize,
                observer,
                &mut dispatcher,
            )
        };
        assert_eq!(res, ZX_OK);
        let dispatcher = Arc::new(Dispatcher(NonNull::new(dispatcher).unwrap()));

        let res = p(Arc::downgrade(&dispatcher));

        // this initiates the dispatcher shutdown on a driver runtime
        // thread. When all tasks on the dispatcher have completed, the wait
        // on the shutdown_rx below will end and we can tear it down.
        let weak_dispatcher = Arc::downgrade(&dispatcher);
        drop(dispatcher);
        shutdown_rx.recv().unwrap();
        assert_eq!(
            0,
            weak_dispatcher.strong_count(),
            "a dispatcher reference escaped the test body"
        );

        res
    }

    #[test]
    fn start_test_dispatcher() {
        with_raw_dispatcher("testing", |dispatcher| {
            println!("hello {dispatcher:?}");
        })
    }

    #[test]
    fn post_task_on_dispatcher() {
        with_raw_dispatcher("testing task", |dispatcher| {
            let (tx, rx) = mpsc::channel();
            let dispatcher = Weak::upgrade(&dispatcher).unwrap();
            dispatcher
                .post_task_sync(move |status| {
                    assert_eq!(status, Status::from_raw(ZX_OK));
                    tx.send(status).unwrap();
                })
                .unwrap();
            assert_eq!(rx.recv().unwrap(), Status::from_raw(ZX_OK));
        });
    }

    #[test]
    fn post_task_on_subdispatcher() {
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        with_raw_dispatcher("testing task top level", move |dispatcher| {
            let (tx, rx) = mpsc::channel();
            let (inner_tx, inner_rx) = mpsc::channel();
            let dispatcher = Weak::upgrade(&dispatcher).unwrap();
            dispatcher
                .post_task_sync(move |status| {
                    assert_eq!(status, Status::from_raw(ZX_OK));
                    let inner = DispatcherBuilder::new()
                        .name("testing task second level")
                        .scheduler_role("")
                        .allow_thread_blocking()
                        .shutdown_observer(move |_dispatcher| {
                            println!("shutdown observer called");
                            shutdown_tx.send(1).unwrap();
                        })
                        .create()
                        .unwrap();
                    inner
                        .post_task_sync(move |status| {
                            assert_eq!(status, Status::from_raw(ZX_OK));
                            tx.send(status).unwrap();
                        })
                        .unwrap();
                    // we want to make sure the inner dispatcher lives long
                    // enough to run the task, so we send it out to the outer
                    // closure.
                    inner_tx.send(inner).unwrap();
                })
                .unwrap();
            assert_eq!(rx.recv().unwrap(), Status::from_raw(ZX_OK));
            inner_rx.recv().unwrap();
        });
        assert_eq!(shutdown_rx.recv().unwrap(), 1);
    }

    async fn ping(mut tx: async_mpsc::Sender<u8>, mut rx: async_mpsc::Receiver<u8>) {
        println!("starting ping!");
        tx.send(0).await.unwrap();
        while let Some(next) = rx.next().await {
            println!("ping! {next}");
            tx.send(next + 1).await.unwrap();
        }
    }

    async fn pong(
        fin_tx: std::sync::mpsc::Sender<()>,
        mut tx: async_mpsc::Sender<u8>,
        mut rx: async_mpsc::Receiver<u8>,
    ) {
        println!("starting pong!");
        while let Some(next) = rx.next().await {
            println!("pong! {next}");
            if next > 10 {
                println!("bye!");
                break;
            }
            tx.send(next + 1).await.unwrap();
        }
        fin_tx.send(()).unwrap();
    }

    #[test]
    fn async_ping_pong() {
        with_raw_dispatcher("async ping pong", |dispatcher| {
            let (fin_tx, fin_rx) = mpsc::channel();
            let (ping_tx, pong_rx) = async_mpsc::channel(10);
            let (pong_tx, ping_rx) = async_mpsc::channel(10);
            dispatcher.spawn(ping(ping_tx, ping_rx)).unwrap();
            dispatcher.spawn(pong(fin_tx, pong_tx, pong_rx)).unwrap();

            fin_rx.recv().expect("to receive final value");
        });
    }

    async fn slow_pong(
        fin_tx: std::sync::mpsc::Sender<()>,
        mut tx: async_mpsc::Sender<u8>,
        mut rx: async_mpsc::Receiver<u8>,
    ) {
        use zx::MonotonicDuration;
        println!("starting pong!");
        while let Some(next) = rx.next().await {
            println!("pong! {next}");
            fuchsia_async::Timer::new(fuchsia_async::MonotonicInstant::after(
                MonotonicDuration::from_seconds(1),
            ))
            .await;
            if next > 10 {
                println!("bye!");
                break;
            }
            tx.send(next + 1).await.unwrap();
        }
        fin_tx.send(()).unwrap();
    }

    #[test]
    fn mixed_executor_async_ping_pong() {
        with_raw_dispatcher("async ping pong", |dispatcher| {
            let (fin_tx, fin_rx) = mpsc::channel();
            let (ping_tx, pong_rx) = async_mpsc::channel(10);
            let (pong_tx, ping_rx) = async_mpsc::channel(10);

            // spawn ping on the driver dispatcher
            dispatcher.spawn(ping(ping_tx, ping_rx)).unwrap();

            // and run pong on the fuchsia_async executor
            let mut executor = fuchsia_async::LocalExecutor::default();
            executor.run_singlethreaded(slow_pong(fin_tx, pong_tx, pong_rx));

            fin_rx.recv().expect("to receive final value");
        });
    }
}