1use crate::{
8 AsHandleRef, HandleBased, HandleRef, NullableHandle, Property, PropertyQuery, Status, Vmo,
9 object_get_property, object_set_property, ok, sys,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
15#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
19#[repr(transparent)]
20pub struct Stream(NullableHandle);
21impl_handle_based!(Stream);
22
23bitflags! {
24 #[repr(transparent)]
25 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26 pub struct StreamOptions: u32 {
27 const MODE_READ = sys::ZX_STREAM_MODE_READ;
28 const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
29 const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
30 }
31}
32
33bitflags! {
34 #[repr(transparent)]
35 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
36 pub struct StreamReadOptions: u32 {
37 }
38}
39
40bitflags! {
41 #[repr(transparent)]
42 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
43 pub struct StreamWriteOptions: u32 {
44 const APPEND = sys::ZX_STREAM_APPEND;
45 }
46}
47
48impl Stream {
49 pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51 let mut handle = 0;
52 let status =
53 unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54 ok(status)?;
55 unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
56 }
57
58 pub unsafe fn readv(
67 &self,
68 options: StreamReadOptions,
69 iovecs: &mut [sys::zx_iovec_t],
70 ) -> Result<usize, Status> {
71 let mut actual = 0;
72 let status = unsafe {
73 sys::zx_stream_readv(
74 self.raw_handle(),
75 options.bits(),
76 iovecs.as_mut_ptr(),
77 iovecs.len(),
78 &mut actual,
79 )
80 };
81 ok(status)?;
82 Ok(actual)
83 }
84
85 pub fn read_uninit(
91 &self,
92 options: StreamReadOptions,
93 buffer: &mut [MaybeUninit<u8>],
94 ) -> Result<usize, Status> {
95 let mut iovec =
97 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98 unsafe { self.readv(options, &mut iovec) }
101 }
102
103 pub fn read_to_vec(
108 &self,
109 options: StreamReadOptions,
110 length: usize,
111 ) -> Result<Vec<u8>, Status> {
112 let mut data = Vec::with_capacity(length);
113 let buffer = &mut data.spare_capacity_mut()[0..length];
114 let actual = self.read_uninit(options, buffer)?;
115 unsafe { data.set_len(actual) };
117 Ok(data)
118 }
119
120 pub unsafe fn readv_at(
129 &self,
130 options: StreamReadOptions,
131 offset: u64,
132 iovecs: &mut [sys::zx_iovec_t],
133 ) -> Result<usize, Status> {
134 let mut actual = 0;
135 let status = unsafe {
136 sys::zx_stream_readv_at(
137 self.raw_handle(),
138 options.bits(),
139 offset,
140 iovecs.as_mut_ptr(),
141 iovecs.len(),
142 &mut actual,
143 )
144 };
145 ok(status)?;
146 Ok(actual)
147 }
148
149 pub fn read_at_uninit(
156 &self,
157 options: StreamReadOptions,
158 offset: u64,
159 buffer: &mut [MaybeUninit<u8>],
160 ) -> Result<usize, Status> {
161 let mut iovec =
163 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164 unsafe { self.readv_at(options, offset, &mut iovec) }
167 }
168
169 pub fn read_at_to_vec(
175 &self,
176 options: StreamReadOptions,
177 offset: u64,
178 length: usize,
179 ) -> Result<Vec<u8>, Status> {
180 let mut data = Vec::with_capacity(length);
181 let buffer = &mut data.spare_capacity_mut()[0..length];
182 let actual = self.read_at_uninit(options, offset, buffer)?;
183 unsafe { data.set_len(actual) };
185 Ok(data)
186 }
187
188 pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190 let (whence, offset) = match pos {
191 SeekFrom::Start(start) => (
192 sys::ZX_STREAM_SEEK_ORIGIN_START,
193 start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194 ),
195 SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196 SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197 };
198 let mut pos = 0;
199 let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200 ok(status)?;
201 Ok(pos)
202 }
203
204 pub fn writev(
208 &self,
209 options: StreamWriteOptions,
210 iovecs: &[sys::zx_iovec_t],
211 ) -> Result<usize, Status> {
212 let mut actual = 0;
213 let status = unsafe {
214 sys::zx_stream_writev(
215 self.raw_handle(),
216 options.bits(),
217 iovecs.as_ptr(),
218 iovecs.len(),
219 &mut actual,
220 )
221 };
222 ok(status)?;
223 Ok(actual)
224 }
225
226 pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232 self.writev(options, &iovec)
233 }
234
235 pub fn writev_at(
239 &self,
240 options: StreamWriteOptions,
241 offset: u64,
242 iovecs: &[sys::zx_iovec_t],
243 ) -> Result<usize, Status> {
244 let mut actual = 0;
245 let status = unsafe {
246 sys::zx_stream_writev_at(
247 self.raw_handle(),
248 options.bits(),
249 offset,
250 iovecs.as_ptr(),
251 iovecs.len(),
252 &mut actual,
253 )
254 };
255 ok(status)?;
256 Ok(actual)
257 }
258
259 pub fn write_at(
264 &self,
265 options: StreamWriteOptions,
266 offset: u64,
267 buffer: &[u8],
268 ) -> Result<usize, Status> {
269 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270 self.writev_at(options, offset, &iovec)
271 }
272}
273
274unsafe_handle_properties!(object: Stream,
275 props: [
276 {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
277 ]
278);
279
280impl std::io::Read for Stream {
281 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282 let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286 }
287
288 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289 let mut iovecs = unsafe {
291 std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292 };
293 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296 }
297}
298
299impl std::io::Seek for Stream {
300 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301 Ok(Self::seek(&self, pos)? as u64)
302 }
303}
304
305impl std::io::Write for Stream {
306 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307 Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308 }
309
310 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311 let iovecs = unsafe {
313 std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314 };
315 Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316 }
317
318 fn flush(&mut self) -> std::io::Result<()> {
319 Ok(())
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate as zx;

    /// A read/write stream reports the STREAM object type and both READ and WRITE rights.
    #[test]
    fn create() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// A stream created with only `MODE_READ` has the READ right but not WRITE.
    #[test]
    fn create_readonly() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// The `offset` passed to `create` is the initial seek position.
    #[test]
    fn create_offset() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
    }

    /// Creating a stream over an invalid VMO handle fails with `BAD_HANDLE`.
    #[test]
    fn create_invalid() {
        let result = Stream::create(
            StreamOptions::MODE_READ,
            &zx::Vmo::from(zx::NullableHandle::invalid()),
            0,
        );
        assert_eq!(result, Err(zx::Status::BAD_HANDLE));
    }

    /// `MODE_APPEND` at creation time is reflected by the mode-append property.
    #[test]
    fn create_with_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
                .unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
    }

    /// The mode-append property starts at 0 and round-trips through set/get.
    #[test]
    fn get_and_set_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
        stream.set_mode_append(&1).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
        stream.set_mode_append(&0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
    }

    /// Sequential `read_uninit` calls advance through the contents and then return 0.
    #[test]
    fn read_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity` only guarantees *at least* the requested capacity, so the
        // spare capacity is sliced to the exact size each read is meant to use.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read reported 5 bytes for a buffer inside `data`'s capacity.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read reported 7 bytes for a buffer inside `data`'s capacity.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Everything has been consumed, so a further read yields no bytes.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_to_vec` returns the full contents when asked for exactly that many bytes.
    #[test]
    fn read_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// `read_at_uninit` reads from the given offset and returns 0 past the end.
    #[test]
    fn read_at_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // `Vec::with_capacity` only guarantees *at least* the requested capacity, so the
        // spare capacity is sliced to the exact size each read is meant to use.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 0, &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read reported 5 bytes for a buffer inside `data`'s capacity.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 5, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read reported 7 bytes for a buffer inside `data`'s capacity.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Offset 20 lies beyond the 12 bytes of content, so nothing is read.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 20, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_at_to_vec` truncates the result to the bytes available after the offset.
    #[test]
    fn read_at_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
        assert_eq!(data, DATA[5..]);
    }

    /// Data written with `write` can be read back from offset 0.
    #[test]
    fn write() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
        assert_eq!(bytes_written, DATA.len());

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// Two `write_at` calls at adjacent offsets produce the concatenated contents.
    #[test]
    fn write_at() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
        assert_eq!(bytes_written, 3);

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
        assert_eq!(bytes_written, DATA.len() - 3);

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// The `std::io` `Write`, `Seek` and `Read` adapters share one seek position.
    #[test]
    fn std_io_read_write_seek() {
        const DATA: &str = "stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
    }

    /// `Read::read_vectored` fills the supplied buffers in order.
    #[test]
    fn std_io_read_vectored() {
        const DATA: &[u8] = b"stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
        std::io::Seek::rewind(&mut stream).unwrap();

        let mut buf1 = [0; 6];
        let mut buf2 = [0; 1];
        let mut buf3 = [0; 8];
        let mut bufs = [
            std::io::IoSliceMut::new(&mut buf1),
            std::io::IoSliceMut::new(&mut buf2),
            std::io::IoSliceMut::new(&mut buf3),
        ];
        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
        assert_eq!(buf1, DATA[0..6]);
        assert_eq!(buf2, DATA[6..7]);
        assert_eq!(buf3, DATA[7..]);
    }

    /// `Write::write_vectored` writes the supplied buffers back to back.
    #[test]
    fn std_io_write_vectored() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bufs = [
            std::io::IoSlice::new(b"stream"),
            std::io::IoSlice::new(b"-"),
            std::io::IoSlice::new(b"contents"),
        ];
        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
    }
}