1use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
/// A source of entries of type `T` that can be handed to the FIFO write APIs.
///
/// Implementations in this file cover a single entry, a fixed-size array and
/// a slice, so callers can pass any of those to `Fifo::try_write`.
pub trait FifoWriteBuffer<T> {
    /// Returns the entries to write, viewed as one contiguous slice.
    fn as_slice(&self) -> &[T];
}
26
/// A destination that entries of type `T` can be read into.
///
/// # Safety
///
/// `Fifo::try_read` passes the pointer returned by
/// [`as_mut_ptr`](Self::as_mut_ptr) together with [`count`](Self::count)
/// directly to an `unsafe` raw read. Implementers must therefore make sure the
/// pointer is valid for writing `count()` entries of type `T` (presumably the
/// raw read stores up to that many entries through it).
pub unsafe trait FifoReadBuffer<T> {
    /// Returns how many entries of type `T` the buffer has room for.
    fn count(&self) -> usize;
    /// Returns a raw pointer to the first slot that read entries are stored
    /// into.
    fn as_mut_ptr(&mut self) -> *mut T;
}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48 fn as_slice(&self) -> &[T] {
49 self
50 }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54 fn as_slice(&self) -> &[T] {
55 self
56 }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60 fn count(&self) -> usize {
61 N
62 }
63
64 fn as_mut_ptr(&mut self) -> *mut T {
65 self.as_mut_slice().as_mut_ptr()
66 }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70 fn count(&self) -> usize {
71 self.len()
72 }
73
74 fn as_mut_ptr(&mut self) -> *mut T {
75 self.as_mut_ptr()
76 }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80 fn as_slice(&self) -> &[T] {
81 std::slice::from_ref(self)
82 }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86 fn count(&self) -> usize {
87 1
88 }
89
90 fn as_mut_ptr(&mut self) -> *mut T {
91 self as *mut T
92 }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96 fn count(&self) -> usize {
97 1
98 }
99
100 fn as_mut_ptr(&mut self) -> *mut T {
101 self.as_mut_ptr()
102 }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106 fn count(&self) -> usize {
107 self.len()
108 }
109
110 fn as_mut_ptr(&mut self) -> *mut T {
111 self.as_mut_ptr() as *mut T
114 }
115}
116
/// A poll-based wrapper around a `zx::Fifo`.
///
/// `R` is the entry type accepted by the read methods and `W` the entry type
/// accepted by the write methods; `W` defaults to `R`.
pub struct Fifo<R, W = R> {
    /// The wrapped `zx::Fifo`, held inside an `RWHandle`, which supplies the
    /// readable/writable polling used by `try_read` and `try_write`.
    handle: RWHandle<zx::Fifo<R, W>>,
}
121
122impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123 fn as_ref(&self) -> &zx::Fifo<R, W> {
124 self.handle.get_ref()
125 }
126}
127
128impl<R, W> AsHandleRef for Fifo<R, W> {
129 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130 self.handle.get_ref().as_handle_ref()
131 }
132}
133
134impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
135 fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136 fifo.handle.into_inner()
137 }
138}
139
140impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
141 pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147 Fifo { handle: RWHandle::new(fifo.into()) }
148 }
149
150 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158 &self,
159 cx: &mut Context<'_>,
160 entries: &B,
161 ) -> Poll<Result<usize, zx::Status>> {
162 ready!(self.handle.poll_writable(cx)?);
163
164 let entries = entries.as_slice();
165 let fifo = self.as_ref();
166 loop {
169 let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170 match result {
171 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172 Err(e) => return Poll::Ready(Err(e)),
173 Ok(count) => return Poll::Ready(Ok(count)),
174 }
175 }
176 }
177
178 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184 &self,
185 cx: &mut Context<'_>,
186 entries: &mut B,
187 ) -> Poll<Result<usize, zx::Status>> {
188 ready!(self.handle.poll_readable(cx)?);
189
190 let buf = entries.as_mut_ptr();
191 let count = entries.count();
192 let fifo = self.as_ref();
193
194 loop {
195 let result = unsafe { fifo.read_raw(buf, count) };
199
200 match result {
201 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202 Err(e) => return Poll::Ready(Err(e)),
203 Ok(count) => return Poll::Ready(Ok(count)),
204 }
205 }
206 }
207
208 pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211 (FifoReader(self), FifoWriter(self))
212 }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218 pub async fn write_entries(
221 &mut self,
222 entries: &(impl ?Sized + FifoWriteBuffer<W>),
223 ) -> Result<(), zx::Status> {
224 let mut entries = entries.as_slice();
225 poll_fn(|cx| {
226 while !entries.is_empty() {
227 match ready!(self.0.try_write(cx, entries)) {
228 Ok(count) => entries = &entries[count..],
229 Err(status) => return Poll::Ready(Err(status)),
230 }
231 }
232 Poll::Ready(Ok(()))
233 })
234 .await
235 }
236
237 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239 &mut self,
240 cx: &mut Context<'_>,
241 entries: &B,
242 ) -> Poll<Result<usize, zx::Status>> {
243 self.0.try_write(cx, entries)
244 }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250 pub async fn read_entries(
251 &mut self,
252 entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253 ) -> Result<usize, zx::Status> {
254 poll_fn(|cx| self.0.try_read(cx, entries)).await
255 }
256
257 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259 &mut self,
260 cx: &mut Context<'_>,
261 entries: &mut B,
262 ) -> Poll<Result<usize, zx::Status>> {
263 self.0.try_read(cx, entries)
264 }
265}
266
267impl<R, W> fmt::Debug for Fifo<R, W> {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 self.handle.get_ref().fmt(f)
270 }
271}
272
// Unit tests that drive `Fifo` through its reader/writer halves on a
// `TestExecutor`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DurationExt, TestExecutor, Timer};
    use futures::future::try_join;
    use futures::prelude::*;
    use zerocopy::{Immutable, KnownLayout};
    use zx::prelude::*;

    // Entry type used by most tests: two `u32` fields, eight bytes in total.
    #[derive(
        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct Entry {
        a: u32,
        b: u32,
    }

    // Deliberately smaller entry type (a single `u16`), used to exercise
    // element-size mismatches against a FIFO created for `Entry`.
    #[derive(
        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct WrongEntry {
        a: u16,
    }

    // One entry written after a short delay is received intact by a read
    // that was started first.
    #[test]
    fn can_read_write() {
        let mut exec = TestExecutor::new();
        let expected = Entry { a: 10, b: 20 };

        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);
        let mut writer = tx_fifo.async_io().1;
        let mut reader = rx_fifo.async_io().0;

        let mut received = Entry::default();
        let receiver = reader.read_entries(&mut received).map_ok(|count| {
            assert_eq!(count, 1);
        });

        // The write only starts once a 10ms timer has fired.
        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(&expected));

        let both = try_join(receiver, sender);
        exec.run_singlethreaded(both).expect("failed to run receive future on executor");
        assert_eq!(received, expected);
    }

    // Reading through an endpoint typed with `WrongEntry` from a FIFO created
    // for `Entry` must fail with `OUT_OF_RANGE`.
    #[test]
    fn read_wrong_size() {
        let mut exec = TestExecutor::new();
        let to_send = &[Entry { a: 10, b: 20 }][..];

        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        // Re-type the receive end by round-tripping it through its handle.
        let mistyped_rx = zx::Fifo::<WrongEntry>::from(raw_rx.into_handle());
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(mistyped_rx);
        let mut writer = tx_fifo.async_io().1;
        let mut reader = rx_fifo.async_io().0;

        let mut received = WrongEntry::default();
        let receiver = reader
            .read_entries(&mut received)
            .map_ok(|count| panic!("read should have failed, got {count}"));

        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(to_send));

        let both = try_join(receiver, sender);
        let outcome = exec.run_singlethreaded(both);
        assert!(matches!(outcome, Err(zx::Status::OUT_OF_RANGE)), "did not get out-of-range error");
    }

    // Writing through an endpoint typed with `WrongEntry` into a FIFO created
    // for `Entry` must fail with `OUT_OF_RANGE`.
    #[test]
    fn write_wrong_size() {
        let mut exec = TestExecutor::new();
        let to_send = &[WrongEntry { a: 10 }][..];

        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        // Re-type both ends by round-tripping them through their handles.
        let mistyped_tx = zx::Fifo::<WrongEntry>::from(raw_tx.into_handle());
        let mistyped_rx = zx::Fifo::<WrongEntry>::from(raw_rx.into_handle());
        let mut tx_fifo = Fifo::from_fifo(mistyped_tx);
        // The receive end is never used, but it stays alive until the end of
        // the test.
        let _rx_fifo = Fifo::from_fifo(mistyped_rx);
        let mut writer = tx_fifo.async_io().1;

        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(to_send));

        let outcome = exec.run_singlethreaded(sender);
        assert!(matches!(outcome, Err(zx::Status::OUT_OF_RANGE)), "did not get out-of-range error");
    }

    // Two entries are written, then a third; the reader checks at specific
    // points how many of those two writes have completed.
    #[test]
    fn write_into_full() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let mut exec = TestExecutor::new();
        let to_send =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];

        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);

        // Counts finished `write_entries` calls so the reader can observe the
        // writer's progress.
        let writes_completed = AtomicUsize::new(0);
        let sender = async {
            let mut writer = tx_fifo.async_io().1;
            writer.write_entries(&to_send[..2]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            writer.write_entries(&to_send[2..]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            Ok::<(), zx::Status>(())
        };

        let receiver = async {
            // Give the writer a 10ms head start before reading anything.
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            let mut slot = Entry::default();
            let mut reader = rx_fifo.async_io().0;

            let count = reader.read_entries(&mut slot).await?;
            // Only the first (two-entry) write is expected to be done here.
            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
            assert_eq!(count, 1);
            assert_eq!(slot, to_send[0]);

            let count = reader.read_entries(&mut slot).await?;
            assert_eq!(count, 1);
            assert_eq!(slot, to_send[1]);

            let count = reader.read_entries(&mut slot).await?;
            // By now the second write is expected to be done as well.
            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
            assert_eq!(count, 1);
            assert_eq!(slot, to_send[2]);
            Ok::<(), zx::Status>(())
        };

        let both = try_join(receiver, sender);

        exec.run_singlethreaded(both).expect("failed to run receive future on executor");
    }

    // A single `write_entries` call with three entries on a FIFO created with
    // `create(2)` still delivers every entry, in order.
    #[test]
    fn write_more_than_full() {
        let mut exec = TestExecutor::new();
        let to_send =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];

        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);
        let mut writer = tx_fifo.async_io().1;
        let mut reader = rx_fifo.async_io().0;

        let sender = writer.write_entries(to_send);

        let receiver = async {
            // Start draining only after a 10ms delay.
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            for expected in to_send.iter() {
                let mut slot = [Entry::default(); 1];
                let count = reader.read_entries(&mut slot[..]).await?;
                assert_eq!(count, 1);
                assert_eq!(&slot[0], expected);
            }
            Ok::<(), zx::Status>(())
        };

        let both = try_join(receiver, sender);

        exec.run_singlethreaded(both).expect("failed to run receive future on executor");
    }

    // A read into a buffer larger than what was written returns exactly the
    // written entries.
    #[test]
    fn read_multiple() {
        let mut exec = TestExecutor::new();
        let to_send =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
        let (raw_tx, raw_rx) =
            zx::Fifo::<Entry>::create(to_send.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);

        let write_fut = async {
            let mut writer = tx_fifo.async_io().1;
            writer.write_entries(to_send).await.expect("failed write entries");
        };
        let read_fut = async {
            // Five slots, more than the three entries that get written.
            let mut slots = [Entry::default(); 5];
            let mut reader = rx_fifo.async_io().0;
            let count = reader.read_entries(&mut slots[..]).await.expect("failed to read entries");
            assert_eq!(count, to_send.len());
            assert_eq!(&slots[..count], to_send);
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    // Entries written in one batch can be read back one at a time through a
    // single-entry buffer.
    #[test]
    fn read_one() {
        let mut exec = TestExecutor::new();
        let to_send =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
        let (raw_tx, raw_rx) =
            zx::Fifo::<Entry>::create(to_send.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);

        let write_fut = async {
            let mut writer = tx_fifo.async_io().1;
            writer.write_entries(to_send).await.expect("failed write entries");
        };
        let read_fut = async {
            let mut reader = rx_fifo.async_io().0;
            for expected in to_send.iter() {
                let mut slot = Entry::default();
                let count = reader.read_entries(&mut slot).await.expect("failed to read entry");
                assert_eq!(count, 1);
                assert_eq!(&slot, expected);
            }
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    // A single `MaybeUninit` slot works as a read buffer.
    #[test]
    fn maybe_uninit_single() {
        let mut exec = TestExecutor::new();
        let expected = Entry { a: 10, b: 20 };
        let (raw_tx, raw_rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);

        let write_fut = async {
            let mut writer = tx_fifo.async_io().1;
            writer.write_entries(&expected).await.expect("failed write entries");
        };
        let read_fut = async {
            let mut slot = MaybeUninit::<Entry>::uninit();
            let mut reader = rx_fifo.async_io().0;
            let count = reader.read_entries(&mut slot).await.expect("failed to read entries");
            assert_eq!(count, 1);
            // SAFETY: the read above reported one entry, so the slot is taken
            // to have been filled.
            let got = unsafe { slot.assume_init() };
            assert_eq!(got, expected);
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    // A slice of `MaybeUninit` slots works as a read buffer; only the slots
    // covered by the returned count are inspected.
    #[test]
    fn maybe_uninit_slice() {
        let mut exec = TestExecutor::new();
        let to_send =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
        let (raw_tx, raw_rx) =
            zx::Fifo::<Entry>::create(to_send.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(raw_tx);
        let mut rx_fifo = Fifo::from_fifo(raw_rx);

        let write_fut = async {
            let mut writer = tx_fifo.async_io().1;
            writer.write_entries(to_send).await.expect("failed write entries");
        };
        let read_fut = async {
            // Fifteen slots, more than the three entries that get written.
            let mut slots = [MaybeUninit::<Entry>::uninit(); 15];
            let mut reader = rx_fifo.async_io().0;
            let count = reader.read_entries(&mut slots[..]).await.expect("failed to read entries");
            assert_eq!(count, to_send.len());
            // `count` equals `to_send.len()` (asserted above), so zipping
            // pairs every filled slot with the entry it should hold.
            let filled = &mut slots[..count];
            for (slot, expected) in filled.iter_mut().zip(to_send) {
                // SAFETY: the read reported `count` entries, so each of the
                // first `count` slots is taken to have been filled.
                let got = unsafe { slot.assume_init_ref() };
                assert_eq!(got, expected);
                // SAFETY: same reasoning as above; the slot is not touched
                // again after this drop.
                unsafe {
                    slot.assume_init_drop();
                }
            }
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }
}