1use core::future::Future;
8use std::mem::ManuallyDrop;
9use std::sync::Arc;
10use zx::Status;
11
12use crate::arena::{Arena, ArenaBox};
13use crate::futures::{ReadMessageState, ReadMessageStateOp};
14use crate::message::Message;
15use fdf_core::dispatcher::OnDispatcher;
16use fdf_core::handle::{DriverHandle, MixedHandle};
17use fdf_sys::*;
18
19use core::marker::PhantomData;
20use core::mem::{MaybeUninit, size_of_val};
21use core::num::NonZero;
22use core::pin::Pin;
23use core::ptr::{NonNull, null_mut};
24use core::task::{Context, Poll};
25
26pub use fdf_sys::fdf_handle_t;
27
28#[derive(Debug)]
30pub struct Channel<T: ?Sized + 'static> {
31 pub(crate) handle: ManuallyDrop<DriverHandle>,
34 pub(crate) wait_state: Option<Arc<ReadMessageStateOp>>,
35 _p: PhantomData<Message<T>>,
36}
37
38impl<T: ?Sized> Drop for Channel<T> {
39 fn drop(&mut self) {
40 let mut can_drop = true;
41
42 if let Some(current_wait) = &self.wait_state {
43 can_drop = current_wait.set_channel_dropped();
46 }
47
48 if can_drop {
49 unsafe {
52 ManuallyDrop::drop(&mut self.handle);
53 }
54 };
55 }
56}
57
58impl<T: ?Sized + 'static> Channel<T> {
59 pub fn create() -> (Self, Self) {
62 let mut channel1 = 0;
63 let mut channel2 = 0;
64 Status::ok(unsafe { fdf_channel_create(0, &mut channel1, &mut channel2) })
67 .expect("failed to create channel pair");
68 unsafe {
71 (
72 Self::from_handle_unchecked(NonZero::new_unchecked(channel1)),
73 Self::from_handle_unchecked(NonZero::new_unchecked(channel2)),
74 )
75 }
76 }
77
78 pub fn driver_handle(&self) -> &DriverHandle {
80 &self.handle
81 }
82
83 pub fn into_driver_handle(self) -> DriverHandle {
91 assert!(
92 self.wait_state.is_none(),
93 "A read wait has been registered on this channel so it can't be destructured"
94 );
95
96 let handle = unsafe { self.handle.get_raw() };
100
101 std::mem::forget(self);
104
105 unsafe { DriverHandle::new_unchecked(handle) }
108 }
109
110 unsafe fn from_handle_unchecked(handle: NonZero<fdf_handle_t>) -> Self {
117 Self {
119 handle: ManuallyDrop::new(unsafe { DriverHandle::new_unchecked(handle) }),
120 wait_state: None,
121 _p: PhantomData,
122 }
123 }
124
125 pub unsafe fn from_driver_handle(handle: DriverHandle) -> Self {
133 Self { handle: ManuallyDrop::new(handle), wait_state: None, _p: PhantomData }
134 }
135
136 pub fn write(&self, message: Message<T>) -> Result<(), Status> {
141 let data_len = message.data().map_or(0, |data| size_of_val(data) as u32);
143 let handles_count = message.handles().map_or(0, |handles| handles.len() as u32);
144
145 let (arena, data, handles) = message.into_raw();
146
147 let data_ptr = data.map_or(null_mut(), |data| data.cast().as_ptr());
149 let handles_ptr = handles.map_or(null_mut(), |handles| handles.cast().as_ptr());
150
151 Status::ok(unsafe {
159 fdf_channel_write(
160 self.handle.get_raw().get(),
161 0,
162 arena.as_ptr(),
163 data_ptr,
164 data_len,
165 handles_ptr,
166 handles_count,
167 )
168 })?;
169
170 unsafe { fdf_arena_drop_ref(arena.as_ptr()) };
174 Ok(())
175 }
176
177 pub fn write_with<F>(&self, arena: Arena, f: F) -> Result<(), Status>
179 where
180 F: for<'a> FnOnce(
181 &'a Arena,
182 )
183 -> (Option<ArenaBox<'a, T>>, Option<ArenaBox<'a, [Option<MixedHandle>]>>),
184 {
185 self.write(Message::new_with(arena, f))
186 }
187
188 pub fn write_with_data<F>(&self, arena: Arena, f: F) -> Result<(), Status>
190 where
191 F: for<'a> FnOnce(&'a Arena) -> ArenaBox<'a, T>,
192 {
193 self.write(Message::new_with_data(arena, f))
194 }
195}
196
197pub(crate) fn try_read_raw(
201 channel: &DriverHandle,
202) -> Result<Option<Message<[MaybeUninit<u8>]>>, Status> {
203 let mut out_arena = null_mut();
204 let mut out_data = null_mut();
205 let mut out_num_bytes = 0;
206 let mut out_handles = null_mut();
207 let mut out_num_handles = 0;
208 Status::ok(unsafe {
209 fdf_channel_read(
210 channel.get_raw().get(),
211 0,
212 &mut out_arena,
213 &mut out_data,
214 &mut out_num_bytes,
215 &mut out_handles,
216 &mut out_num_handles,
217 )
218 })?;
219 if out_arena.is_null() {
221 return Ok(None);
222 }
223 let arena = Arena(unsafe { NonNull::new_unchecked(out_arena) });
225 let data_ptr = if !out_data.is_null() {
226 let ptr = core::ptr::slice_from_raw_parts_mut(out_data.cast(), out_num_bytes as usize);
227 Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
230 } else {
231 None
232 };
233 let handles_ptr = if !out_handles.is_null() {
234 let ptr = core::ptr::slice_from_raw_parts_mut(out_handles.cast(), out_num_handles as usize);
235 Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
238 } else {
239 None
240 };
241 Ok(Some(unsafe { Message::new_unchecked(arena, data_ptr, handles_ptr) }))
242}
243
244pub(crate) unsafe fn read_raw<T: ?Sized, D>(
255 channel: &mut Channel<T>,
256 dispatcher: D,
257) -> ReadMessageRawFut<D> {
258 let raw_fut = unsafe { ReadMessageState::register_read_wait(channel) };
260 ReadMessageRawFut { raw_fut, dispatcher }
261}
262
263impl<T> Channel<T> {
264 pub fn try_read(&self) -> Result<Option<Message<T>>, Status> {
266 let Some(message) = try_read_raw(&self.handle)? else {
268 return Ok(None);
269 };
270 Ok(Some(unsafe { message.cast_unchecked() }))
273 }
274
275 pub async fn read<D: OnDispatcher + Unpin>(
277 &mut self,
278 dispatcher: D,
279 ) -> Result<Option<Message<T>>, Status> {
280 let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
284 return Ok(None);
285 };
286 Ok(Some(unsafe { message.cast_unchecked() }))
289 }
290}
291
292impl Channel<[u8]> {
293 pub fn try_read_bytes(&self) -> Result<Option<Message<[u8]>>, Status> {
295 let Some(message) = try_read_raw(&self.handle)? else {
297 return Ok(None);
298 };
299 Ok(Some(unsafe { message.assume_init() }))
302 }
303
304 pub async fn read_bytes<D: OnDispatcher + Unpin>(
306 &mut self,
307 dispatcher: D,
308 ) -> Result<Option<Message<[u8]>>, Status> {
309 let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
314 return Ok(None);
315 };
316 Ok(Some(unsafe { message.assume_init() }))
319 }
320}
321
322impl<T> From<Channel<T>> for MixedHandle {
323 fn from(value: Channel<T>) -> Self {
324 MixedHandle::from(value.into_driver_handle())
325 }
326}
327
328impl<T: ?Sized> std::cmp::Ord for Channel<T> {
329 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
330 self.handle.cmp(&other.handle)
331 }
332}
333
334impl<T: ?Sized> std::cmp::PartialOrd for Channel<T> {
335 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
340impl<T: ?Sized> std::cmp::PartialEq for Channel<T> {
341 fn eq(&self, other: &Self) -> bool {
342 self.handle.eq(&other.handle)
343 }
344}
345
346impl<T: ?Sized> std::cmp::Eq for Channel<T> {}
347
348impl<T: ?Sized> std::hash::Hash for Channel<T> {
349 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
350 self.handle.hash(state);
351 }
352}
353
354pub(crate) struct ReadMessageRawFut<D> {
355 pub(crate) raw_fut: ReadMessageState,
356 dispatcher: D,
357}
358
359impl<D: OnDispatcher + Unpin> Future for ReadMessageRawFut<D> {
360 type Output = Result<Option<Message<[MaybeUninit<u8>]>>, Status>;
361
362 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
363 let dispatcher = self.dispatcher.clone();
364 self.as_mut().raw_fut.poll_with_dispatcher(cx, dispatcher)
365 }
366}
367
#[cfg(test)]
mod tests {
    use std::io::{Write, stdout};
    use std::pin::pin;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, mpsc};

    use fdf_core::dispatcher::{
        AsyncDispatcher, CurrentDispatcher, Dispatcher, DispatcherBuilder, DispatcherRef,
        OnDispatcher,
    };
    use fdf_core::handle::MixedHandleType;
    use fdf_env::test::spawn_in_driver;
    use futures::channel::oneshot;
    use futures::poll;

    use super::*;
    use crate::test_utils::*;

    /// Byte messages can be written and read back in both directions without a dispatcher,
    /// and writing after the peer is dropped reports `PEER_CLOSED`.
    #[test]
    fn send_and_receive_bytes_synchronously() {
        /// Asserts that a non-blocking read on `chan` reports `SHOULD_WAIT`.
        fn assert_nothing_to_read(chan: &Channel<[u8]>) {
            assert_eq!(chan.try_read_bytes().unwrap_err(), Status::SHOULD_WAIT);
        }

        let (left, right) = Channel::create();
        let shared_arena = Arena::new();

        // left -> right
        assert_nothing_to_read(&left);
        left.write_with_data(shared_arena.clone(), |arena| arena.insert_slice(&[1, 2, 3, 4]))
            .unwrap();
        assert_eq!(right.try_read_bytes().unwrap().unwrap().data().unwrap(), &[1, 2, 3, 4]);
        assert_nothing_to_read(&right);

        // right -> left
        right
            .write_with_data(shared_arena.clone(), |arena| arena.insert_slice(&[5, 6, 7, 8]))
            .unwrap();
        assert_eq!(left.try_read_bytes().unwrap().unwrap().data().unwrap(), &[5, 6, 7, 8]);
        assert_nothing_to_read(&left);
        assert_nothing_to_read(&right);

        // Once the peer is gone, writes fail.
        drop(right);
        let write_after_close = left
            .write_with_data(shared_arena.clone(), |arena| arena.insert_slice(&[9, 10, 11, 12]));
        assert_eq!(write_after_close, Err(Status::PEER_CLOSED));
    }

    /// A pending asynchronous read can be abandoned, and a later read still receives the
    /// bytes written afterwards.
    #[test]
    fn send_and_receive_bytes_asynchronously() {
        spawn_in_driver("channel async", async {
            let arena = Arena::new();
            let (mut reader, writer) = Channel::create();

            // The pinned future is a temporary, so it is dropped at the end of this statement.
            assert!(poll!(pin!(reader.read_bytes(CurrentDispatcher))).is_pending());
            writer.write_with_data(arena, |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
            assert_eq!(
                reader.read_bytes(CurrentDispatcher).await.unwrap().unwrap().data().unwrap(),
                &[1, 2, 3, 4]
            );
        });
    }

    /// A typed object sent through the channel is dropped only when the received message is
    /// dropped.
    #[test]
    fn send_and_receive_objects_synchronously() {
        let arena = Arena::new();
        let (sender, receiver) = Channel::create();
        let (tx, rx) = mpsc::channel();

        sender
            .write_with_data(arena.clone(), |arena| arena.insert(DropSender::new(1, tx.clone())))
            .unwrap();
        rx.try_recv().expect_err("should not drop the object when sent");

        let received = receiver.try_read().unwrap().unwrap();
        assert_eq!(received.data().unwrap().0, 1);
        rx.try_recv().expect_err("should not drop the object when received");

        drop(received);
        rx.try_recv().expect("dropped when received");
    }

    /// One end of a channel pair can be sent through another channel and used after being
    /// received.
    #[test]
    fn send_and_receive_handles_synchronously() {
        println!("Create channels and write one end of one of the channel pairs to the other");
        let (outer_tx, outer_rx) = Channel::<()>::create();
        let (inner_first, inner_second) = Channel::<String>::create();
        let outgoing = Message::new_with(Arena::new(), |arena| {
            (None, Some(arena.insert_boxed_slice(Box::new([Some(inner_first.into())]))))
        });
        outer_tx.write(outgoing).unwrap();

        println!("Receive the channel back on the other end of the first channel pair.");
        let mut arena = None;
        let incoming =
            outer_rx.try_read().unwrap().expect("Expected a message with contents to be received");
        let (_, received_handles) = incoming.into_arena_boxes(&mut arena);
        let mut handle_slots =
            ArenaBox::take_boxed_slice(received_handles.expect("expected handles in the message"));
        let mixed_handle = handle_slots
            .first_mut()
            .expect("expected one handle in the handle set")
            .take()
            .expect("expected the first handle to be non-null");
        let resolved = mixed_handle.resolve();
        let MixedHandleType::Driver(driver_handle) = resolved else {
            panic!("Got a non-driver handle when we sent a driver handle");
        };
        let inner_first_received = unsafe { Channel::from_driver_handle(driver_handle) };

        println!("Send and receive a string across the now-transmitted channel pair.");
        inner_first_received
            .write_with_data(Arena::new(), |arena| arena.insert("boom".to_string()))
            .unwrap();
        assert_eq!(inner_second.try_read().unwrap().unwrap().data().unwrap(), &"boom".to_string());
    }

    /// Starts the exchange with `0`, then answers every received value with its successor
    /// until a read no longer yields a message.
    async fn ping(mut chan: Channel<u8>) {
        println!("starting ping!");
        chan.write_with_data(Arena::new(), |arena| arena.insert(0)).unwrap();
        while let Ok(Some(msg)) = chan.read(CurrentDispatcher).await {
            let next = *msg.data().unwrap();
            println!("ping! {next}");
            // The arena of the received message is reused for the reply.
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    /// Answers every received value with its successor and stops once a value above 10
    /// arrives.
    async fn pong(mut chan: Channel<u8>) {
        println!("starting pong!");
        while let Some(msg) = chan.read(CurrentDispatcher).await.unwrap() {
            let next = *msg.data().unwrap();
            println!("pong! {next}");
            if next > 10 {
                println!("bye!");
                break;
            }
            // The arena of the received message is reused for the reply.
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    /// Ping runs as a spawned task while pong runs on the test's own task.
    #[test]
    fn async_ping_pong() {
        spawn_in_driver("async ping pong", async {
            let (ping_end, pong_end) = Channel::create();
            CurrentDispatcher.spawn_task(ping(ping_end)).unwrap();
            pong(pong_end).await;
        });
    }

    /// Ping runs inside a `fuchsia_async` executor on a blocking dispatcher, with a second
    /// dispatcher installed as the current one for its channel operations.
    #[test]
    fn async_ping_pong_on_fuchsia_async() {
        spawn_in_driver("async ping pong", async {
            let (ping_end, pong_end) = Channel::create();

            let fdf_ops_dispatcher = DispatcherBuilder::new()
                .name("fdf-async")
                .create()
                .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                .release();

            let executor_dispatcher = DispatcherBuilder::new()
                .name("fuchsia-async")
                .allow_thread_blocking()
                .create()
                .expect("failure creating blocking dispatcher for rust async")
                .release();

            executor_dispatcher
                .post_task_sync(move |_| {
                    Dispatcher::override_current(fdf_ops_dispatcher, || {
                        let mut executor = fuchsia_async::LocalExecutor::default();
                        executor.run_singlethreaded(ping(ping_end));
                    });
                })
                .unwrap();

            pong(pong_end).await
        });
    }

    /// Reads byte messages until the peer closes. Every read that is not immediately ready
    /// is cancelled once and then started again.
    async fn recv_lots_of_bytes_with_cancellations(
        mut rx: Channel<[u8]>,
        fin_tx: oneshot::Sender<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        let mut immediate_count = 0;
        let mut count = 0;
        loop {
            let mut first_attempt = Box::pin(rx.read_bytes(CurrentDispatcher));
            let outcome = match futures::poll!(&mut first_attempt) {
                Poll::Ready(ready) => {
                    immediate_count += 1;
                    ready
                }
                Poll::Pending => {
                    pending_count.fetch_add(1, Ordering::Relaxed);
                    // Cancel the pending read, then issue a new one and wait for it.
                    drop(first_attempt);
                    rx.read_bytes(CurrentDispatcher).await
                }
            };
            match outcome {
                Err(Status::PEER_CLOSED) | Ok(None) => break,
                Err(_) => {
                    // Any other error fails the test through `unwrap`.
                    outcome.unwrap();
                }
                Ok(Some(msg)) => {
                    assert_eq!(msg.data().unwrap(), &[count as u8; 100]);
                    count += 1;
                }
            }
        }
        println!("read total: {count}, immediate: {immediate_count}, pending: {pending_count:?}");
        fin_tx.send(()).unwrap();
    }

    /// Writes up to 10000 hundred-byte messages, stopping early once more than 500 reads have
    /// been observed pending, then closes the channel and waits for the receiver to finish.
    async fn send_lots_of_bytes(
        tx: Channel<[u8]>,
        fin_rx: oneshot::Receiver<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        let shared_arena = Arena::new();
        print!("writing: ");
        for i in 0..10000 {
            tx.write_with_data(shared_arena.clone(), |arena| arena.insert_slice(&[i as u8; 100]))
                .unwrap();
            print!(".");
            stdout().flush().unwrap();
            let enough_cancellations = pending_count.load(Ordering::Relaxed) > 500;
            if enough_cancellations {
                break;
            }
        }
        drop(tx);
        fin_rx.await.unwrap();
    }

    /// Spawns the cancelling receiver on `dispatcher` and runs the sender on the current task.
    async fn send_and_recv_lots_of_bytes_with_cancellations(dispatcher: DispatcherRef<'static>) {
        let (tx, rx) = Channel::create();
        let (fin_tx, fin_rx) = oneshot::channel();
        let pending_count = Arc::new(AtomicU64::new(0));

        let receiver = recv_lots_of_bytes_with_cancellations(rx, fin_tx, pending_count.clone());
        dispatcher.spawn_task(receiver).unwrap();

        send_lots_of_bytes(tx, fin_rx, pending_count).await;
    }

    /// Runs the cancellation stress test with the receiver on a default dispatcher.
    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_synchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a synchronized dispatcher",
            async {
                let builder = DispatcherBuilder::new().name("fdf-synchronized");
                let dispatcher = builder.create().unwrap().release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    /// Runs the cancellation stress test with the receiver on an unsynchronized dispatcher.
    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_unsynchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on an unsynchronized dispatcher",
            async {
                let builder = DispatcherBuilder::new().name("fdf-unsynchronized").unsynchronized();
                let dispatcher = builder.create().unwrap().release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    /// Runs the cancellation stress test with the receiver inside a `fuchsia_async` executor
    /// on a blocking dispatcher, with a second dispatcher installed as the current one.
    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_fuchsia_async_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a fuchsia-async overridden dispatcher",
            async {
                let fdf_ops_dispatcher = DispatcherBuilder::new()
                    .name("fdf-async")
                    .create()
                    .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                    .release();

                let executor_dispatcher = DispatcherBuilder::new()
                    .name("fdf-fuchsia-async")
                    .allow_thread_blocking()
                    .create()
                    .expect("failure creating blocking dispatcher for rust async")
                    .release();

                let (tx, rx) = Channel::create();
                let (fin_tx, fin_rx) = oneshot::channel();
                let pending_count = Arc::new(AtomicU64::new(0));

                let receiver_pending_count = pending_count.clone();
                executor_dispatcher
                    .post_task_sync(move |_| {
                        Dispatcher::override_current(fdf_ops_dispatcher, || {
                            let mut executor = fuchsia_async::LocalExecutor::default();
                            executor.run_singlethreaded(recv_lots_of_bytes_with_cancellations(
                                rx,
                                fin_tx,
                                receiver_pending_count,
                            ));
                        });
                    })
                    .unwrap();

                send_lots_of_bytes(tx, fin_rx, pending_count).await;
            },
        );
    }
}