fuchsia_async/handle/zircon/
fifo.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14/// Marker trait for types that can be read/written with a `Fifo`.
15///
16/// An implementation is provided for all types that implement
17/// [`IntoBytes`], [`FromBytes`], and [`Immutable`].
18pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
22/// A buffer used to write `T` into [`Fifo`] objects.
23pub trait FifoWriteBuffer<T> {
24    fn as_slice(&self) -> &[T];
25}
26
27/// A buffer used to read `T` from [`Fifo`] objects.
28///
29/// # Safety
30///
31/// This trait is unsafe because the compiler cannot verify a correct
32/// implementation of `as_bytes_ptr_mut`. See
33/// [`FifoReadBuffer::as_bytes_ptr_mut`] for safety notes.
34pub unsafe trait FifoReadBuffer<T> {
35    /// Returns the number of slots available in the buffer to be rceived.
36    fn count(&self) -> usize;
37    /// Returns a mutable pointer to the buffer contents where FIFO entries must
38    /// be written into.
39    ///
40    /// # Safety
41    ///
42    /// The returned memory *must* be at least `count() * sizeof<T>()` bytes
43    /// long.
44    fn as_mut_ptr(&mut self) -> *mut T;
45}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48    fn as_slice(&self) -> &[T] {
49        self
50    }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54    fn as_slice(&self) -> &[T] {
55        self
56    }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60    fn count(&self) -> usize {
61        N
62    }
63
64    fn as_mut_ptr(&mut self) -> *mut T {
65        self.as_mut_slice().as_mut_ptr()
66    }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70    fn count(&self) -> usize {
71        self.len()
72    }
73
74    fn as_mut_ptr(&mut self) -> *mut T {
75        self.as_mut_ptr()
76    }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80    fn as_slice(&self) -> &[T] {
81        std::slice::from_ref(self)
82    }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86    fn count(&self) -> usize {
87        1
88    }
89
90    fn as_mut_ptr(&mut self) -> *mut T {
91        self as *mut T
92    }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96    fn count(&self) -> usize {
97        1
98    }
99
100    fn as_mut_ptr(&mut self) -> *mut T {
101        self.as_mut_ptr()
102    }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106    fn count(&self) -> usize {
107        self.len()
108    }
109
110    fn as_mut_ptr(&mut self) -> *mut T {
111        // TODO(https://github.com/rust-lang/rust/issues/63569): Use
112        // `MaybeUninit::slice_as_mut_ptr` once stable.
113        self.as_mut_ptr() as *mut T
114    }
115}
116
117/// An I/O object representing a `Fifo`.
118pub struct Fifo<R, W = R> {
119    handle: RWHandle<zx::Fifo<R, W>>,
120}
121
122impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123    fn as_ref(&self) -> &zx::Fifo<R, W> {
124        self.handle.get_ref()
125    }
126}
127
128impl<R, W> AsHandleRef for Fifo<R, W> {
129    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130        self.handle.get_ref().as_handle_ref()
131    }
132}
133
134impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
135    fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136        fifo.handle.into_inner()
137    }
138}
139
140impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
141    /// Creates a new `Fifo` from a previously-created `zx::Fifo`.
142    ///
143    /// # Panics
144    ///
145    /// If called on a thread that does not have a current async executor.
146    pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147        Fifo { handle: RWHandle::new(fifo.into()) }
148    }
149
150    /// Writes entries to the fifo and registers this `Fifo` as needing a write on receiving a
151    /// `zx::Status::SHOULD_WAIT`.
152    ///
153    /// Returns the number of elements processed.
154    ///
155    /// NOTE: Only one writer is supported; this will overwrite any waker registered with a previous
156    /// invocation to `try_write`.
157    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158        &self,
159        cx: &mut Context<'_>,
160        entries: &B,
161    ) -> Poll<Result<usize, zx::Status>> {
162        ready!(self.handle.poll_writable(cx)?);
163
164        let entries = entries.as_slice();
165        let fifo = self.as_ref();
166        // SAFETY: Safety relies on us keeping the slice alive over the call to `write_raw`, which
167        // we do.
168        loop {
169            let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170            match result {
171                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172                Err(e) => return Poll::Ready(Err(e)),
173                Ok(count) => return Poll::Ready(Ok(count)),
174            }
175        }
176    }
177
178    /// Reads entries from the fifo into `entries` and registers this `Fifo` as needing a read on
179    /// receiving a `zx::Status::SHOULD_WAIT`.
180    ///
181    /// NOTE: Only one reader is supported; this will overwrite any waker registered with a previous
182    /// invocation to `try_read`.
183    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184        &self,
185        cx: &mut Context<'_>,
186        entries: &mut B,
187    ) -> Poll<Result<usize, zx::Status>> {
188        ready!(self.handle.poll_readable(cx)?);
189
190        let buf = entries.as_mut_ptr();
191        let count = entries.count();
192        let fifo = self.as_ref();
193
194        loop {
195            // SAFETY: Safety relies on the pointer returned by `B` being valid,
196            // which itself depends on a correct implementation of `FifoEntry` for
197            // `R`.
198            let result = unsafe { fifo.read_raw(buf, count) };
199
200            match result {
201                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202                Err(e) => return Poll::Ready(Err(e)),
203                Ok(count) => return Poll::Ready(Ok(count)),
204            }
205        }
206    }
207
208    /// Returns a reader and writer which have async functions that can be used to read and write
209    /// requests.
210    pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211        (FifoReader(self), FifoWriter(self))
212    }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218    /// NOTE: If this future is dropped or there is an error, there is no indication how many
219    /// entries were successfully written.
220    pub async fn write_entries(
221        &mut self,
222        entries: &(impl ?Sized + FifoWriteBuffer<W>),
223    ) -> Result<(), zx::Status> {
224        let mut entries = entries.as_slice();
225        poll_fn(|cx| {
226            while !entries.is_empty() {
227                match ready!(self.0.try_write(cx, entries)) {
228                    Ok(count) => entries = &entries[count..],
229                    Err(status) => return Poll::Ready(Err(status)),
230                }
231            }
232            Poll::Ready(Ok(()))
233        })
234        .await
235    }
236
237    /// Same as Fifo::try_write.
238    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239        &mut self,
240        cx: &mut Context<'_>,
241        entries: &B,
242    ) -> Poll<Result<usize, zx::Status>> {
243        self.0.try_write(cx, entries)
244    }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250    pub async fn read_entries(
251        &mut self,
252        entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253    ) -> Result<usize, zx::Status> {
254        poll_fn(|cx| self.0.try_read(cx, entries)).await
255    }
256
257    /// Same as Fifo::try_read.
258    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259        &mut self,
260        cx: &mut Context<'_>,
261        entries: &mut B,
262    ) -> Poll<Result<usize, zx::Status>> {
263        self.0.try_read(cx, entries)
264    }
265}
266
267impl<R, W> fmt::Debug for Fifo<R, W> {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        self.handle.get_ref().fmt(f)
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use crate::{DurationExt, TestExecutor, Timer};
277    use futures::future::try_join;
278    use futures::prelude::*;
279    use zerocopy::{Immutable, KnownLayout};
280    use zx::prelude::*;
281
282    #[derive(
283        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
284    )]
285    #[repr(C)]
286    struct Entry {
287        a: u32,
288        b: u32,
289    }
290
291    #[derive(
292        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
293    )]
294    #[repr(C)]
295    struct WrongEntry {
296        a: u16,
297    }
298
299    #[test]
300    fn can_read_write() {
301        let mut exec = TestExecutor::new();
302        let element = Entry { a: 10, b: 20 };
303
304        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
305        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
306        let (_, mut tx) = tx.async_io();
307        let (mut rx, _) = rx.async_io();
308
309        let mut buffer = Entry::default();
310        let receiver = rx.read_entries(&mut buffer).map_ok(|count| {
311            assert_eq!(count, 1);
312        });
313
314        // Sends an entry after the timeout has passed
315        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
316            .then(|()| tx.write_entries(&element));
317
318        let done = try_join(receiver, sender);
319        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
320        assert_eq!(buffer, element);
321    }
322
323    #[test]
324    fn read_wrong_size() {
325        let mut exec = TestExecutor::new();
326        let elements = &[Entry { a: 10, b: 20 }][..];
327
328        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
329        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
330        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
331        let (_, mut tx) = tx.async_io();
332        let (mut rx, _) = rx.async_io();
333
334        let mut buffer = WrongEntry::default();
335        let receiver = rx
336            .read_entries(&mut buffer)
337            .map_ok(|count| panic!("read should have failed, got {count}"));
338
339        // Sends an entry after the timeout has passed
340        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
341            .then(|()| tx.write_entries(elements));
342
343        let done = try_join(receiver, sender);
344        let res = exec.run_singlethreaded(done);
345        match res {
346            Err(zx::Status::OUT_OF_RANGE) => (),
347            _ => panic!("did not get out-of-range error"),
348        }
349    }
350
351    #[test]
352    fn write_wrong_size() {
353        let mut exec = TestExecutor::new();
354        let elements = &[WrongEntry { a: 10 }][..];
355
356        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
357        let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
358        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
359        let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
360        let (_, mut tx) = tx.async_io();
361
362        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
363            .then(|()| tx.write_entries(elements));
364
365        let res = exec.run_singlethreaded(sender);
366        match res {
367            Err(zx::Status::OUT_OF_RANGE) => (),
368            _ => panic!("did not get out-of-range error"),
369        }
370    }
371
372    #[test]
373    fn write_into_full() {
374        use std::sync::atomic::{AtomicUsize, Ordering};
375
376        let mut exec = TestExecutor::new();
377        let elements =
378            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
379
380        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
381        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
382
383        // Use `writes_completed` to verify that not all writes
384        // are transmitted at once, and the last write is actually blocked.
385        let writes_completed = AtomicUsize::new(0);
386        let sender = async {
387            let (_, mut writer) = tx.async_io();
388            writer.write_entries(&elements[..2]).await?;
389            writes_completed.fetch_add(1, Ordering::SeqCst);
390            writer.write_entries(&elements[2..]).await?;
391            writes_completed.fetch_add(1, Ordering::SeqCst);
392            Ok::<(), zx::Status>(())
393        };
394
395        // Wait 10 ms, then read the messages from the fifo.
396        let receiver = async {
397            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
398            let mut buffer = Entry::default();
399            let (mut reader, _) = rx.async_io();
400            let count = reader.read_entries(&mut buffer).await?;
401            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
402            assert_eq!(count, 1);
403            assert_eq!(buffer, elements[0]);
404            let count = reader.read_entries(&mut buffer).await?;
405            // At this point, the last write may or may not have
406            // been written.
407            assert_eq!(count, 1);
408            assert_eq!(buffer, elements[1]);
409            let count = reader.read_entries(&mut buffer).await?;
410            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
411            assert_eq!(count, 1);
412            assert_eq!(buffer, elements[2]);
413            Ok::<(), zx::Status>(())
414        };
415
416        let done = try_join(receiver, sender);
417
418        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
419    }
420
421    #[test]
422    fn write_more_than_full() {
423        let mut exec = TestExecutor::new();
424        let elements =
425            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
426
427        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
428        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
429        let (_, mut tx) = tx.async_io();
430        let (mut rx, _) = rx.async_io();
431
432        let sender = tx.write_entries(elements);
433
434        // Wait 10 ms, then read the messages from the fifo.
435        let receiver = async {
436            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
437            for e in elements {
438                let mut buffer = [Entry::default(); 1];
439                let count = rx.read_entries(&mut buffer[..]).await?;
440                assert_eq!(count, 1);
441                assert_eq!(&buffer[0], e);
442            }
443            Ok::<(), zx::Status>(())
444        };
445
446        let done = try_join(receiver, sender);
447
448        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
449    }
450
451    #[test]
452    fn read_multiple() {
453        let mut exec = TestExecutor::new();
454        let elements =
455            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
456        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
457        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
458
459        let write_fut = async {
460            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
461        };
462        let read_fut = async {
463            // Use a larger buffer to show partial reads.
464            let mut buffer = [Entry::default(); 5];
465            let count = rx
466                .async_io()
467                .0
468                .read_entries(&mut buffer[..])
469                .await
470                .expect("failed to read entries");
471            assert_eq!(count, elements.len());
472            assert_eq!(&buffer[..count], elements);
473        };
474        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
475    }
476
477    #[test]
478    fn read_one() {
479        let mut exec = TestExecutor::new();
480        let elements =
481            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
482        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
483        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
484
485        let write_fut = async {
486            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
487        };
488        let read_fut = async {
489            let (mut reader, _) = rx.async_io();
490            for e in elements {
491                let mut entry = Entry::default();
492                assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
493                assert_eq!(&entry, e);
494            }
495        };
496        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
497    }
498
499    #[test]
500    fn maybe_uninit_single() {
501        let mut exec = TestExecutor::new();
502        let element = Entry { a: 10, b: 20 };
503        let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
504        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
505
506        let write_fut = async {
507            tx.async_io().1.write_entries(&element).await.expect("failed write entries");
508        };
509        let read_fut = async {
510            let mut buffer = MaybeUninit::<Entry>::uninit();
511            let count =
512                rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
513            assert_eq!(count, 1);
514            // SAFETY: We just read a new entry into the buffer.
515            let read = unsafe { buffer.assume_init() };
516            assert_eq!(read, element);
517        };
518        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
519    }
520
521    #[test]
522    fn maybe_uninit_slice() {
523        let mut exec = TestExecutor::new();
524        let elements =
525            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
526        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
527        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
528
529        let write_fut = async {
530            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
531        };
532        let read_fut = async {
533            // Use a larger buffer to show partial reads.
534            let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
535            let count = rx
536                .async_io()
537                .0
538                .read_entries(&mut buffer[..])
539                .await
540                .expect("failed to read entries");
541            assert_eq!(count, elements.len());
542            let read = &mut buffer[..count];
543            for (i, v) in read.iter_mut().enumerate() {
544                // SAFETY: This is the read region of the buffer, initialized by
545                // reading from the FIFO.
546                let read = unsafe { v.assume_init_ref() };
547                assert_eq!(read, &elements[i]);
548                // SAFETY: The buffer was partially initialized by reading from
549                // the FIFO, the correct thing to do here is to manually drop
550                // the elements that were initialized.
551                unsafe {
552                    v.assume_init_drop();
553                }
554            }
555        };
556        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
557    }
558}