Skip to main content

fuchsia_async/handle/zircon/
fifo.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
/// Marker trait for types that can be read/written with a `Fifo`.
///
/// The trait declares no methods; it only bundles the [`IntoBytes`],
/// [`FromBytes`], and [`Immutable`] bounds under a single name.
///
/// An implementation is provided for all types that implement
/// [`IntoBytes`], [`FromBytes`], and [`Immutable`].
pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
/// A buffer used to write `T` into [`Fifo`] objects.
pub trait FifoWriteBuffer<T> {
    /// Returns the entries held by this buffer as a contiguous slice.
    fn as_slice(&self) -> &[T];
}
26
/// A buffer used to read `T` from [`Fifo`] objects.
///
/// # Safety
///
/// This trait is unsafe because the compiler cannot verify a correct
/// implementation of `as_mut_ptr`. See
/// [`FifoReadBuffer::as_mut_ptr`] for safety notes.
pub unsafe trait FifoReadBuffer<T> {
    /// Returns the number of slots available in the buffer to be received.
    fn count(&self) -> usize;
    /// Returns a mutable pointer to the buffer contents where FIFO entries must
    /// be written into.
    ///
    /// # Safety
    ///
    /// The returned memory *must* be at least `count() * size_of::<T>()` bytes
    /// long.
    fn as_mut_ptr(&mut self) -> *mut T;
}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48    fn as_slice(&self) -> &[T] {
49        self
50    }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54    fn as_slice(&self) -> &[T] {
55        self
56    }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60    fn count(&self) -> usize {
61        N
62    }
63
64    fn as_mut_ptr(&mut self) -> *mut T {
65        self.as_mut_slice().as_mut_ptr()
66    }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70    fn count(&self) -> usize {
71        self.len()
72    }
73
74    fn as_mut_ptr(&mut self) -> *mut T {
75        self.as_mut_ptr()
76    }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80    fn as_slice(&self) -> &[T] {
81        std::slice::from_ref(self)
82    }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86    fn count(&self) -> usize {
87        1
88    }
89
90    fn as_mut_ptr(&mut self) -> *mut T {
91        self as *mut T
92    }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96    fn count(&self) -> usize {
97        1
98    }
99
100    fn as_mut_ptr(&mut self) -> *mut T {
101        self.as_mut_ptr()
102    }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106    fn count(&self) -> usize {
107        self.len()
108    }
109
110    fn as_mut_ptr(&mut self) -> *mut T {
111        // TODO(https://github.com/rust-lang/rust/issues/63569): Use
112        // `MaybeUninit::slice_as_mut_ptr` once stable.
113        self.as_mut_ptr() as *mut T
114    }
115}
116
/// An I/O object representing a `Fifo`.
///
/// `R` is the entry type read from the fifo and `W` is the entry type written
/// to it; `W` defaults to `R`.
pub struct Fifo<R, W = R> {
    // The underlying `zx::Fifo`, wrapped in the crate's `RWHandle`.
    handle: RWHandle<zx::Fifo<R, W>>,
}
121
122impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123    fn as_ref(&self) -> &zx::Fifo<R, W> {
124        self.handle.get_ref()
125    }
126}
127
128impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> AsHandleRef for Fifo<R, W> {
129    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130        self.handle.get_ref().as_handle_ref()
131    }
132}
133
134impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> From<Fifo<R, W>> for zx::Fifo<R, W> {
135    fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136        fifo.handle.into_inner()
137    }
138}
139
140impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> Fifo<R, W> {
141    /// Creates a new `Fifo` from a previously-created `zx::Fifo`.
142    ///
143    /// # Panics
144    ///
145    /// If called on a thread that does not have a current async executor.
146    pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147        Fifo { handle: RWHandle::new(fifo.into()) }
148    }
149
150    /// Writes entries to the fifo and registers this `Fifo` as needing a write on receiving a
151    /// `zx::Status::SHOULD_WAIT`.
152    ///
153    /// Returns the number of elements processed.
154    ///
155    /// NOTE: Only one writer is supported; this will overwrite any waker registered with a previous
156    /// invocation to `try_write`.
157    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158        &self,
159        cx: &mut Context<'_>,
160        entries: &B,
161    ) -> Poll<Result<usize, zx::Status>> {
162        ready!(self.handle.poll_writable(cx)?);
163
164        let entries = entries.as_slice();
165        let fifo = self.as_ref();
166        // SAFETY: Safety relies on us keeping the slice alive over the call to `write_raw`, which
167        // we do.
168        loop {
169            let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170            match result {
171                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172                Err(e) => return Poll::Ready(Err(e)),
173                Ok(count) => return Poll::Ready(Ok(count)),
174            }
175        }
176    }
177
178    /// Reads entries from the fifo into `entries` and registers this `Fifo` as needing a read on
179    /// receiving a `zx::Status::SHOULD_WAIT`.
180    ///
181    /// NOTE: Only one reader is supported; this will overwrite any waker registered with a previous
182    /// invocation to `try_read`.
183    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184        &self,
185        cx: &mut Context<'_>,
186        entries: &mut B,
187    ) -> Poll<Result<usize, zx::Status>> {
188        ready!(self.handle.poll_readable(cx)?);
189
190        let buf = entries.as_mut_ptr();
191        let count = entries.count();
192        let fifo = self.as_ref();
193
194        loop {
195            // SAFETY: Safety relies on the pointer returned by `B` being valid,
196            // which itself depends on a correct implementation of `FifoEntry` for
197            // `R`.
198            let result = unsafe { fifo.read_raw(buf, count) };
199
200            match result {
201                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202                Err(e) => return Poll::Ready(Err(e)),
203                Ok(count) => return Poll::Ready(Ok(count)),
204            }
205        }
206    }
207
208    /// Returns a reader and writer which have async functions that can be used to read and write
209    /// requests.
210    pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211        (FifoReader(self), FifoWriter(self))
212    }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218    /// NOTE: If this future is dropped or there is an error, there is no indication how many
219    /// entries were successfully written.
220    pub async fn write_entries(
221        &mut self,
222        entries: &(impl ?Sized + FifoWriteBuffer<W>),
223    ) -> Result<(), zx::Status> {
224        let mut entries = entries.as_slice();
225        poll_fn(|cx| {
226            while !entries.is_empty() {
227                match ready!(self.0.try_write(cx, entries)) {
228                    Ok(count) => entries = &entries[count..],
229                    Err(status) => return Poll::Ready(Err(status)),
230                }
231            }
232            Poll::Ready(Ok(()))
233        })
234        .await
235    }
236
237    /// Same as Fifo::try_write.
238    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239        &mut self,
240        cx: &mut Context<'_>,
241        entries: &B,
242    ) -> Poll<Result<usize, zx::Status>> {
243        self.0.try_write(cx, entries)
244    }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250    pub async fn read_entries(
251        &mut self,
252        entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253    ) -> Result<usize, zx::Status> {
254        poll_fn(|cx| self.0.try_read(cx, entries)).await
255    }
256
257    /// Same as Fifo::try_read.
258    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259        &mut self,
260        cx: &mut Context<'_>,
261        entries: &mut B,
262    ) -> Poll<Result<usize, zx::Status>> {
263        self.0.try_read(cx, entries)
264    }
265}
266
267impl<R: FromBytes + IntoBytes, W: FromBytes + IntoBytes> fmt::Debug for Fifo<R, W> {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        self.handle.get_ref().fmt(f)
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use crate::{DurationExt, TestExecutor, Timer};
277    use futures::future::try_join;
278    use futures::prelude::*;
279    use zerocopy::{Immutable, KnownLayout};
280
    /// Fifo element used by the tests: two `u32` fields laid out with `repr(C)`.
    #[derive(
        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct Entry {
        a: u32,
        b: u32,
    }
289
    /// Element type with a single `u16` field, deliberately different in size from `Entry`; used
    /// by the `*_wrong_size` tests.
    #[derive(
        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct WrongEntry {
        a: u16,
    }
297
298    #[test]
299    fn can_read_write() {
300        let mut exec = TestExecutor::new();
301        let element = Entry { a: 10, b: 20 };
302
303        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
304        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
305        let (_, mut tx) = tx.async_io();
306        let (mut rx, _) = rx.async_io();
307
308        let mut buffer = Entry::default();
309        let receiver = rx.read_entries(&mut buffer).map_ok(|count| {
310            assert_eq!(count, 1);
311        });
312
313        // Sends an entry after the timeout has passed
314        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
315            .then(|()| tx.write_entries(&element));
316
317        let done = try_join(receiver, sender);
318        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
319        assert_eq!(buffer, element);
320    }
321
    /// Reading with an element type whose size differs from the one the fifo was created with
    /// must fail with `OUT_OF_RANGE`.
    #[test]
    fn read_wrong_size() {
        let mut exec = TestExecutor::new();
        let elements = &[Entry { a: 10, b: 20 }][..];

        // The fifo is created for `Entry`, then the read end is re-typed as `WrongEntry` by
        // round-tripping through its handle.
        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
        let (_, mut tx) = tx.async_io();
        let (mut rx, _) = rx.async_io();

        let mut buffer = WrongEntry::default();
        // A successful read is a test failure.
        let receiver = rx
            .read_entries(&mut buffer)
            .map_ok(|count| panic!("read should have failed, got {count}"));

        // Sends an entry after the timeout has passed
        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| tx.write_entries(elements));

        let done = try_join(receiver, sender);
        let res = exec.run_singlethreaded(done);
        match res {
            Err(zx::Status::OUT_OF_RANGE) => (),
            _ => panic!("did not get out-of-range error"),
        }
    }
349
    /// Writing with an element type whose size differs from the one the fifo was created with
    /// must fail with `OUT_OF_RANGE`.
    #[test]
    fn write_wrong_size() {
        let mut exec = TestExecutor::new();
        let elements = &[WrongEntry { a: 10 }][..];

        // The fifo is created for `Entry`, then both ends are re-typed as `WrongEntry` by
        // round-tripping through their handles.
        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
        // `_rx` is bound (not discarded) so the read end stays open for the rest of the test.
        let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
        let (_, mut tx) = tx.async_io();

        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| tx.write_entries(elements));

        let res = exec.run_singlethreaded(sender);
        match res {
            Err(zx::Status::OUT_OF_RANGE) => (),
            _ => panic!("did not get out-of-range error"),
        }
    }
370
    /// Writes three entries through a fifo created with a capacity of two and checks that the
    /// third write only completes after the reader has started draining.
    #[test]
    fn write_into_full() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let mut exec = TestExecutor::new();
        let elements =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));

        // Use `writes_completed` to verify that not all writes
        // are transmitted at once, and the last write is actually blocked.
        let writes_completed = AtomicUsize::new(0);
        let sender = async {
            let (_, mut writer) = tx.async_io();
            // The first two entries are written together.
            writer.write_entries(&elements[..2]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            // The third entry is written on its own.
            writer.write_entries(&elements[2..]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            Ok::<(), zx::Status>(())
        };

        // Wait 10 ms, then read the messages from the fifo.
        let receiver = async {
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            let mut buffer = Entry::default();
            let (mut reader, _) = rx.async_io();
            let count = reader.read_entries(&mut buffer).await?;
            // Only the first `write_entries` call may have completed at this point.
            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
            assert_eq!(count, 1);
            assert_eq!(buffer, elements[0]);
            let count = reader.read_entries(&mut buffer).await?;
            // At this point, the last write may or may not have
            // been written.
            assert_eq!(count, 1);
            assert_eq!(buffer, elements[1]);
            let count = reader.read_entries(&mut buffer).await?;
            // Receiving the third entry implies the second `write_entries` call has finished.
            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
            assert_eq!(count, 1);
            assert_eq!(buffer, elements[2]);
            Ok::<(), zx::Status>(())
        };

        let done = try_join(receiver, sender);

        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
    }
419
    /// A single `write_entries` call with three entries, on a fifo created with a capacity of
    /// two, delivers every entry in order once the reader drains the fifo.
    #[test]
    fn write_more_than_full() {
        let mut exec = TestExecutor::new();
        let elements =
            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
        let (_, mut tx) = tx.async_io();
        let (mut rx, _) = rx.async_io();

        let sender = tx.write_entries(elements);

        // Wait 10 ms, then read the messages from the fifo.
        let receiver = async {
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            // Read one entry at a time through a one-element slice buffer.
            for e in elements {
                let mut buffer = [Entry::default(); 1];
                let count = rx.read_entries(&mut buffer[..]).await?;
                assert_eq!(count, 1);
                assert_eq!(&buffer[0], e);
            }
            Ok::<(), zx::Status>(())
        };

        let done = try_join(receiver, sender);

        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
    }
449
450    #[test]
451    fn read_multiple() {
452        let mut exec = TestExecutor::new();
453        let elements =
454            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
455        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
456        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
457
458        let write_fut = async {
459            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
460        };
461        let read_fut = async {
462            // Use a larger buffer to show partial reads.
463            let mut buffer = [Entry::default(); 5];
464            let count = rx
465                .async_io()
466                .0
467                .read_entries(&mut buffer[..])
468                .await
469                .expect("failed to read entries");
470            assert_eq!(count, elements.len());
471            assert_eq!(&buffer[..count], elements);
472        };
473        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
474    }
475
476    #[test]
477    fn read_one() {
478        let mut exec = TestExecutor::new();
479        let elements =
480            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
481        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
482        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
483
484        let write_fut = async {
485            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
486        };
487        let read_fut = async {
488            let (mut reader, _) = rx.async_io();
489            for e in elements {
490                let mut entry = Entry::default();
491                assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
492                assert_eq!(&entry, e);
493            }
494        };
495        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
496    }
497
498    #[test]
499    fn maybe_uninit_single() {
500        let mut exec = TestExecutor::new();
501        let element = Entry { a: 10, b: 20 };
502        let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
503        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
504
505        let write_fut = async {
506            tx.async_io().1.write_entries(&element).await.expect("failed write entries");
507        };
508        let read_fut = async {
509            let mut buffer = MaybeUninit::<Entry>::uninit();
510            let count =
511                rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
512            assert_eq!(count, 1);
513            // SAFETY: We just read a new entry into the buffer.
514            let read = unsafe { buffer.assume_init() };
515            assert_eq!(read, element);
516        };
517        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
518    }
519
520    #[test]
521    fn maybe_uninit_slice() {
522        let mut exec = TestExecutor::new();
523        let elements =
524            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
525        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
526        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
527
528        let write_fut = async {
529            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
530        };
531        let read_fut = async {
532            // Use a larger buffer to show partial reads.
533            let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
534            let count = rx
535                .async_io()
536                .0
537                .read_entries(&mut buffer[..])
538                .await
539                .expect("failed to read entries");
540            assert_eq!(count, elements.len());
541            let read = &mut buffer[..count];
542            for (i, v) in read.iter_mut().enumerate() {
543                // SAFETY: This is the read region of the buffer, initialized by
544                // reading from the FIFO.
545                let read = unsafe { v.assume_init_ref() };
546                assert_eq!(read, &elements[i]);
547                // SAFETY: The buffer was partially initialized by reading from
548                // the FIFO, the correct thing to do here is to manually drop
549                // the elements that were initialized.
550                unsafe {
551                    v.assume_init_drop();
552                }
553            }
554        };
555        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
556    }
557}