1use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
22pub trait FifoWriteBuffer<T> {
24 fn as_slice(&self) -> &[T];
25}
26
27pub unsafe trait FifoReadBuffer<T> {
35 fn count(&self) -> usize;
37 fn as_mut_ptr(&mut self) -> *mut T;
45}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48 fn as_slice(&self) -> &[T] {
49 self
50 }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54 fn as_slice(&self) -> &[T] {
55 self
56 }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60 fn count(&self) -> usize {
61 N
62 }
63
64 fn as_mut_ptr(&mut self) -> *mut T {
65 self.as_mut_slice().as_mut_ptr()
66 }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70 fn count(&self) -> usize {
71 self.len()
72 }
73
74 fn as_mut_ptr(&mut self) -> *mut T {
75 self.as_mut_ptr()
76 }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80 fn as_slice(&self) -> &[T] {
81 std::slice::from_ref(self)
82 }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86 fn count(&self) -> usize {
87 1
88 }
89
90 fn as_mut_ptr(&mut self) -> *mut T {
91 self as *mut T
92 }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96 fn count(&self) -> usize {
97 1
98 }
99
100 fn as_mut_ptr(&mut self) -> *mut T {
101 self.as_mut_ptr()
102 }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106 fn count(&self) -> usize {
107 self.len()
108 }
109
110 fn as_mut_ptr(&mut self) -> *mut T {
111 self.as_mut_ptr() as *mut T
114 }
115}
116
117pub struct Fifo<R, W = R> {
119 handle: RWHandle<zx::Fifo<R, W>>,
120}
121
122impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123 fn as_ref(&self) -> &zx::Fifo<R, W> {
124 self.handle.get_ref()
125 }
126}
127
128impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> AsHandleRef for Fifo<R, W> {
129 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130 self.handle.get_ref().as_handle_ref()
131 }
132}
133
134impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> From<Fifo<R, W>> for zx::Fifo<R, W> {
135 fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136 fifo.handle.into_inner()
137 }
138}
139
140impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> Fifo<R, W> {
141 pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147 Fifo { handle: RWHandle::new(fifo.into()) }
148 }
149
150 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158 &self,
159 cx: &mut Context<'_>,
160 entries: &B,
161 ) -> Poll<Result<usize, zx::Status>> {
162 ready!(self.handle.poll_writable(cx)?);
163
164 let entries = entries.as_slice();
165 let fifo = self.as_ref();
166 loop {
169 let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170 match result {
171 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172 Err(e) => return Poll::Ready(Err(e)),
173 Ok(count) => return Poll::Ready(Ok(count)),
174 }
175 }
176 }
177
178 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184 &self,
185 cx: &mut Context<'_>,
186 entries: &mut B,
187 ) -> Poll<Result<usize, zx::Status>> {
188 ready!(self.handle.poll_readable(cx)?);
189
190 let buf = entries.as_mut_ptr();
191 let count = entries.count();
192 let fifo = self.as_ref();
193
194 loop {
195 let result = unsafe { fifo.read_raw(buf, count) };
199
200 match result {
201 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202 Err(e) => return Poll::Ready(Err(e)),
203 Ok(count) => return Poll::Ready(Ok(count)),
204 }
205 }
206 }
207
208 pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211 (FifoReader(self), FifoWriter(self))
212 }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218 pub async fn write_entries(
221 &mut self,
222 entries: &(impl ?Sized + FifoWriteBuffer<W>),
223 ) -> Result<(), zx::Status> {
224 let mut entries = entries.as_slice();
225 poll_fn(|cx| {
226 while !entries.is_empty() {
227 match ready!(self.0.try_write(cx, entries)) {
228 Ok(count) => entries = &entries[count..],
229 Err(status) => return Poll::Ready(Err(status)),
230 }
231 }
232 Poll::Ready(Ok(()))
233 })
234 .await
235 }
236
237 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239 &mut self,
240 cx: &mut Context<'_>,
241 entries: &B,
242 ) -> Poll<Result<usize, zx::Status>> {
243 self.0.try_write(cx, entries)
244 }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250 pub async fn read_entries(
251 &mut self,
252 entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253 ) -> Result<usize, zx::Status> {
254 poll_fn(|cx| self.0.try_read(cx, entries)).await
255 }
256
257 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259 &mut self,
260 cx: &mut Context<'_>,
261 entries: &mut B,
262 ) -> Poll<Result<usize, zx::Status>> {
263 self.0.try_read(cx, entries)
264 }
265}
266
267impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> fmt::Debug for Fifo<R, W> {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 self.handle.get_ref().fmt(f)
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use crate::{DurationExt, TestExecutor, Timer};
277 use futures::future::try_join;
278 use futures::prelude::*;
279 use zerocopy::{Immutable, KnownLayout};
280
281 #[derive(
282 Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
283 )]
284 #[repr(C)]
285 struct Entry {
286 a: u32,
287 b: u32,
288 }
289
290 #[derive(
291 Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
292 )]
293 #[repr(C)]
294 struct WrongEntry {
295 a: u16,
296 }
297
298 #[test]
299 fn can_read_write() {
300 let mut exec = TestExecutor::new();
301 let element = Entry { a: 10, b: 20 };
302
303 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
304 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
305 let (_, mut tx) = tx.async_io();
306 let (mut rx, _) = rx.async_io();
307
308 let mut buffer = Entry::default();
309 let receiver = rx.read_entries(&mut buffer).map_ok(|count| {
310 assert_eq!(count, 1);
311 });
312
313 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
315 .then(|()| tx.write_entries(&element));
316
317 let done = try_join(receiver, sender);
318 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
319 assert_eq!(buffer, element);
320 }
321
322 #[test]
323 fn read_wrong_size() {
324 let mut exec = TestExecutor::new();
325 let elements = &[Entry { a: 10, b: 20 }][..];
326
327 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
328 let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
329 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
330 let (_, mut tx) = tx.async_io();
331 let (mut rx, _) = rx.async_io();
332
333 let mut buffer = WrongEntry::default();
334 let receiver = rx
335 .read_entries(&mut buffer)
336 .map_ok(|count| panic!("read should have failed, got {count}"));
337
338 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
340 .then(|()| tx.write_entries(elements));
341
342 let done = try_join(receiver, sender);
343 let res = exec.run_singlethreaded(done);
344 match res {
345 Err(zx::Status::OUT_OF_RANGE) => (),
346 _ => panic!("did not get out-of-range error"),
347 }
348 }
349
350 #[test]
351 fn write_wrong_size() {
352 let mut exec = TestExecutor::new();
353 let elements = &[WrongEntry { a: 10 }][..];
354
355 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
356 let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
357 let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
358 let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
359 let (_, mut tx) = tx.async_io();
360
361 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
362 .then(|()| tx.write_entries(elements));
363
364 let res = exec.run_singlethreaded(sender);
365 match res {
366 Err(zx::Status::OUT_OF_RANGE) => (),
367 _ => panic!("did not get out-of-range error"),
368 }
369 }
370
371 #[test]
372 fn write_into_full() {
373 use std::sync::atomic::{AtomicUsize, Ordering};
374
375 let mut exec = TestExecutor::new();
376 let elements =
377 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
378
379 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
380 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
381
382 let writes_completed = AtomicUsize::new(0);
385 let sender = async {
386 let (_, mut writer) = tx.async_io();
387 writer.write_entries(&elements[..2]).await?;
388 writes_completed.fetch_add(1, Ordering::SeqCst);
389 writer.write_entries(&elements[2..]).await?;
390 writes_completed.fetch_add(1, Ordering::SeqCst);
391 Ok::<(), zx::Status>(())
392 };
393
394 let receiver = async {
396 Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
397 let mut buffer = Entry::default();
398 let (mut reader, _) = rx.async_io();
399 let count = reader.read_entries(&mut buffer).await?;
400 assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
401 assert_eq!(count, 1);
402 assert_eq!(buffer, elements[0]);
403 let count = reader.read_entries(&mut buffer).await?;
404 assert_eq!(count, 1);
407 assert_eq!(buffer, elements[1]);
408 let count = reader.read_entries(&mut buffer).await?;
409 assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
410 assert_eq!(count, 1);
411 assert_eq!(buffer, elements[2]);
412 Ok::<(), zx::Status>(())
413 };
414
415 let done = try_join(receiver, sender);
416
417 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
418 }
419
420 #[test]
421 fn write_more_than_full() {
422 let mut exec = TestExecutor::new();
423 let elements =
424 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
425
426 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
427 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
428 let (_, mut tx) = tx.async_io();
429 let (mut rx, _) = rx.async_io();
430
431 let sender = tx.write_entries(elements);
432
433 let receiver = async {
435 Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
436 for e in elements {
437 let mut buffer = [Entry::default(); 1];
438 let count = rx.read_entries(&mut buffer[..]).await?;
439 assert_eq!(count, 1);
440 assert_eq!(&buffer[0], e);
441 }
442 Ok::<(), zx::Status>(())
443 };
444
445 let done = try_join(receiver, sender);
446
447 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
448 }
449
450 #[test]
451 fn read_multiple() {
452 let mut exec = TestExecutor::new();
453 let elements =
454 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
455 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
456 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
457
458 let write_fut = async {
459 tx.async_io().1.write_entries(elements).await.expect("failed write entries");
460 };
461 let read_fut = async {
462 let mut buffer = [Entry::default(); 5];
464 let count = rx
465 .async_io()
466 .0
467 .read_entries(&mut buffer[..])
468 .await
469 .expect("failed to read entries");
470 assert_eq!(count, elements.len());
471 assert_eq!(&buffer[..count], elements);
472 };
473 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
474 }
475
476 #[test]
477 fn read_one() {
478 let mut exec = TestExecutor::new();
479 let elements =
480 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
481 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
482 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
483
484 let write_fut = async {
485 tx.async_io().1.write_entries(elements).await.expect("failed write entries");
486 };
487 let read_fut = async {
488 let (mut reader, _) = rx.async_io();
489 for e in elements {
490 let mut entry = Entry::default();
491 assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
492 assert_eq!(&entry, e);
493 }
494 };
495 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
496 }
497
498 #[test]
499 fn maybe_uninit_single() {
500 let mut exec = TestExecutor::new();
501 let element = Entry { a: 10, b: 20 };
502 let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
503 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
504
505 let write_fut = async {
506 tx.async_io().1.write_entries(&element).await.expect("failed write entries");
507 };
508 let read_fut = async {
509 let mut buffer = MaybeUninit::<Entry>::uninit();
510 let count =
511 rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
512 assert_eq!(count, 1);
513 let read = unsafe { buffer.assume_init() };
515 assert_eq!(read, element);
516 };
517 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
518 }
519
520 #[test]
521 fn maybe_uninit_slice() {
522 let mut exec = TestExecutor::new();
523 let elements =
524 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
525 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
526 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
527
528 let write_fut = async {
529 tx.async_io().1.write_entries(elements).await.expect("failed write entries");
530 };
531 let read_fut = async {
532 let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
534 let count = rx
535 .async_io()
536 .0
537 .read_entries(&mut buffer[..])
538 .await
539 .expect("failed to read entries");
540 assert_eq!(count, elements.len());
541 let read = &mut buffer[..count];
542 for (i, v) in read.iter_mut().enumerate() {
543 let read = unsafe { v.assume_init_ref() };
546 assert_eq!(read, &elements[i]);
547 unsafe {
551 v.assume_init_drop();
552 }
553 }
554 };
555 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
556 }
557}