zx/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Type-safe bindings for Zircon stream objects.
6
7use crate::{
8    AsHandleRef, HandleBased, HandleRef, NullableHandle, Property, PropertyQuery, Status, Vmo,
9    object_get_property, object_set_property, ok, sys,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
/// An object representing a Zircon [stream](https://fuchsia.dev/fuchsia-src/concepts/objects/stream.md).
///
/// As essentially a subtype of `NullableHandle`, it can be freely interconverted.
///
/// The type is a `#[repr(transparent)]` wrapper around a single `NullableHandle` field.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Stream(NullableHandle);
// Supplies the handle plumbing that the rest of this file relies on (`Stream::from(NullableHandle)`
// and `raw_handle()`); presumably via `HandleBased`/`AsHandleRef` -- see the macro definition.
impl_handle_based!(Stream);
22
bitflags! {
    /// Mode flags passed to [`Stream::create`]; the bits are handed to `zx_stream_create`
    /// unchanged.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamOptions: u32 {
        /// Mirrors `ZX_STREAM_MODE_READ`. The tests in this file observe that a stream created
        /// with this flag carries the `READ` right.
        const MODE_READ = sys::ZX_STREAM_MODE_READ;
        /// Mirrors `ZX_STREAM_MODE_WRITE`. The tests in this file observe that a stream created
        /// with this flag carries the `WRITE` right.
        const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
        /// Mirrors `ZX_STREAM_MODE_APPEND`. The tests in this file observe that a stream created
        /// with this flag reports `1` from `get_mode_append`.
        const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
    }
}
32
bitflags! {
    /// Options accepted by the read methods on [`Stream`].
    ///
    /// No flags are defined here, so callers pass `StreamReadOptions::empty()`.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamReadOptions: u32 {
    }
}
39
bitflags! {
    /// Options accepted by the write methods on [`Stream`]; the bits are handed to the
    /// `zx_stream_writev*` syscalls unchanged.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamWriteOptions: u32 {
        /// Mirrors `ZX_STREAM_APPEND`.
        // NOTE(review): presumably requests that the write lands at the end of the stream --
        // confirm against the zx_stream_writev reference.
        const APPEND = sys::ZX_STREAM_APPEND;
    }
}
47
48impl Stream {
49    /// See [zx_stream_create](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_create)
50    pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51        let mut handle = 0;
52        let status =
53            unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54        ok(status)?;
55        unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
56    }
57
58    /// Wraps the
59    /// [`zx_stream_readv`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv)
60    /// syscall.
61    ///
62    /// # Safety
63    ///
64    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
65    /// not necessarily initialized) memory.
66    pub unsafe fn readv(
67        &self,
68        options: StreamReadOptions,
69        iovecs: &mut [sys::zx_iovec_t],
70    ) -> Result<usize, Status> {
71        let mut actual = 0;
72        let status = unsafe {
73            sys::zx_stream_readv(
74                self.raw_handle(),
75                options.bits(),
76                iovecs.as_mut_ptr(),
77                iovecs.len(),
78                &mut actual,
79            )
80        };
81        ok(status)?;
82        Ok(actual)
83    }
84
85    /// Attempts to read `buffer.len()` bytes from the stream starting at the stream's current seek
86    /// offset. Only the number of bytes read from the stream will be initialized in `buffer`.
87    /// Returns the number of bytes read from the stream.
88    ///
89    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
90    pub fn read_uninit(
91        &self,
92        options: StreamReadOptions,
93        buffer: &mut [MaybeUninit<u8>],
94    ) -> Result<usize, Status> {
95        // TODO(https://fxbug.dev/42079723) use MaybeUninit::slice_as_mut_ptr when stable
96        let mut iovec =
97            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
99        // to `readv`.
100        unsafe { self.readv(options, &mut iovec) }
101    }
102
103    /// Attempts to read `length` bytes from the stream starting at the stream's current seek
104    /// offset. Returns the read bytes as a `Vec`.
105    ///
106    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
107    pub fn read_to_vec(
108        &self,
109        options: StreamReadOptions,
110        length: usize,
111    ) -> Result<Vec<u8>, Status> {
112        let mut data = Vec::with_capacity(length);
113        let buffer = &mut data.spare_capacity_mut()[0..length];
114        let actual = self.read_uninit(options, buffer)?;
115        // SAFETY: read_uninit returns the number of bytes that were initialized.
116        unsafe { data.set_len(actual) };
117        Ok(data)
118    }
119
120    /// Wraps the
121    /// [`zx_stream_readv_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at)
122    /// syscall.
123    ///
124    /// # Safety
125    ///
126    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
127    /// not necessarily initialized) memory.
128    pub unsafe fn readv_at(
129        &self,
130        options: StreamReadOptions,
131        offset: u64,
132        iovecs: &mut [sys::zx_iovec_t],
133    ) -> Result<usize, Status> {
134        let mut actual = 0;
135        let status = unsafe {
136            sys::zx_stream_readv_at(
137                self.raw_handle(),
138                options.bits(),
139                offset,
140                iovecs.as_mut_ptr(),
141                iovecs.len(),
142                &mut actual,
143            )
144        };
145        ok(status)?;
146        Ok(actual)
147    }
148
149    /// Attempts to read `buffer.len()` bytes from the stream starting at `offset`. Only the number
150    /// of bytes read from the stream will be initialized in `buffer`. Returns the number of bytes
151    /// read from the stream.
152    ///
153    /// See
154    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
155    pub fn read_at_uninit(
156        &self,
157        options: StreamReadOptions,
158        offset: u64,
159        buffer: &mut [MaybeUninit<u8>],
160    ) -> Result<usize, Status> {
161        // TODO(https://fxbug.dev/42079723) Use MaybeUninit::slice_as_mut_ptr when stable.
162        let mut iovec =
163            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
165        // to `readv_at`.
166        unsafe { self.readv_at(options, offset, &mut iovec) }
167    }
168
169    /// Attempts to read `length` bytes from the stream starting at `offset`. Returns the read bytes
170    /// as a `Vec`.
171    ///
172    /// See
173    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
174    pub fn read_at_to_vec(
175        &self,
176        options: StreamReadOptions,
177        offset: u64,
178        length: usize,
179    ) -> Result<Vec<u8>, Status> {
180        let mut data = Vec::with_capacity(length);
181        let buffer = &mut data.spare_capacity_mut()[0..length];
182        let actual = self.read_at_uninit(options, offset, buffer)?;
183        // SAFETY: read_at_uninit returns the number of bytes that were initialized.
184        unsafe { data.set_len(actual) };
185        Ok(data)
186    }
187
188    /// See [zx_stream_seek](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_seek)
189    pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190        let (whence, offset) = match pos {
191            SeekFrom::Start(start) => (
192                sys::ZX_STREAM_SEEK_ORIGIN_START,
193                start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194            ),
195            SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196            SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197        };
198        let mut pos = 0;
199        let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200        ok(status)?;
201        Ok(pos)
202    }
203
204    /// Wraps the
205    /// [`zx_stream_writev`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev)
206    /// syscall.
207    pub fn writev(
208        &self,
209        options: StreamWriteOptions,
210        iovecs: &[sys::zx_iovec_t],
211    ) -> Result<usize, Status> {
212        let mut actual = 0;
213        let status = unsafe {
214            sys::zx_stream_writev(
215                self.raw_handle(),
216                options.bits(),
217                iovecs.as_ptr(),
218                iovecs.len(),
219                &mut actual,
220            )
221        };
222        ok(status)?;
223        Ok(actual)
224    }
225
226    /// Writes `buffer` to the stream at the stream's current seek offset. Returns the number of
227    /// bytes written.
228    ///
229    /// See [zx_stream_writev](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev).
230    pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232        self.writev(options, &iovec)
233    }
234
235    /// Wraps the
236    /// [`zx_stream_writev_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at)
237    /// syscall.
238    pub fn writev_at(
239        &self,
240        options: StreamWriteOptions,
241        offset: u64,
242        iovecs: &[sys::zx_iovec_t],
243    ) -> Result<usize, Status> {
244        let mut actual = 0;
245        let status = unsafe {
246            sys::zx_stream_writev_at(
247                self.raw_handle(),
248                options.bits(),
249                offset,
250                iovecs.as_ptr(),
251                iovecs.len(),
252                &mut actual,
253            )
254        };
255        ok(status)?;
256        Ok(actual)
257    }
258
259    /// Writes `buffer` to the stream at `offset``. Returns the number of bytes written.
260    ///
261    /// See
262    /// [zx_stream_writev_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at).
263    pub fn write_at(
264        &self,
265        options: StreamWriteOptions,
266        offset: u64,
267        buffer: &[u8],
268    ) -> Result<usize, Status> {
269        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270        self.writev_at(options, offset, &iovec)
271    }
272}
273
// Exposes the `STREAM_MODE_APPEND` property of a `Stream` as a `u8` through the generated
// `get_mode_append` / `set_mode_append` accessors. The tests in this file observe the value `1`
// when append mode is on and `0` when it is off.
unsafe_handle_properties!(object: Stream,
    props: [
        {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
    ]
);
279
280impl std::io::Read for Stream {
281    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282        let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
284        // to `readv`.
285        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286    }
287
288    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
290        let mut iovecs = unsafe {
291            std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292        };
293        // SAFETY: `IoSliceMut` can only be constructed from a mutable slice so we know it's safe to
294        // pass to `readv`.
295        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296    }
297}
298
299impl std::io::Seek for Stream {
300    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301        Ok(Self::seek(&self, pos)? as u64)
302    }
303}
304
305impl std::io::Write for Stream {
306    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307        Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308    }
309
310    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
312        let iovecs = unsafe {
313            std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314        };
315        Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316    }
317
318    fn flush(&mut self) -> std::io::Result<()> {
319        Ok(())
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use crate as zx;
327
328    #[test]
329    fn create() {
330        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
331
332        let stream =
333            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
334
335        let basic_info = stream.basic_info().unwrap();
336        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
337        assert!(basic_info.rights.contains(zx::Rights::READ));
338        assert!(basic_info.rights.contains(zx::Rights::WRITE));
339    }
340
341    #[test]
342    fn create_readonly() {
343        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
344
345        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
346
347        let basic_info = stream.basic_info().unwrap();
348        assert!(basic_info.rights.contains(zx::Rights::READ));
349        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
350    }
351
352    #[test]
353    fn create_offset() {
354        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
355        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
356        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
357    }
358
359    #[test]
360    fn create_with_mode_append() {
361        let size: u64 = zx::system_get_page_size().into();
362        let vmo = zx::Vmo::create(size).unwrap();
363        let stream =
364            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
365                .unwrap();
366        assert_eq!(stream.get_mode_append().unwrap(), 1);
367    }
368
369    #[test]
370    fn get_and_set_mode_append() {
371        let size: u64 = zx::system_get_page_size().into();
372        let vmo = zx::Vmo::create(size).unwrap();
373        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
374        assert_eq!(stream.get_mode_append().unwrap(), 0);
375        stream.set_mode_append(&1).unwrap();
376        assert_eq!(stream.get_mode_append().unwrap(), 1);
377        stream.set_mode_append(&0).unwrap();
378        assert_eq!(stream.get_mode_append().unwrap(), 0);
379    }
380
381    #[test]
382    fn read_uninit() {
383        const DATA: &'static [u8] = b"vmo-contents";
384        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
385        vmo.write(DATA, 0).unwrap();
386        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
387
388        // Read from the stream.
389        let mut data = Vec::with_capacity(5);
390        let bytes_read =
391            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
392        assert_eq!(bytes_read, 5);
393        unsafe { data.set_len(5) };
394        assert_eq!(data, DATA[0..5]);
395
396        // Try to read more data than is available in the stream.
397        let mut data = Vec::with_capacity(10);
398        let bytes_read =
399            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
400        assert_eq!(bytes_read, 7);
401        unsafe { data.set_len(7) };
402        assert_eq!(data, DATA[5..]);
403
404        // Try to read at the end of the stream.
405        let mut data = Vec::with_capacity(10);
406        let bytes_read =
407            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
408        assert_eq!(bytes_read, 0);
409    }
410
411    #[test]
412    fn read_to_vec() {
413        const DATA: &'static [u8] = b"vmo-contents";
414        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
415        vmo.write(DATA, 0).unwrap();
416        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
417
418        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
419        assert_eq!(data, DATA);
420    }
421
422    #[test]
423    fn read_at_uninit() {
424        const DATA: &'static [u8] = b"vmo-contents";
425        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
426        vmo.write(DATA, 0).unwrap();
427        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
428
429        // Read from the stream.
430        let mut data = Vec::with_capacity(5);
431        let bytes_read = stream
432            .read_at_uninit(StreamReadOptions::empty(), 0, data.spare_capacity_mut())
433            .unwrap();
434        assert_eq!(bytes_read, 5);
435        unsafe { data.set_len(5) };
436        assert_eq!(data, DATA[0..5]);
437
438        // Try to read beyond the end of the stream.
439        let mut data = Vec::with_capacity(10);
440        let bytes_read = stream
441            .read_at_uninit(StreamReadOptions::empty(), 5, data.spare_capacity_mut())
442            .unwrap();
443        assert_eq!(bytes_read, 7);
444        unsafe { data.set_len(7) };
445        assert_eq!(data, DATA[5..]);
446
447        // Try to read starting beyond the end of the stream.
448        let mut data = Vec::with_capacity(10);
449        let bytes_read = stream
450            .read_at_uninit(StreamReadOptions::empty(), 20, data.spare_capacity_mut())
451            .unwrap();
452        assert_eq!(bytes_read, 0);
453    }
454
455    #[test]
456    fn read_at_to_vec() {
457        const DATA: &'static [u8] = b"vmo-contents";
458        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
459        vmo.write(DATA, 0).unwrap();
460        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
461
462        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
463        assert_eq!(data, DATA[5..]);
464    }
465
466    #[test]
467    fn write() {
468        const DATA: &'static [u8] = b"vmo-contents";
469        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
470        let stream =
471            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
472
473        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
474        assert_eq!(bytes_written, DATA.len());
475
476        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
477        assert_eq!(data, DATA);
478    }
479
480    #[test]
481    fn write_at() {
482        const DATA: &'static [u8] = b"vmo-contents";
483        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
484        let stream =
485            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
486
487        let bytes_written =
488            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
489        assert_eq!(bytes_written, 3);
490
491        let bytes_written =
492            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
493        assert_eq!(bytes_written, DATA.len() - 3);
494
495        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
496        assert_eq!(data, DATA);
497    }
498
499    #[test]
500    fn std_io_read_write_seek() {
501        const DATA: &'static str = "stream-contents";
502        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
503        let mut stream =
504            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
505
506        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
507        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
508        std::io::Seek::rewind(&mut stream).unwrap();
509        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
510        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
511    }
512
513    #[test]
514    fn std_io_read_vectored() {
515        const DATA: &'static [u8] = b"stream-contents";
516        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
517        let mut stream =
518            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
519        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
520        std::io::Seek::rewind(&mut stream).unwrap();
521
522        let mut buf1 = [0; 6];
523        let mut buf2 = [0; 1];
524        let mut buf3 = [0; 8];
525        let mut bufs = [
526            std::io::IoSliceMut::new(&mut buf1),
527            std::io::IoSliceMut::new(&mut buf2),
528            std::io::IoSliceMut::new(&mut buf3),
529        ];
530        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
531        assert_eq!(buf1, DATA[0..6]);
532        assert_eq!(buf2, DATA[6..7]);
533        assert_eq!(buf3, DATA[7..]);
534    }
535
536    #[test]
537    fn std_io_write_vectored() {
538        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
539        let mut stream =
540            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
541
542        let bufs = [
543            std::io::IoSlice::new(b"stream"),
544            std::io::IoSlice::new(b"-"),
545            std::io::IoSlice::new(b"contents"),
546        ];
547        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
548        std::io::Seek::rewind(&mut stream).unwrap();
549        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
550    }
551}