1use crate::{
8 AsHandleRef, HandleBased, HandleRef, NullableHandle, Property, PropertyQuery, Status, Vmo,
9 object_get_property, object_set_property, ok, sys,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
15#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
19#[repr(transparent)]
20pub struct Stream(NullableHandle);
21impl_handle_based!(Stream);
22
23bitflags! {
24 #[repr(transparent)]
25 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26 pub struct StreamOptions: u32 {
27 const MODE_READ = sys::ZX_STREAM_MODE_READ;
28 const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
29 const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
30 }
31}
32
33bitflags! {
34 #[repr(transparent)]
35 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
36 pub struct StreamReadOptions: u32 {
37 }
38}
39
40bitflags! {
41 #[repr(transparent)]
42 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
43 pub struct StreamWriteOptions: u32 {
44 const APPEND = sys::ZX_STREAM_APPEND;
45 }
46}
47
48impl Stream {
49 pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51 let mut handle = 0;
52 let status =
53 unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54 ok(status)?;
55 unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
56 }
57
58 pub unsafe fn readv(
67 &self,
68 options: StreamReadOptions,
69 iovecs: &mut [sys::zx_iovec_t],
70 ) -> Result<usize, Status> {
71 let mut actual = 0;
72 let status = unsafe {
73 sys::zx_stream_readv(
74 self.raw_handle(),
75 options.bits(),
76 iovecs.as_mut_ptr(),
77 iovecs.len(),
78 &mut actual,
79 )
80 };
81 ok(status)?;
82 Ok(actual)
83 }
84
85 pub fn read_uninit(
91 &self,
92 options: StreamReadOptions,
93 buffer: &mut [MaybeUninit<u8>],
94 ) -> Result<usize, Status> {
95 let mut iovec =
97 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98 unsafe { self.readv(options, &mut iovec) }
101 }
102
103 pub fn read_to_vec(
108 &self,
109 options: StreamReadOptions,
110 length: usize,
111 ) -> Result<Vec<u8>, Status> {
112 let mut data = Vec::with_capacity(length);
113 let buffer = &mut data.spare_capacity_mut()[0..length];
114 let actual = self.read_uninit(options, buffer)?;
115 unsafe { data.set_len(actual) };
117 Ok(data)
118 }
119
120 pub unsafe fn readv_at(
129 &self,
130 options: StreamReadOptions,
131 offset: u64,
132 iovecs: &mut [sys::zx_iovec_t],
133 ) -> Result<usize, Status> {
134 let mut actual = 0;
135 let status = unsafe {
136 sys::zx_stream_readv_at(
137 self.raw_handle(),
138 options.bits(),
139 offset,
140 iovecs.as_mut_ptr(),
141 iovecs.len(),
142 &mut actual,
143 )
144 };
145 ok(status)?;
146 Ok(actual)
147 }
148
149 pub fn read_at_uninit(
156 &self,
157 options: StreamReadOptions,
158 offset: u64,
159 buffer: &mut [MaybeUninit<u8>],
160 ) -> Result<usize, Status> {
161 let mut iovec =
163 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164 unsafe { self.readv_at(options, offset, &mut iovec) }
167 }
168
169 pub fn read_at_to_vec(
175 &self,
176 options: StreamReadOptions,
177 offset: u64,
178 length: usize,
179 ) -> Result<Vec<u8>, Status> {
180 let mut data = Vec::with_capacity(length);
181 let buffer = &mut data.spare_capacity_mut()[0..length];
182 let actual = self.read_at_uninit(options, offset, buffer)?;
183 unsafe { data.set_len(actual) };
185 Ok(data)
186 }
187
188 pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190 let (whence, offset) = match pos {
191 SeekFrom::Start(start) => (
192 sys::ZX_STREAM_SEEK_ORIGIN_START,
193 start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194 ),
195 SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196 SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197 };
198 let mut pos = 0;
199 let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200 ok(status)?;
201 Ok(pos)
202 }
203
204 pub fn writev(
208 &self,
209 options: StreamWriteOptions,
210 iovecs: &[sys::zx_iovec_t],
211 ) -> Result<usize, Status> {
212 let mut actual = 0;
213 let status = unsafe {
214 sys::zx_stream_writev(
215 self.raw_handle(),
216 options.bits(),
217 iovecs.as_ptr(),
218 iovecs.len(),
219 &mut actual,
220 )
221 };
222 ok(status)?;
223 Ok(actual)
224 }
225
226 pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232 self.writev(options, &iovec)
233 }
234
235 pub fn writev_at(
239 &self,
240 options: StreamWriteOptions,
241 offset: u64,
242 iovecs: &[sys::zx_iovec_t],
243 ) -> Result<usize, Status> {
244 let mut actual = 0;
245 let status = unsafe {
246 sys::zx_stream_writev_at(
247 self.raw_handle(),
248 options.bits(),
249 offset,
250 iovecs.as_ptr(),
251 iovecs.len(),
252 &mut actual,
253 )
254 };
255 ok(status)?;
256 Ok(actual)
257 }
258
259 pub fn write_at(
264 &self,
265 options: StreamWriteOptions,
266 offset: u64,
267 buffer: &[u8],
268 ) -> Result<usize, Status> {
269 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270 self.writev_at(options, offset, &iovec)
271 }
272}
273
274unsafe_handle_properties!(object: Stream,
275 props: [
276 {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
277 ]
278);
279
280impl std::io::Read for Stream {
281 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282 let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286 }
287
288 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289 let mut iovecs = unsafe {
291 std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292 };
293 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296 }
297}
298
299impl std::io::Seek for Stream {
300 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301 Ok(Self::seek(&self, pos)? as u64)
302 }
303}
304
305impl std::io::Write for Stream {
306 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307 Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308 }
309
310 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311 let iovecs = unsafe {
313 std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314 };
315 Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316 }
317
318 fn flush(&mut self) -> std::io::Result<()> {
319 Ok(())
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate as zx;

    /// A read/write stream is a STREAM object carrying both the READ and WRITE rights.
    #[test]
    fn create() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// A read-only stream carries the READ right but not the WRITE right.
    #[test]
    fn create_readonly() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// The offset passed to `create` becomes the initial seek position.
    #[test]
    fn create_offset() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
    }

    /// Creating with `MODE_APPEND` is reflected by the mode-append property.
    #[test]
    fn create_with_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
                .unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
    }

    /// The mode-append property starts at 0 and can be toggled on and off.
    #[test]
    fn get_and_set_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
        stream.set_mode_append(&1).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
        stream.set_mode_append(&0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
    }

    /// Sequential `read_uninit` calls advance through the content and then report 0 bytes.
    #[test]
    fn read_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity` may allocate more than requested, so the spare capacity is
        // sliced to the exact size each read is meant to offer.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read above reported 5 bytes written into the first 5 spare slots.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read above reported 7 bytes written into the first 10 spare slots.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // The content is exhausted, so a further read yields nothing.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_to_vec` returns the full content when asked for exactly its length.
    #[test]
    fn read_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// `read_at_uninit` reads from explicit offsets, including one past the content.
    #[test]
    fn read_at_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity` may allocate more than requested, so the spare capacity is
        // sliced to the exact size each read is meant to offer.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 0, &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read above reported 5 bytes written into the first 5 spare slots.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 5, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read above reported 7 bytes written into the first 10 spare slots.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Offset 20 lies beyond the 12 bytes of content, so nothing is read.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 20, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_at_to_vec` returns a short vector when fewer bytes remain than requested.
    #[test]
    fn read_at_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
        assert_eq!(data, DATA[5..]);
    }

    /// Data written with `write` can be read back from offset 0.
    #[test]
    fn write() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
        assert_eq!(bytes_written, DATA.len());

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// Two `write_at` calls at adjacent offsets produce the concatenated content.
    #[test]
    fn write_at() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
        assert_eq!(bytes_written, 3);

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
        assert_eq!(bytes_written, DATA.len() - 3);

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// The `std::io` Write/Seek/Read impls share one seek position.
    #[test]
    fn std_io_read_write_seek() {
        const DATA: &str = "stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
    }

    /// `read_vectored` fills several buffers in order from a single call.
    #[test]
    fn std_io_read_vectored() {
        const DATA: &[u8] = b"stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
        std::io::Seek::rewind(&mut stream).unwrap();

        let mut buf1 = [0; 6];
        let mut buf2 = [0; 1];
        let mut buf3 = [0; 8];
        let mut bufs = [
            std::io::IoSliceMut::new(&mut buf1),
            std::io::IoSliceMut::new(&mut buf2),
            std::io::IoSliceMut::new(&mut buf3),
        ];
        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
        assert_eq!(buf1, DATA[0..6]);
        assert_eq!(buf2, DATA[6..7]);
        assert_eq!(buf3, DATA[7..]);
    }

    /// `write_vectored` writes several buffers in order from a single call.
    #[test]
    fn std_io_write_vectored() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bufs = [
            std::io::IoSlice::new(b"stream"),
            std::io::IoSlice::new(b"-"),
            std::io::IoSlice::new(b"contents"),
        ];
        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
    }
}