zx/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Type-safe bindings for Zircon stream objects.
6
7use crate::{
8    AsHandleRef, HandleBased, HandleRef, NullableHandle, Property, PropertyQuery, Status, Vmo,
9    object_get_property, object_set_property, ok, sys,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
/// An object representing a Zircon [stream](https://fuchsia.dev/fuchsia-src/concepts/objects/stream.md).
///
/// As essentially a subtype of `NullableHandle`, it can be freely interconverted.
// `repr(transparent)` gives `Stream` the same layout as the single `NullableHandle` it wraps.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Stream(NullableHandle);
// NOTE(review): presumably this macro supplies the handle plumbing the rest of the file relies on
// (`raw_handle()`, `From<NullableHandle>`, `basic_info()`); confirm against the macro definition.
impl_handle_based!(Stream);
22
bitflags! {
    /// Options passed to [`Stream::create`]; the bits are forwarded unchanged to
    /// `zx_stream_create`.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamOptions: u32 {
        /// Mirrors `ZX_STREAM_MODE_READ`. The tests in this file expect a stream created with this
        /// flag to carry the `READ` right.
        const MODE_READ = sys::ZX_STREAM_MODE_READ;
        /// Mirrors `ZX_STREAM_MODE_WRITE`. The tests in this file expect a stream created with
        /// this flag to carry the `WRITE` right.
        const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
        /// Mirrors `ZX_STREAM_MODE_APPEND`. The tests in this file expect a stream created with
        /// this flag to report `1` from `get_mode_append()`.
        const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
    }
}
32
bitflags! {
    /// Options passed to the read methods of [`Stream`]; the bits are forwarded unchanged to the
    /// read syscalls.
    ///
    /// No flags are defined here, so callers in this file pass `StreamReadOptions::empty()`.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamReadOptions: u32 {
    }
}
39
bitflags! {
    /// Options passed to the write methods of [`Stream`]; the bits are forwarded unchanged to the
    /// write syscalls.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamWriteOptions: u32 {
        /// Mirrors `ZX_STREAM_APPEND`.
        ///
        /// NOTE(review): presumably requests that the write lands at the end of the stream; confirm
        /// against the `zx_stream_writev` reference.
        const APPEND = sys::ZX_STREAM_APPEND;
    }
}
47
48impl Stream {
49    /// See [zx_stream_create](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_create)
50    pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51        let mut handle = 0;
52        let status =
53            unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54        ok(status)?;
55        unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
56    }
57
58    /// Wraps the
59    /// [`zx_stream_readv`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv)
60    /// syscall.
61    ///
62    /// # Safety
63    ///
64    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
65    /// not necessarily initialized) memory.
66    pub unsafe fn readv(
67        &self,
68        options: StreamReadOptions,
69        iovecs: &mut [sys::zx_iovec_t],
70    ) -> Result<usize, Status> {
71        let mut actual = 0;
72        let status = unsafe {
73            sys::zx_stream_readv(
74                self.raw_handle(),
75                options.bits(),
76                iovecs.as_mut_ptr(),
77                iovecs.len(),
78                &mut actual,
79            )
80        };
81        ok(status)?;
82        Ok(actual)
83    }
84
85    /// Attempts to read `buffer.len()` bytes from the stream starting at the stream's current seek
86    /// offset. Only the number of bytes read from the stream will be initialized in `buffer`.
87    /// Returns the number of bytes read from the stream.
88    ///
89    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
90    pub fn read_uninit(
91        &self,
92        options: StreamReadOptions,
93        buffer: &mut [MaybeUninit<u8>],
94    ) -> Result<usize, Status> {
95        // TODO(https://fxbug.dev/42079723) use MaybeUninit::slice_as_mut_ptr when stable
96        let mut iovec =
97            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
99        // to `readv`.
100        unsafe { self.readv(options, &mut iovec) }
101    }
102
103    /// Attempts to read `length` bytes from the stream starting at the stream's current seek
104    /// offset. Returns the read bytes as a `Vec`.
105    ///
106    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
107    pub fn read_to_vec(
108        &self,
109        options: StreamReadOptions,
110        length: usize,
111    ) -> Result<Vec<u8>, Status> {
112        let mut data = Vec::with_capacity(length);
113        let buffer = &mut data.spare_capacity_mut()[0..length];
114        let actual = self.read_uninit(options, buffer)?;
115        // SAFETY: read_uninit returns the number of bytes that were initialized.
116        unsafe { data.set_len(actual) };
117        Ok(data)
118    }
119
120    /// Wraps the
121    /// [`zx_stream_readv_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at)
122    /// syscall.
123    ///
124    /// # Safety
125    ///
126    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
127    /// not necessarily initialized) memory.
128    pub unsafe fn readv_at(
129        &self,
130        options: StreamReadOptions,
131        offset: u64,
132        iovecs: &mut [sys::zx_iovec_t],
133    ) -> Result<usize, Status> {
134        let mut actual = 0;
135        let status = unsafe {
136            sys::zx_stream_readv_at(
137                self.raw_handle(),
138                options.bits(),
139                offset,
140                iovecs.as_mut_ptr(),
141                iovecs.len(),
142                &mut actual,
143            )
144        };
145        ok(status)?;
146        Ok(actual)
147    }
148
149    /// Attempts to read `buffer.len()` bytes from the stream starting at `offset`. Only the number
150    /// of bytes read from the stream will be initialized in `buffer`. Returns the number of bytes
151    /// read from the stream.
152    ///
153    /// See
154    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
155    pub fn read_at_uninit(
156        &self,
157        options: StreamReadOptions,
158        offset: u64,
159        buffer: &mut [MaybeUninit<u8>],
160    ) -> Result<usize, Status> {
161        // TODO(https://fxbug.dev/42079723) Use MaybeUninit::slice_as_mut_ptr when stable.
162        let mut iovec =
163            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
165        // to `readv_at`.
166        unsafe { self.readv_at(options, offset, &mut iovec) }
167    }
168
169    /// Attempts to read `length` bytes from the stream starting at `offset`. Returns the read bytes
170    /// as a `Vec`.
171    ///
172    /// See
173    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
174    pub fn read_at_to_vec(
175        &self,
176        options: StreamReadOptions,
177        offset: u64,
178        length: usize,
179    ) -> Result<Vec<u8>, Status> {
180        let mut data = Vec::with_capacity(length);
181        let buffer = &mut data.spare_capacity_mut()[0..length];
182        let actual = self.read_at_uninit(options, offset, buffer)?;
183        // SAFETY: read_at_uninit returns the number of bytes that were initialized.
184        unsafe { data.set_len(actual) };
185        Ok(data)
186    }
187
188    /// See [zx_stream_seek](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_seek)
189    pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190        let (whence, offset) = match pos {
191            SeekFrom::Start(start) => (
192                sys::ZX_STREAM_SEEK_ORIGIN_START,
193                start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194            ),
195            SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196            SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197        };
198        let mut pos = 0;
199        let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200        ok(status)?;
201        Ok(pos)
202    }
203
204    /// Wraps the
205    /// [`zx_stream_writev`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev)
206    /// syscall.
207    pub fn writev(
208        &self,
209        options: StreamWriteOptions,
210        iovecs: &[sys::zx_iovec_t],
211    ) -> Result<usize, Status> {
212        let mut actual = 0;
213        let status = unsafe {
214            sys::zx_stream_writev(
215                self.raw_handle(),
216                options.bits(),
217                iovecs.as_ptr(),
218                iovecs.len(),
219                &mut actual,
220            )
221        };
222        ok(status)?;
223        Ok(actual)
224    }
225
226    /// Writes `buffer` to the stream at the stream's current seek offset. Returns the number of
227    /// bytes written.
228    ///
229    /// See [zx_stream_writev](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev).
230    pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232        self.writev(options, &iovec)
233    }
234
235    /// Wraps the
236    /// [`zx_stream_writev_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at)
237    /// syscall.
238    pub fn writev_at(
239        &self,
240        options: StreamWriteOptions,
241        offset: u64,
242        iovecs: &[sys::zx_iovec_t],
243    ) -> Result<usize, Status> {
244        let mut actual = 0;
245        let status = unsafe {
246            sys::zx_stream_writev_at(
247                self.raw_handle(),
248                options.bits(),
249                offset,
250                iovecs.as_ptr(),
251                iovecs.len(),
252                &mut actual,
253            )
254        };
255        ok(status)?;
256        Ok(actual)
257    }
258
259    /// Writes `buffer` to the stream at `offset``. Returns the number of bytes written.
260    ///
261    /// See
262    /// [zx_stream_writev_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at).
263    pub fn write_at(
264        &self,
265        options: StreamWriteOptions,
266        offset: u64,
267        buffer: &[u8],
268    ) -> Result<usize, Status> {
269        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270        self.writev_at(options, offset, &iovec)
271    }
272}
273
// Declares the `STREAM_MODE_APPEND` property on `Stream`, naming its accessors `get_mode_append`
// and `set_mode_append` with a `u8` property value. The tests in this file call them as
// `stream.get_mode_append()` (comparing the unwrapped result with `0`/`1`) and
// `stream.set_mode_append(&1)`.
// NOTE(review): the exact items generated (and why the macro is `unsafe_`-prefixed) depend on the
// macro definition, which is not visible here.
unsafe_handle_properties!(object: Stream,
    props: [
        {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
    ]
);
279
280impl std::io::Read for Stream {
281    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282        let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
284        // to `readv`.
285        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286    }
287
288    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
290        let mut iovecs = unsafe {
291            std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292        };
293        // SAFETY: `IoSliceMut` can only be constructed from a mutable slice so we know it's safe to
294        // pass to `readv`.
295        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296    }
297}
298
299impl std::io::Seek for Stream {
300    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301        Ok(Self::seek(&self, pos)? as u64)
302    }
303}
304
305impl std::io::Write for Stream {
306    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307        Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308    }
309
310    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
312        let iovecs = unsafe {
313            std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314        };
315        Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316    }
317
318    fn flush(&mut self) -> std::io::Result<()> {
319        Ok(())
320    }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate as zx;

    /// A read/write stream is a STREAM object carrying both the READ and WRITE rights.
    #[test]
    fn create() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// A read-only stream carries the READ right but not the WRITE right.
    #[test]
    fn create_readonly() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// The `offset` given to `create` is the initial seek offset.
    #[test]
    fn create_offset() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
    }

    /// Creating a stream over an invalid VMO handle fails with BAD_HANDLE.
    #[test]
    fn create_invalid() {
        let result = Stream::create(
            StreamOptions::MODE_READ,
            &zx::Vmo::from(zx::NullableHandle::invalid()),
            0,
        );
        assert_eq!(result, Err(zx::Status::BAD_HANDLE));
    }

    /// `MODE_APPEND` at creation is visible through the mode-append property.
    #[test]
    fn create_with_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
                .unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
    }

    /// The mode-append property starts at 0 and round-trips through its setter.
    #[test]
    fn get_and_set_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
        stream.set_mode_append(&1).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
        stream.set_mode_append(&0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
    }

    /// Sequential `read_uninit` calls advance through the stream and return 0 at the end.
    #[test]
    fn read_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity(n)` only promises *at least* `n` bytes of capacity, so every read
        // below slices the spare capacity to the exact size the assertions assume.

        // Read from the stream.
        let mut data = Vec::with_capacity(5);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read above reported that the first 5 bytes were initialized.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        // Try to read more data than is available in the stream.
        let mut data = Vec::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read above reported that the first 7 bytes were initialized.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Try to read at the end of the stream.
        let mut data = Vec::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_to_vec` returns the whole contents when asked for exactly that many bytes.
    #[test]
    fn read_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// `read_at_uninit` reads from explicit offsets, including past the end of the stream.
    #[test]
    fn read_at_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity(n)` only promises *at least* `n` bytes of capacity, so every read
        // below slices the spare capacity to the exact size the assertions assume.

        // Read from the stream.
        let mut data = Vec::with_capacity(5);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 0, &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read above reported that the first 5 bytes were initialized.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        // Try to read beyond the end of the stream.
        let mut data = Vec::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 5, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read above reported that the first 7 bytes were initialized.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Try to read starting beyond the end of the stream.
        let mut data = Vec::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 20, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_at_to_vec` truncates the result to the bytes available after `offset`.
    #[test]
    fn read_at_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
        assert_eq!(data, DATA[5..]);
    }

    /// Data written at the seek offset can be read back from offset 0.
    #[test]
    fn write() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written = stream.write(StreamWriteOptions::empty(), DATA).unwrap();
        assert_eq!(bytes_written, DATA.len());

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// Two adjacent `write_at` calls together produce the full contents.
    #[test]
    fn write_at() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written = stream.write_at(StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
        assert_eq!(bytes_written, 3);

        let bytes_written = stream.write_at(StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
        assert_eq!(bytes_written, DATA.len() - 3);

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// The `std::io` `Write`, `Seek` and `Read` impls share one seek offset.
    #[test]
    fn std_io_read_write_seek() {
        const DATA: &str = "stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
    }

    /// `read_vectored` fills several buffers in order with a single call.
    #[test]
    fn std_io_read_vectored() {
        const DATA: &[u8] = b"stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
        std::io::Seek::rewind(&mut stream).unwrap();

        let mut buf1 = [0; 6];
        let mut buf2 = [0; 1];
        let mut buf3 = [0; 8];
        let mut bufs = [
            std::io::IoSliceMut::new(&mut buf1),
            std::io::IoSliceMut::new(&mut buf2),
            std::io::IoSliceMut::new(&mut buf3),
        ];
        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
        assert_eq!(buf1, DATA[0..6]);
        assert_eq!(buf2, DATA[6..7]);
        assert_eq!(buf3, DATA[7..]);
    }

    /// `write_vectored` writes several buffers in order with a single call.
    #[test]
    fn std_io_write_vectored() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bufs = [
            std::io::IoSlice::new(b"stream"),
            std::io::IoSlice::new(b"-"),
            std::io::IoSlice::new(b"contents"),
        ];
        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
    }
}