fdf_channel/channel.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Safe bindings for the driver runtime channel stable ABI

use core::future::Future;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use zx::Status;

use crate::arena::{Arena, ArenaBox};
use crate::futures::{ReadMessageState, ReadMessageStateOp};
use crate::message::Message;
use fdf_core::dispatcher::OnDispatcher;
use fdf_core::handle::{DriverHandle, MixedHandle};
use fdf_sys::*;

use core::marker::PhantomData;
use core::mem::{MaybeUninit, size_of_val};
use core::num::NonZero;
use core::pin::Pin;
use core::ptr::{NonNull, null_mut};
use core::task::{Context, Poll};

pub use fdf_sys::fdf_handle_t;

/// Implements a message channel through the Fuchsia Driver Runtime
#[derive(Debug)]
pub struct Channel<T: ?Sized + 'static> {
    // Note: if we're waiting on a callback we can't drop the handle until
    // that callback has fired.
    pub(crate) handle: ManuallyDrop<DriverHandle>,
    pub(crate) wait_state: Option<Arc<ReadMessageStateOp>>,
    _p: PhantomData<Message<T>>,
}

impl<T: ?Sized> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut can_drop = true;

        if let Some(current_wait) = &self.wait_state {
            // set_channel_dropped() will return true if we can drop the handle ourselves.
            // Otherwise, the channel should not be dropped until the callback is called.
            can_drop = current_wait.set_channel_dropped();
        }

        if can_drop {
            // SAFETY: If there's no current wait active, we are the only
            // owner of the handle.
            unsafe {
                ManuallyDrop::drop(&mut self.handle);
            }
        };
    }
}

impl<T: ?Sized + 'static> Channel<T> {
    /// Creates a new channel pair that can be used to send messages of type `T`
    /// between threads managed by the driver runtime.
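    ///
    /// # Example
    ///
    /// A minimal usage sketch (requires the driver runtime, so it is not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let (left, right) = Channel::<[u8]>::create();
    /// left.write_with_data(Arena::new(), |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
    /// assert_eq!(right.try_read_bytes().unwrap().unwrap().data().unwrap(), &[1, 2, 3, 4]);
    /// ```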
    pub fn create() -> (Self, Self) {
        let mut channel1 = 0;
        let mut channel2 = 0;
        // This call cannot fail as the only reason it would fail is due to invalid
        // option flags, and 0 is a valid option.
        Status::ok(unsafe { fdf_channel_create(0, &mut channel1, &mut channel2) })
            .expect("failed to create channel pair");
        // SAFETY: if fdf_channel_create returned ZX_OK, it will have placed
        // valid channel handles that must be non-zero.
        unsafe {
            (
                Self::from_handle_unchecked(NonZero::new_unchecked(channel1)),
                Self::from_handle_unchecked(NonZero::new_unchecked(channel2)),
            )
        }
    }

    /// Returns a reference to the inner handle of the channel.
    pub fn driver_handle(&self) -> &DriverHandle {
        &self.handle
    }

    /// Takes the inner handle to the channel. The caller is responsible for ensuring
    /// that the handle is freed.
    ///
    /// # Panics
    ///
    /// This function will panic if the channel has previously had a read wait
    /// registered on it.
    pub fn into_driver_handle(self) -> DriverHandle {
        assert!(
            self.wait_state.is_none(),
            "A read wait has been registered on this channel so it can't be destructured"
        );

        // SAFETY: We will be forgetting `self` after this, so we can safely
        // take ownership of the raw handle for reconstituting into a `DriverHandle`
        // object after.
        let handle = unsafe { self.handle.get_raw() };

        // we don't want to call drop here because we've taken the handle out of the
        // object.
        std::mem::forget(self);

        // SAFETY: We just took this handle from the object we just forgot, so we
        // are the only owner of it.
        unsafe { DriverHandle::new_unchecked(handle) }
    }

    /// Initializes a [`Channel`] object from the given non-zero handle.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the handle is valid and that it is
    /// part of a driver runtime channel pair of type `T`.
    unsafe fn from_handle_unchecked(handle: NonZero<fdf_handle_t>) -> Self {
        // SAFETY: caller is responsible for ensuring that it is a valid channel
        Self {
            handle: ManuallyDrop::new(unsafe { DriverHandle::new_unchecked(handle) }),
            wait_state: None,
            _p: PhantomData,
        }
    }

    /// Initializes a [`Channel`] object from the given [`DriverHandle`],
    /// assuming that it is a channel of type `T`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the handle is a [`Channel`]-based handle that is
    /// using type `T` as its wire format.
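    ///
    /// # Example
    ///
    /// A sketch of round-tripping a channel through its raw [`DriverHandle`]
    /// (requires the driver runtime, so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// let (first, _second) = Channel::<u8>::create();
    /// let handle = first.into_driver_handle();
    /// // SAFETY: the handle came from a `Channel<u8>`, so the type matches.
    /// let first = unsafe { Channel::<u8>::from_driver_handle(handle) };
    /// ```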
    pub unsafe fn from_driver_handle(handle: DriverHandle) -> Self {
        Self { handle: ManuallyDrop::new(handle), wait_state: None, _p: PhantomData }
    }

    /// Writes the given [`Message`] to the channel. This completes asynchronously and can't
    /// be cancelled.
    ///
    /// The channel takes ownership of the data and handles passed in.
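    ///
    /// # Example
    ///
    /// A sketch of writing an explicitly constructed [`Message`] (requires the
    /// driver runtime, so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// let (first, _second) = Channel::<u8>::create();
    /// let message = Message::new_with(Arena::new(), |arena| (Some(arena.insert(42u8)), None));
    /// first.write(message).unwrap();
    /// ```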
    pub fn write(&self, message: Message<T>) -> Result<(), Status> {
        // get the sizes while we still have refs to the data and handles
        let data_len = message.data().map_or(0, |data| size_of_val(data) as u32);
        let handles_count = message.handles().map_or(0, |handles| handles.len() as u32);

        let (arena, data, handles) = message.into_raw();

        // transform the `Option<NonNull<T>>` into just `*mut T`
        let data_ptr = data.map_or(null_mut(), |data| data.cast().as_ptr());
        let handles_ptr = handles.map_or(null_mut(), |handles| handles.cast().as_ptr());

        // SAFETY:
        // - Normally, we could be reading uninit bytes here. However, as long as fdf_channel_write
        //   isn't subject to cross-LTO, it won't care whether the bytes are initialized.
        // - The `Message` will generally only construct correctly if the data and handles pointers
        //   inside it are from the arena it holds, but just in case, `fdf_channel_write` will check
        //   that we are using the correct arena, so we do not need to re-verify that they are from
        //   the same arena.
        Status::ok(unsafe {
            fdf_channel_write(
                self.handle.get_raw().get(),
                0,
                arena.as_ptr(),
                data_ptr,
                data_len,
                handles_ptr,
                handles_count,
            )
        })?;

        // SAFETY: this is the valid-by-construction arena we were passed in through the [`Message`]
        // object, and now that we have completed `fdf_channel_write` it is safe to drop our copy
        // of it.
        unsafe { fdf_arena_drop_ref(arena.as_ptr()) };
        Ok(())
    }

    /// Shorthand for calling [`Self::write`] with the result of [`Message::new_with`]
    pub fn write_with<F>(&self, arena: Arena, f: F) -> Result<(), Status>
    where
        F: for<'a> FnOnce(
            &'a Arena,
        )
            -> (Option<ArenaBox<'a, T>>, Option<ArenaBox<'a, [Option<MixedHandle>]>>),
    {
        self.write(Message::new_with(arena, f))
    }

    /// Shorthand for calling [`Self::write`] with the result of [`Message::new_with_data`]
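    ///
    /// # Example
    ///
    /// A sketch, assuming a `Channel<u8>` (requires the driver runtime, so it
    /// is not compiled as a doctest):
    ///
    /// ```ignore
    /// let (first, second) = Channel::<u8>::create();
    /// first.write_with_data(Arena::new(), |arena| arena.insert(1u8)).unwrap();
    /// assert_eq!(*second.try_read().unwrap().unwrap().data().unwrap(), 1);
    /// ```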
    pub fn write_with_data<F>(&self, arena: Arena, f: F) -> Result<(), Status>
    where
        F: for<'a> FnOnce(&'a Arena) -> ArenaBox<'a, T>,
    {
        self.write(Message::new_with_data(arena, f))
    }
}

/// Attempts to read from the channel, returning a [`Message`] object that can be used to
/// access or take the data received if there was any. This is the basic building block
/// on which the other `try_read_*` methods are built.
pub(crate) fn try_read_raw(
    channel: &DriverHandle,
) -> Result<Option<Message<[MaybeUninit<u8>]>>, Status> {
    let mut out_arena = null_mut();
    let mut out_data = null_mut();
    let mut out_num_bytes = 0;
    let mut out_handles = null_mut();
    let mut out_num_handles = 0;
    Status::ok(unsafe {
        fdf_channel_read(
            channel.get_raw().get(),
            0,
            &mut out_arena,
            &mut out_data,
            &mut out_num_bytes,
            &mut out_handles,
            &mut out_num_handles,
        )
    })?;
    // if no arena was returned, that means no data was returned.
    if out_arena.is_null() {
        return Ok(None);
    }
    // SAFETY: we just checked that the `out_arena` is non-null
    let arena = Arena(unsafe { NonNull::new_unchecked(out_arena) });
    let data_ptr = if !out_data.is_null() {
        let ptr = core::ptr::slice_from_raw_parts_mut(out_data.cast(), out_num_bytes as usize);
        // SAFETY: we just checked that the pointer was non-null, the slice version of it should
        // be too.
        Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
    } else {
        None
    };
    let handles_ptr = if !out_handles.is_null() {
        let ptr = core::ptr::slice_from_raw_parts_mut(out_handles.cast(), out_num_handles as usize);
        // SAFETY: we just checked that the pointer was non-null, the slice version of it should
        // be too.
        Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
    } else {
        None
    };
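    // SAFETY: `fdf_channel_read` allocates the returned data and handle buffers (if any) out of
    // the arena it hands back, so it is valid to tie them to that arena in a `Message`.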
    Ok(Some(unsafe { Message::new_unchecked(arena, data_ptr, handles_ptr) }))
}

/// Reads a message from the channel asynchronously
///
/// # Panics
///
/// Panics if this is not run from a driver framework dispatcher.
///
/// # Safety
///
/// The caller is responsible for ensuring that the channel object's
/// handle outlives the returned future.
pub(crate) unsafe fn read_raw<T: ?Sized, D>(
    channel: &mut Channel<T>,
    dispatcher: D,
) -> ReadMessageRawFut<D> {
    // SAFETY: The caller promises that the message state object can't outlive the handle.
    let raw_fut = unsafe { ReadMessageState::register_read_wait(channel) };
    ReadMessageRawFut { raw_fut, dispatcher }
}

impl<T> Channel<T> {
    /// Attempts to read an object of type `T` and a handle set from the channel
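    ///
    /// # Example
    ///
    /// A sketch, assuming a `Channel<u8>` whose peer has written a value
    /// (requires the driver runtime, so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// let (first, second) = Channel::<u8>::create();
    /// assert_eq!(second.try_read().unwrap_err(), Status::SHOULD_WAIT);
    /// first.write_with_data(Arena::new(), |arena| arena.insert(7u8)).unwrap();
    /// assert_eq!(*second.try_read().unwrap().unwrap().data().unwrap(), 7);
    /// ```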
    pub fn try_read(&self) -> Result<Option<Message<T>>, Status> {
        // read a message from the channel
        let Some(message) = try_read_raw(&self.handle)? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<T> that messages sent or received are always of
        // type T.
        Ok(Some(unsafe { message.cast_unchecked() }))
    }

    /// Reads an object of type `T` and a handle set from the channel asynchronously
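    ///
    /// # Example
    ///
    /// A sketch of an echo loop awaiting messages on the current driver
    /// dispatcher, modeled on the tests below (requires the driver runtime,
    /// so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// async fn echo(mut chan: Channel<u8>) {
    ///     while let Ok(Some(msg)) = chan.read(CurrentDispatcher).await {
    ///         let value = *msg.data().unwrap();
    ///         chan.write_with_data(msg.take_arena(), |arena| arena.insert(value)).unwrap();
    ///     }
    /// }
    /// ```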
    pub async fn read<D: OnDispatcher + Unpin>(
        &mut self,
        dispatcher: D,
    ) -> Result<Option<Message<T>>, Status> {
        // SAFETY: By calling `read_raw` in an async context that holds this channel's lifetime open
        // beyond the resolution of the future, we ensure that the channel handle outlives the
        // future state object.
        let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<T> that messages sent or received are always of
        // type T.
        Ok(Some(unsafe { message.cast_unchecked() }))
    }
}

impl Channel<[u8]> {
    /// Attempts to read a slice of bytes and a handle set from the channel
    pub fn try_read_bytes(&self) -> Result<Option<Message<[u8]>>, Status> {
        // read a message from the channel
        let Some(message) = try_read_raw(&self.handle)? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<[u8]> that messages sent or received are always of
        // type [u8].
        Ok(Some(unsafe { message.assume_init() }))
    }

    /// Reads a slice of bytes and a handle set from the channel asynchronously
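    ///
    /// # Example
    ///
    /// A sketch of awaiting a byte payload from within a task running on a
    /// driver dispatcher (requires the driver runtime, so it is not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// let (mut first, second) = Channel::<[u8]>::create();
    /// second.write_with_data(Arena::new(), |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
    /// let message = first.read_bytes(CurrentDispatcher).await.unwrap().unwrap();
    /// assert_eq!(message.data().unwrap(), &[1, 2, 3, 4]);
    /// ```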
    pub async fn read_bytes<D: OnDispatcher + Unpin>(
        &mut self,
        dispatcher: D,
    ) -> Result<Option<Message<[u8]>>, Status> {
        // read a message from the channel
        // SAFETY: By calling `read_raw` in an async context that holds this channel's lifetime open
        // beyond the resolution of the future, we ensure that the channel handle outlives the
        // future state object.
        let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<[u8]> that messages sent or received are always of
        // type [u8].
        Ok(Some(unsafe { message.assume_init() }))
    }
}

impl<T> From<Channel<T>> for MixedHandle {
    fn from(value: Channel<T>) -> Self {
        MixedHandle::from(value.into_driver_handle())
    }
}

impl<T: ?Sized> std::cmp::Ord for Channel<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.handle.cmp(&other.handle)
    }
}

impl<T: ?Sized> std::cmp::PartialOrd for Channel<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: ?Sized> std::cmp::PartialEq for Channel<T> {
    fn eq(&self, other: &Self) -> bool {
        self.handle.eq(&other.handle)
    }
}

impl<T: ?Sized> std::cmp::Eq for Channel<T> {}

impl<T: ?Sized> std::hash::Hash for Channel<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.handle.hash(state);
    }
}

pub(crate) struct ReadMessageRawFut<D> {
    pub(crate) raw_fut: ReadMessageState,
    dispatcher: D,
}

impl<D: OnDispatcher + Unpin> Future for ReadMessageRawFut<D> {
    type Output = Result<Option<Message<[MaybeUninit<u8>]>>, Status>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let dispatcher = self.dispatcher.clone();
        self.as_mut().raw_fut.poll_with_dispatcher(cx, dispatcher)
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Write, stdout};
    use std::pin::pin;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, mpsc};

    use fdf_core::dispatcher::{
        AsyncDispatcher, CurrentDispatcher, Dispatcher, DispatcherBuilder, DispatcherRef,
        OnDispatcher,
    };
    use fdf_core::handle::MixedHandleType;
    use fdf_env::test::spawn_in_driver;
    use futures::channel::oneshot;
    use futures::poll;

    use super::*;
    use crate::test_utils::*;

    #[test]
    fn send_and_receive_bytes_synchronously() {
        let (first, second) = Channel::create();
        let arena = Arena::new();
        assert_eq!(first.try_read_bytes().unwrap_err(), Status::SHOULD_WAIT);
        first.write_with_data(arena.clone(), |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
        assert_eq!(second.try_read_bytes().unwrap().unwrap().data().unwrap(), &[1, 2, 3, 4]);
        assert_eq!(second.try_read_bytes().unwrap_err(), Status::SHOULD_WAIT);
        second.write_with_data(arena.clone(), |arena| arena.insert_slice(&[5, 6, 7, 8])).unwrap();
        assert_eq!(first.try_read_bytes().unwrap().unwrap().data().unwrap(), &[5, 6, 7, 8]);
        assert_eq!(first.try_read_bytes().unwrap_err(), Status::SHOULD_WAIT);
        assert_eq!(second.try_read_bytes().unwrap_err(), Status::SHOULD_WAIT);
        drop(second);
        assert_eq!(
            first.write_with_data(arena.clone(), |arena| arena.insert_slice(&[9, 10, 11, 12])),
            Err(Status::PEER_CLOSED)
        );
    }

    #[test]
    fn send_and_receive_bytes_asynchronously() {
        spawn_in_driver("channel async", async {
            let arena = Arena::new();
            let (mut first, second) = Channel::create();

            assert!(poll!(pin!(first.read_bytes(CurrentDispatcher))).is_pending());
            second.write_with_data(arena, |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
            assert_eq!(
                first.read_bytes(CurrentDispatcher).await.unwrap().unwrap().data().unwrap(),
                &[1, 2, 3, 4]
            );
        });
    }

    #[test]
    fn send_and_receive_objects_synchronously() {
        let arena = Arena::new();
        let (first, second) = Channel::create();
        let (tx, rx) = mpsc::channel();
        first
            .write_with_data(arena.clone(), |arena| arena.insert(DropSender::new(1, tx.clone())))
            .unwrap();
        rx.try_recv().expect_err("should not drop the object when sent");
        let message = second.try_read().unwrap().unwrap();
        assert_eq!(message.data().unwrap().0, 1);
        rx.try_recv().expect_err("should not drop the object when received");
        drop(message);
        rx.try_recv().expect("dropped when received");
    }

    #[test]
    fn send_and_receive_handles_synchronously() {
        println!("Create channels and write one end of one of the channel pairs to the other");
        let (first, second) = Channel::<()>::create();
        let (inner_first, inner_second) = Channel::<String>::create();
        let message = Message::new_with(Arena::new(), |arena| {
            (None, Some(arena.insert_boxed_slice(Box::new([Some(inner_first.into())]))))
        });
        first.write(message).unwrap();

        println!("Receive the channel back on the other end of the first channel pair.");
        let mut arena = None;
        let message =
            second.try_read().unwrap().expect("Expected a message with contents to be received");
        let (_, received_handles) = message.into_arena_boxes(&mut arena);
        let mut first_handle_received =
            ArenaBox::take_boxed_slice(received_handles.expect("expected handles in the message"));
        let first_handle_received = first_handle_received
            .first_mut()
            .expect("expected one handle in the handle set")
            .take()
            .expect("expected the first handle to be non-null");
        let first_handle_received = first_handle_received.resolve();
        let MixedHandleType::Driver(driver_handle) = first_handle_received else {
            panic!("Got a non-driver handle when we sent a driver handle");
        };
        let inner_first_received = unsafe { Channel::from_driver_handle(driver_handle) };

        println!("Send and receive a string across the now-transmitted channel pair.");
        inner_first_received
            .write_with_data(Arena::new(), |arena| arena.insert("boom".to_string()))
            .unwrap();
        assert_eq!(inner_second.try_read().unwrap().unwrap().data().unwrap(), &"boom".to_string());
    }

    async fn ping(mut chan: Channel<u8>) {
        println!("starting ping!");
        chan.write_with_data(Arena::new(), |arena| arena.insert(0)).unwrap();
        while let Ok(Some(msg)) = chan.read(CurrentDispatcher).await {
            let next = *msg.data().unwrap();
            println!("ping! {next}");
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    async fn pong(mut chan: Channel<u8>) {
        println!("starting pong!");
        while let Some(msg) = chan.read(CurrentDispatcher).await.unwrap() {
            let next = *msg.data().unwrap();
            println!("pong! {next}");
            if next > 10 {
                println!("bye!");
                break;
            }
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    #[test]
    fn async_ping_pong() {
        spawn_in_driver("async ping pong", async {
            let (ping_chan, pong_chan) = Channel::create();
            CurrentDispatcher.spawn_task(ping(ping_chan)).unwrap();
            pong(pong_chan).await;
        });
    }

    #[test]
    fn async_ping_pong_on_fuchsia_async() {
        spawn_in_driver("async ping pong", async {
            let (ping_chan, pong_chan) = Channel::create();

            let fdf_dispatcher = DispatcherBuilder::new()
                .name("fdf-async")
                .create()
                .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                .release();

            let rust_async_dispatcher = DispatcherBuilder::new()
                .name("fuchsia-async")
                .allow_thread_blocking()
                .create()
                .expect("failure creating blocking dispatcher for rust async")
                .release();

            rust_async_dispatcher
                .post_task_sync(move |_| {
                    Dispatcher::override_current(fdf_dispatcher, || {
                        let mut executor = fuchsia_async::LocalExecutor::default();
                        executor.run_singlethreaded(ping(ping_chan));
                    });
                })
                .unwrap();

            pong(pong_chan).await
        });
    }

    async fn recv_lots_of_bytes_with_cancellations(
        mut rx: Channel<[u8]>,
        fin_tx: oneshot::Sender<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        let mut immediate_count = 0;
        let mut count = 0;
        loop {
            // try to read as fast as we can, but any time we get a pending, drop the future
            // and then re-try with a proper await so we re-read and get it. This tests
            // the reliability of the channel read's drop cancellation.
            let mut next_fut = Box::pin(rx.read_bytes(CurrentDispatcher));
            let next = match futures::poll!(&mut next_fut) {
                Poll::Pending => {
                    pending_count.fetch_add(1, Ordering::Relaxed);
                    drop(next_fut);
                    rx.read_bytes(CurrentDispatcher).await
                }
                Poll::Ready(r) => {
                    immediate_count += 1;
                    r
                }
            };
            match next {
                Err(Status::PEER_CLOSED) | Ok(None) => break,
                Err(_) => {
                    next.unwrap();
                }
                Ok(Some(msg)) => {
                    assert_eq!(msg.data().unwrap(), &[count as u8; 100]);
                    count += 1;
                }
            }
        }
        println!("read total: {count}, immediate: {immediate_count}, pending: {pending_count:?}");
        // send the channel out as well so that the cancellation can finish
        fin_tx.send(()).unwrap();
    }

    async fn send_lots_of_bytes(
        tx: Channel<[u8]>,
        fin_rx: oneshot::Receiver<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        // The potential failure modes here are not entirely deterministic, so we want to
        // make sure that we get enough runs through the danger path (a pending read that is
        // dropped) so that we exercise it thoroughly. To that end, we will do up to 10,000
        // writes but stop early if we have 500 pending events.
        let arena = Arena::new();
        print!("writing: ");
        for i in 0..10000 {
            tx.write_with_data(arena.clone(), |arena| arena.insert_slice(&[i as u8; 100])).unwrap();
            // the following print and flush is not just aesthetic. It helps slow down the
            // writes a bit so that the reader dispatcher is more likely to have to wait for
            // further data.
            print!(".");
            stdout().flush().unwrap();
            if pending_count.load(Ordering::Relaxed) > 500 {
                break;
            }
        }
        drop(tx);
        fin_rx.await.unwrap();
    }

    async fn send_and_recv_lots_of_bytes_with_cancellations(dispatcher: DispatcherRef<'static>) {
        let (tx, rx) = Channel::create();
        let (fin_tx, fin_rx) = oneshot::channel();
        let pending_count = Arc::new(AtomicU64::new(0));
        dispatcher
            .spawn_task(recv_lots_of_bytes_with_cancellations(rx, fin_tx, pending_count.clone()))
            .unwrap();

        send_lots_of_bytes(tx, fin_rx, pending_count).await;
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_synchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a synchronized dispatcher",
            async {
                let dispatcher =
                    DispatcherBuilder::new().name("fdf-synchronized").create().unwrap().release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_unsynchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on an unsynchronized dispatcher",
            async {
                let dispatcher = DispatcherBuilder::new()
                    .name("fdf-unsynchronized")
                    .unsynchronized()
                    .create()
                    .unwrap()
                    .release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_fuchsia_async_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a fuchsia-async overridden dispatcher",
            async {
                let fdf_dispatcher = DispatcherBuilder::new()
                    .name("fdf-async")
                    .create()
                    .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                    .release();

                let dispatcher = DispatcherBuilder::new()
                    .name("fdf-fuchsia-async")
                    .allow_thread_blocking()
                    .create()
                    .expect("failure creating blocking dispatcher for rust async")
                    .release();

                let (tx, rx) = Channel::create();
                let (fin_tx, fin_rx) = oneshot::channel();
                let pending_count = Arc::new(AtomicU64::new(0));

                let pending_count_clone = pending_count.clone();
                dispatcher
                    .post_task_sync(move |_| {
                        Dispatcher::override_current(fdf_dispatcher, || {
                            let mut executor = fuchsia_async::LocalExecutor::default();
                            executor.run_singlethreaded(recv_lots_of_bytes_with_cancellations(
                                rx,
                                fin_tx,
                                pending_count_clone,
                            ));
                        });
                    })
                    .unwrap();

                send_lots_of_bytes(tx, fin_rx, pending_count).await;
            },
        );
    }
}