1use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use smallvec::SmallVec;
13use std::ops::{ControlFlow, Range};
14use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
15use zerocopy::FromBytes;
16
17fn zxio_attr_from_fidl(
21 mutable: &fio::MutableNodeAttributes,
22 immutable: &fio::ImmutableNodeAttributes,
23) -> zxio_node_attributes_t {
24 let mut out_attr = zxio_node_attributes_t::default();
25 if let Some(protocols) = immutable.protocols {
26 out_attr.protocols = protocols.bits();
27 out_attr.has.protocols = true;
28 }
29 if let Some(abilities) = immutable.abilities {
30 out_attr.abilities = abilities.bits();
31 out_attr.has.abilities = true;
32 }
33 if let Some(id) = immutable.id {
34 out_attr.id = id;
35 out_attr.has.id = true;
36 }
37 if let Some(content_size) = immutable.content_size {
38 out_attr.content_size = content_size;
39 out_attr.has.content_size = true;
40 }
41 if let Some(storage_size) = immutable.storage_size {
42 out_attr.storage_size = storage_size;
43 out_attr.has.storage_size = true;
44 }
45 if let Some(link_count) = immutable.link_count {
46 out_attr.link_count = link_count;
47 out_attr.has.link_count = true;
48 }
49 if let Some(creation_time) = mutable.creation_time {
50 out_attr.creation_time = creation_time;
51 out_attr.has.creation_time = true;
52 }
53 if let Some(modification_time) = mutable.modification_time {
54 out_attr.modification_time = modification_time;
55 out_attr.has.modification_time = true;
56 }
57 if let Some(access_time) = mutable.access_time {
58 out_attr.access_time = access_time;
59 out_attr.has.access_time = true;
60 }
61 if let Some(mode) = mutable.mode {
62 out_attr.mode = mode;
63 out_attr.has.mode = true;
64 }
65 if let Some(uid) = mutable.uid {
66 out_attr.uid = uid;
67 out_attr.has.uid = true;
68 }
69 if let Some(gid) = mutable.gid {
70 out_attr.gid = gid;
71 out_attr.has.gid = true;
72 }
73 if let Some(rdev) = mutable.rdev {
74 out_attr.rdev = rdev;
75 out_attr.has.rdev = true;
76 }
77 if let Some(change_time) = immutable.change_time {
78 out_attr.change_time = change_time;
79 out_attr.has.change_time = true;
80 }
81 if let Some(casefold) = mutable.casefold {
82 out_attr.casefold = casefold;
83 out_attr.has.casefold = true;
84 }
85 if let Some(verity_enabled) = immutable.verity_enabled {
86 out_attr.fsverity_enabled = verity_enabled;
87 out_attr.has.fsverity_enabled = true;
88 }
89 if let Some(wrapping_key_id) = mutable.wrapping_key_id {
90 out_attr.wrapping_key_id = wrapping_key_id;
91 out_attr.has.wrapping_key_id = true;
92 }
93 out_attr
94}
95
/// Builds a caller-defined value from a freshly opened node.
///
/// `create_with_on_representation` calls exactly one of these methods, chosen by the
/// `fio::Representation` variant the server reported. Each method consumes the factory.
pub trait Factory {
    /// The value produced by every `create_*` method.
    type Result;

    /// Called when the server reported `fio::Representation::Node`.
    fn create_node(self, io: RemoteIo, info: fio::NodeInfo) -> Self::Result;
    /// Called when the server reported `fio::Representation::Directory`.
    fn create_directory(self, io: RemoteIo, info: fio::DirectoryInfo) -> Self::Result;
    /// Called when the server reported `fio::Representation::File`. Any stream carried by the
    /// representation has already been moved out of `info` and into `io`.
    fn create_file(self, io: RemoteIo, info: fio::FileInfo) -> Self::Result;
    /// Called when the server reported `fio::Representation::Symlink`.
    fn create_symlink(self, io: RemoteIo, info: fio::SymlinkInfo) -> Self::Result;
}
105
106pub fn create_with_on_representation<F: Factory>(
113 proxy: fio::NodeSynchronousProxy,
114 factory: F,
115) -> Result<F::Result, zx::Status> {
116 match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
117 Ok(fio::NodeEvent::OnRepresentation { payload }) => match payload {
118 fio::Representation::Node(info) => Ok(factory.create_node(RemoteIo::new(proxy), info)),
119 fio::Representation::Directory(info) => {
120 Ok(factory.create_directory(RemoteIo::new(proxy), info))
121 }
122 fio::Representation::File(mut info) => {
123 let io = RemoteIo {
124 proxy,
125 stream: info
126 .stream
127 .take()
128 .map(zx::Stream::from)
129 .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
130 };
131 Ok(factory.create_file(io, info))
132 }
133 fio::Representation::Symlink(info) => {
134 Ok(factory.create_symlink(RemoteIo::new(proxy), info))
135 }
136 _ => Err(zx::Status::NOT_SUPPORTED),
137 },
138 Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
139 _ => Err(zx::Status::IO),
140 }
141}
142
/// A synchronous client for a remote `fuchsia.io` node.
pub struct RemoteIo {
    /// Channel to the node. Methods reinterpret it as a file, directory or linkable proxy as
    /// needed (see `cast_proxy`).
    proxy: fio::NodeSynchronousProxy,
    /// Stream used for reads and vectored I/O. Holds an invalid handle when the node has no
    /// stream, in which case reads go over FIDL and vectored I/O is unsupported.
    stream: zx::Stream,
}
154
155impl RemoteIo {
156 pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
157 Self { proxy, stream: zx::NullableHandle::invalid().into() }
158 }
159
160 pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
161 Self { proxy, stream }
162 }
163
164 pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
165 self.proxy
166 }
167
168 fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
169 zx::Unowned::new(self.proxy.as_channel())
170 }
171
172 pub fn attr_get(
174 &self,
175 query: fio::NodeAttributesQuery,
176 ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
177 self.proxy
178 .get_attributes(query, zx::MonotonicInstant::INFINITE)
179 .map_err(|_| zx::Status::IO)?
180 .map_err(zx::Status::from_raw)
181 }
182
183 pub fn attr_get_zxio(
188 &self,
189 query: fio::NodeAttributesQuery,
190 ) -> Result<zxio_node_attributes_t, zx::Status> {
191 self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
192 }
193
194 pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
196 self.proxy
197 .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
198 .map_err(|_| zx::Status::IO)?
199 .map_err(zx::Status::from_raw)
200 }
201
202 pub fn open<F: Factory>(
204 &self,
205 path: &str,
206 flags: fio::Flags,
207 create_attributes: Option<fio::MutableNodeAttributes>,
208 query: fio::NodeAttributesQuery,
209 factory: F,
210 ) -> Result<F::Result, zx::Status> {
211 let (client_end, server_end) = zx::Channel::create();
212 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
213 dir_proxy
214 .open(
215 path,
216 flags | fio::Flags::FLAG_SEND_REPRESENTATION,
217 &fio::Options {
218 attributes: (!query.is_empty()).then_some(query),
219 create_attributes,
220 ..Default::default()
221 },
222 server_end,
223 )
224 .map_err(|_| zx::Status::IO)?;
225 create_with_on_representation(client_end.into(), factory)
226 }
227
228 pub fn open_pipelined<F: Factory>(
240 &self,
241 paths: &[&str],
242 flags: fio::Flags,
243 query: fio::NodeAttributesQuery,
244 mut factory_fn: impl FnMut() -> F,
245 ) -> impl Iterator<Item = Result<F::Result, zx::Status>> {
246 let mut proxy_to_result = move |proxy: fio::NodeSynchronousProxy| {
247 create_with_on_representation(proxy, factory_fn())
248 };
249
250 let mut proxies = SmallVec::<[fio::NodeSynchronousProxy; 8]>::new();
251 let (client_end, server_end) = zx::Channel::create();
252 proxies.push(fio::NodeSynchronousProxy::new(client_end));
253 let mut next_server_end = Some(server_end);
254
255 for (i, path) in paths.iter().enumerate().rev() {
256 let server_end = next_server_end.take().unwrap();
257 let channel = if i == 0 {
258 self.proxy.as_channel()
259 } else {
260 let (client_end, server_end) = zx::Channel::create();
261 next_server_end = Some(server_end);
262 proxies.push(fio::NodeSynchronousProxy::new(client_end));
263 proxies.last().unwrap().as_channel()
264 };
265 let dir_proxy = zx::Unowned::<fio::DirectorySynchronousProxy>::new(channel);
266
267 let mut open_flags = flags | fio::Flags::FLAG_SEND_REPRESENTATION;
268 if i < paths.len() - 1 {
269 open_flags |= fio::Flags::PROTOCOL_DIRECTORY | fio::Flags::PROTOCOL_SYMLINK;
270 }
271
272 if let Err(_) = dir_proxy.open(
273 path,
274 open_flags,
275 &fio::Options {
276 attributes: (!query.is_empty()).then_some(query),
277 ..Default::default()
278 },
279 server_end,
280 ) {
281 debug_assert!(i == 0);
284 }
287 }
288
289 let mut proxies = proxies.into_iter().rev();
290 std::iter::successors(proxies.next().map(&mut proxy_to_result), move |prev| match prev {
291 Err(_) => None,
292 Ok(_) => proxies.next().map(&mut proxy_to_result),
293 })
294 }
295
296 pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
300 if self.stream.is_invalid() {
301 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
302 let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
303 let data = file_proxy
304 .read_at(max, offset, zx::MonotonicInstant::INFINITE)
305 .map_err(|_| zx::Status::IO)?
306 .map_err(zx::Status::from_raw)?;
307 let eof = (data.len() as u64) < max;
308 Ok((data, eof))
309 } else {
310 let bytes =
312 self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
313 let eof = bytes.len() < max;
314 Ok((bytes, eof))
315 }
316 }
317
318 pub fn read<E>(
323 &self,
324 offset: u64,
325 len: usize,
326 mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
327 map_err: impl FnOnce(zx::Status) -> E,
328 ) -> Result<usize, E> {
329 let mut total = 0;
330 while total < len {
331 match self.read_partial(offset + total as u64, len - total) {
332 Ok((data, eof)) => {
333 if data.is_empty() {
334 break;
335 }
336 let data_len = data.len();
337 let written = callback(data)?;
338 total += written;
339 if eof || written < data_len {
340 break;
341 }
342 }
343 Err(e) => {
344 if total > 0 {
345 break;
346 }
347 return Err(map_err(e));
348 }
349 }
350 }
351 Ok(total)
352 }
353
354 pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
356 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
357 let mut total_written = 0;
358 for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
359 let result = file_proxy
360 .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
361 .map_err(|_| zx::Status::IO)
362 .and_then(|res| res.map_err(zx::Status::from_raw));
363 match result {
364 Ok(actual) => {
365 let actual = actual as usize;
366 total_written += actual;
367 if actual < chunk.len() {
368 return Ok(total_written);
369 }
370 }
371 Err(e) => {
372 if total_written > 0 {
373 break;
374 }
375 return Err(e);
376 }
377 }
378 }
379 Ok(total_written)
380 }
381
382 pub fn supports_vectored(&self) -> bool {
384 !self.stream.is_invalid()
386 }
387
388 pub unsafe fn readv(
395 &self,
396 offset: u64,
397 iovecs: &mut [zx::sys::zx_iovec_t],
398 ) -> Result<usize, zx::Status> {
399 if self.stream.is_invalid() {
400 return Err(zx::Status::NOT_SUPPORTED);
401 }
402 unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
404 }
405
406 pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
409 if self.stream.is_invalid() {
410 return Err(zx::Status::NOT_SUPPORTED);
411 }
412 self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
413 }
414
415 pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
417 self.cast_proxy::<fio::FileSynchronousProxy>()
418 .resize(length, zx::MonotonicInstant::INFINITE)
419 .map_err(|_| zx::Status::IO)?
420 .map_err(zx::Status::from_raw)
421 }
422
423 pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
425 let mut fio_flags = fio::VmoFlags::empty();
426 if flags.contains(zx::VmarFlags::PERM_READ) {
427 fio_flags |= fio::VmoFlags::READ;
428 }
429 if flags.contains(zx::VmarFlags::PERM_WRITE) {
430 fio_flags |= fio::VmoFlags::WRITE;
431 }
432 if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
433 fio_flags |= fio::VmoFlags::EXECUTE;
434 }
435 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
436 let vmo = file_proxy
437 .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
438 .map_err(|_| zx::Status::IO)?
439 .map_err(zx::Status::from_raw)?;
440 Ok(vmo)
441 }
442
443 pub fn sync(&self) -> Result<(), zx::Status> {
445 self.proxy
446 .sync(zx::MonotonicInstant::INFINITE)
447 .map_err(|_| zx::Status::IO)?
448 .map_err(zx::Status::from_raw)
449 }
450
451 pub fn close_and_update_access_time(self) {
453 let _ = self.proxy.get_attributes(
454 fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
455 zx::MonotonicInstant::INFINITE_PAST,
456 );
457 }
458
459 pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
461 let (client_end, server_end) = zx::Channel::create();
462 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
463 Ok(client_end.into())
464 }
465
466 pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
468 let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
469 let (status, token) = target_dir_proxy
470 .get_token(zx::MonotonicInstant::INFINITE)
471 .map_err(|_| zx::Status::IO)?;
472 zx::Status::ok(status)?;
473 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
474
475 let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
477
478 linkable_proxy
479 .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
480 .map_err(|_| zx::Status::IO)?
481 .map_err(zx::Status::from_raw)
482 }
483
484 pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
486 let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
487 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
488 dir_proxy
489 .unlink(name, &options, zx::MonotonicInstant::INFINITE)
490 .map_err(|_| zx::Status::IO)?
491 .map_err(zx::Status::from_raw)
492 }
493
494 pub fn rename(
496 &self,
497 old_path: &str,
498 new_directory: &Self,
499 new_path: &str,
500 ) -> Result<(), zx::Status> {
501 let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
502 let (status, token) =
503 new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
504 zx::Status::ok(status)?;
505 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
506 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
507 dir_proxy
508 .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
509 .map_err(|_| zx::Status::IO)?
510 .map_err(zx::Status::from_raw)
511 }
512
513 pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
515 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
516 let (client_end, server_end) = zx::Channel::create();
517 dir_proxy
518 .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
519 .map_err(|_| zx::Status::IO)?
520 .map_err(zx::Status::from_raw)?;
521 Ok(RemoteIo::new(client_end.into()))
522 }
523
524 pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
526 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
527 let options = fio::VerificationOptions {
528 hash_algorithm: Some(match descriptor.hash_algorithm {
529 1 => fio::HashAlgorithm::Sha256,
530 2 => fio::HashAlgorithm::Sha512,
531 _ => return Err(zx::Status::INVALID_ARGS),
532 }),
533 salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
534 ..Default::default()
535 };
536 file_proxy
537 .enable_verity(&options, zx::MonotonicInstant::INFINITE)
538 .map_err(|_| zx::Status::IO)?
539 .map_err(zx::Status::from_raw)
540 }
541
542 pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
544 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
545 let mut fio_mode = fio::AllocateMode::empty();
546 if mode.contains(AllocateMode::KEEP_SIZE) {
547 fio_mode |= fio::AllocateMode::KEEP_SIZE;
548 }
549 if mode.contains(AllocateMode::UNSHARE_RANGE) {
550 fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
551 }
552 if mode.contains(AllocateMode::PUNCH_HOLE) {
553 fio_mode |= fio::AllocateMode::PUNCH_HOLE;
554 }
555 if mode.contains(AllocateMode::COLLAPSE_RANGE) {
556 fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
557 }
558 if mode.contains(AllocateMode::ZERO_RANGE) {
559 fio_mode |= fio::AllocateMode::ZERO_RANGE;
560 }
561 if mode.contains(AllocateMode::INSERT_RANGE) {
562 fio_mode |= fio::AllocateMode::INSERT_RANGE;
563 }
564 file_proxy
565 .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
566 .map_err(|_| zx::Status::IO)?
567 .map_err(zx::Status::from_raw)
568 }
569
570 pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
572 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
573 let result = self
574 .proxy
575 .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
576 .map_err(|_| zx::Status::IO)?
577 .map_err(zx::Status::from_raw)?;
578 match result {
579 fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
580 fio::ExtendedAttributeValue::Buffer(vmo) => {
581 let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
582 let mut bytes = vec![0u8; size as usize];
583 vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
584 Ok(bytes)
585 }
586 _ => Err(zx::Status::NOT_SUPPORTED),
587 }
588 }
589
590 pub fn xattr_set(
592 &self,
593 name: &[u8],
594 value: &[u8],
595 mode: syncio::XattrSetMode,
596 ) -> Result<(), zx::Status> {
597 let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
598 let fidl_mode = match mode {
599 syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
600 syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
601 syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
602 };
603 self.proxy
604 .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
605 .map_err(|_| zx::Status::IO)?
606 .map_err(zx::Status::from_raw)
607 }
608
609 pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
611 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
612 self.proxy
613 .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
614 .map_err(|_| zx::Status::IO)?
615 .map_err(zx::Status::from_raw)
616 }
617
618 pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
620 let (client_end, server_end) = zx::Channel::create();
621 self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
622 let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
623 let mut all_attrs = vec![];
624 loop {
625 let (attributes, last) = iterator
626 .get_next(zx::MonotonicInstant::INFINITE)
627 .map_err(|_| zx::Status::IO)?
628 .map_err(zx::Status::from_raw)?;
629 all_attrs.extend(attributes);
630 if last {
631 break;
632 }
633 }
634 Ok(all_attrs)
635 }
636}
637
/// A synchronous reader for the entries of a remote `fuchsia.io` directory.
pub struct RemoteDirectory {
    /// Channel to the directory.
    proxy: fio::DirectorySynchronousProxy,
    /// Iteration state; locked for the duration of `seek` and `readdir`.
    state: Mutex<State>,
}
646
/// Iteration state of a `RemoteDirectory`.
#[derive(Default)]
struct State {
    /// Raw dirent bytes from the most recent `read_dirents` call.
    buffer: Vec<u8>,

    /// Position in `buffer` of the next dirent to parse.
    offset: usize,

    /// Entry to return from the next call to `next` before parsing any more dirents.
    pending_entry: Entry,

    /// Counter advanced by `next` and reset by `rewind`; `seek` compares it with the requested
    /// index.
    current_index: u64,
}
662
663impl State {
664 fn name(&self, range: Range<usize>) -> &[u8] {
665 &self.buffer[range]
666 }
667
668 fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
671 let mut next_dirent = || -> Result<Entry, zx::Status> {
672 if self.offset >= self.buffer.len() {
673 match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
674 Ok((status, dirents)) => {
675 zx::Status::ok(status)?;
676 if dirents.is_empty() {
677 return Ok(Entry::None);
678 }
679 self.buffer = dirents;
680 self.offset = 0;
681 }
682 Err(_) => return Err(zx::Status::IO),
683 }
684 }
685
686 #[repr(C, packed)]
687 #[derive(FromBytes)]
688 struct DirectoryEntry {
689 ino: u64,
690 name_len: u8,
691 entry_type: u8,
692 }
693
694 let Some((ino, name, entry_type)) =
695 DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
696 |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
697 let name_len = name_len as usize;
698 let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
699 (remainder.len() >= name_len).then_some((
700 ino,
701 name_start..name_start + name_len,
702 entry_type,
703 ))
704 },
705 )
706 else {
707 return Ok(Entry::None);
709 };
710
711 self.offset = name.end;
712
713 Ok(Entry::Some {
714 ino,
715 entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
716 name,
717 })
718 };
719
720 let mut next = self.pending_entry.take();
721 if let Entry::None = next {
722 next = next_dirent()?;
723 }
724 match &next {
728 Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
729 self.pending_entry = Entry::DotDot;
730 }
731 _ => {}
732 }
733 self.current_index += 1;
734 Ok(next)
735 }
736
737 fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
738 self.pending_entry = Entry::None;
739 let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
740 zx::Status::ok(status)?;
741 self.buffer.clear();
742 self.offset = 0;
743 self.current_index = 0;
744 Ok(())
745 }
746}
747
/// One step of directory iteration.
#[derive(Default)]
enum Entry {
    /// No entry: either nothing is pending or the end of the directory was reached.
    #[default]
    None,

    /// An entry parsed from the dirent buffer.
    Some {
        ino: u64,
        entry_type: fio::DirentType,
        /// Byte range of the entry's name inside `State::buffer`.
        name: Range<usize>,
    },

    /// The synthesized `..` entry, queued after `.` has been returned.
    DotDot,
}
763
764impl Entry {
765 fn take(&mut self) -> Entry {
766 std::mem::replace(self, Entry::None)
767 }
768}
769
770impl RemoteDirectory {
771 pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
772 Self { proxy, state: Mutex::default() }
773 }
774
775 pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
777 let mut state = self.state.lock();
778
779 if new_index < state.current_index {
780 state.rewind(&self.proxy)?;
783 state.current_index = 0;
784 }
785
786 for i in state.current_index..new_index {
788 match state.next(&self.proxy) {
789 Ok(Entry::Some { .. } | Entry::DotDot) => {}
790 Ok(Entry::None) => break, Err(_) => {
792 return Ok(i);
798 }
799 }
800 }
801
802 Ok(new_index)
803 }
804
805 pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
809 &self,
810 mut sink: S,
811 ) -> Result<Option<B>, zx::Status> {
812 let mut state = self.state.lock();
813 loop {
814 let entry = state.next(&self.proxy)?;
815 if let ControlFlow::Break(b) = match &entry {
816 Entry::Some { ino, entry_type, name } => {
817 sink(*ino, *entry_type, state.name(name.clone()))
818 }
819 Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
820 Entry::None => break,
821 } {
822 state.pending_entry = entry;
823 return Ok(Some(b));
824 }
825 }
826 Ok(None)
827 }
828
829 pub fn sync(&self) -> Result<(), zx::Status> {
831 self.proxy
832 .sync(zx::MonotonicInstant::INFINITE)
833 .map_err(|_| zx::Status::IO)?
834 .map_err(zx::Status::from_raw)
835 }
836
837 pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
839 let (client_end, server_end) = zx::Channel::create();
840 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
841 Ok(client_end.into())
842 }
843}
844
845#[cfg(test)]
846mod tests {
847 use super::*;
848 use fidl::endpoints::{ControlHandle, RequestStream};
849 use fuchsia_async as fasync;
850 use futures::StreamExt;
851 use std::sync::Arc;
852 use std::sync::atomic::{AtomicU64, Ordering};
853
/// `RemoteIo::read` must split a read larger than `fio::MAX_TRANSFER_SIZE` into several
/// `ReadAt` calls and reassemble the full content.
#[fuchsia::test]
async fn test_read_chunking() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
    // 100 bytes more than a single transfer can carry, so at least two requests are needed.
    let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
    let content_clone = content.clone();

    // Mock file server: answers each `ReadAt` with the requested slice of the content, or with
    // an empty slice when the offset is past the end.
    let _server_task = fasync::Task::spawn(async move {
        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    let start = offset as usize;
                    let end = std::cmp::min(start + count as usize, content_clone.len());
                    let data = if start < content_clone.len() {
                        &content_clone[start..end]
                    } else {
                        &[]
                    };
                    responder.send(Ok(data)).unwrap();
                }
                _ => panic!("Unexpected request: {:?}", request),
            }
        }
    });

    let io = RemoteIo::new(client.into_channel().into());
    // The synchronous proxy blocks, so the client side runs inside `fasync::unblock`.
    fasync::unblock(move || {
        let mut data = Vec::new();
        let actual = io
            .read(
                0,
                content.len(),
                // Consume every chunk in full.
                |chunk| -> Result<usize, zx::Status> {
                    let len = chunk.len();
                    data.extend(chunk);
                    Ok(len)
                },
                |status| status,
            )
            .unwrap();
        assert_eq!(actual, content.len());
        assert_eq!(data, content);
    })
    .await;
}
898
/// A read error that happens after some data was delivered must not fail the whole read:
/// `RemoteIo::read` returns the bytes received so far.
#[fuchsia::test]
async fn test_read_error_after_data() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
    let chunk_size = 100;
    let content = vec![0xAA; chunk_size];
    let content_clone = content.clone();

    // Mock file server: the first `ReadAt` succeeds with `content`, every later one fails
    // with `ZX_ERR_IO`.
    let _server_task = fasync::Task::spawn(async move {
        let mut request_count = 0;
        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
                    request_count += 1;
                    if request_count == 1 {
                        responder.send(Ok(&content_clone)).unwrap();
                    } else {
                        responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
                    }
                }
                _ => panic!("Unexpected request: {:?}", request),
            }
        }
    });

    let io = RemoteIo::new(client.into_channel().into());
    fasync::unblock(move || {
        let mut data = Vec::new();
        let actual = io
            // Ask for twice what the first reply carries, which forces a second request.
            .read(
                0,
                chunk_size * 2,
                |chunk| -> Result<usize, zx::Status> {
                    let len = chunk.len();
                    data.extend(chunk);
                    Ok(len)
                },
                |status| status,
            )
            .expect("read should succeed even if later chunks fail");
        assert_eq!(actual, chunk_size);
        assert_eq!(data.len(), chunk_size);
        assert_eq!(data, content);
    })
    .await;
}
945
/// `RemoteIo::write` must split a write larger than `fio::MAX_TRANSFER_SIZE` into several
/// `WriteAt` calls that together carry the full content.
#[fuchsia::test]
async fn test_write_chunking() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
    // 100 bytes more than a single transfer can carry, so at least two requests are needed.
    let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
    let content2 = content.clone();

    // Mock file server: copies each `WriteAt` payload into `written` at the given offset and
    // acknowledges the full length. Once the client disconnects it checks the result.
    let server_task = fasync::Task::spawn(async move {
        let mut written = vec![0; content2.len()];
        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::FileRequest::WriteAt { offset, data, responder, .. } => {
                    let offset = offset as usize;
                    written[offset..offset + data.len()].copy_from_slice(&data);
                    responder.send(Ok(data.len() as u64)).unwrap();
                }
                _ => panic!("Unexpected request: {:?}", request),
            }
        }
        assert_eq!(written, content2);
    });

    let io = RemoteIo::new(client.into_channel().into());
    // `io` is moved into the closure and dropped when it returns, which ends the server loop.
    fasync::unblock(move || {
        let written = io.write(0, &content).expect("write failed");
        assert_eq!(written, content.len());
    })
    .await;

    // Wait for the server so its final assertion runs.
    server_task.await;
}
976
/// A write error that happens after some data was written must not fail the whole write:
/// `RemoteIo::write` returns the number of bytes written so far.
#[fuchsia::test]
async fn test_write_error_after_data() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
    let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
    // Exactly two full chunks, so the write needs two `WriteAt` calls.
    let content = vec![0xEE; chunk_size * 2];

    // Mock file server: the first `WriteAt` succeeds in full, every later one fails with
    // `ZX_ERR_IO`.
    let _server_task = fasync::Task::spawn(async move {
        let mut request_count = 0;
        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
                    request_count += 1;
                    if request_count == 1 {
                        responder.send(Ok(data.len() as u64)).unwrap();
                    } else {
                        responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
                    }
                }
                _ => panic!("Unexpected request: {:?}", request),
            }
        }
    });

    let io = RemoteIo::new(client.into_channel().into());
    fasync::unblock(move || {
        let written = io.write(0, &content).expect("write should succeed partial");
        assert_eq!(written, chunk_size);
    })
    .await;
}
1007
/// `RemoteDirectory::readdir` must deliver every entry of a directory that is too large for a
/// single `ReadDirents` reply, plus the synthesized `..` entry.
#[fuchsia::test]
async fn test_large_directory() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
    let num_entries = 2000;

    // Mock directory server: serves `.` followed by `file_0`, `file_1`, ... and packs as many
    // dirents into each reply as fit in `max_bytes`.
    let task = fasync::Task::spawn(async move {
        let mut sent_count = 0;
        let mut num_requests = 0;
        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
                    num_requests += 1;
                    let mut buffer = Vec::new();
                    while sent_count < num_entries {
                        let name = if sent_count == 0 {
                            ".".to_string()
                        } else {
                            format!("file_{}", sent_count - 1)
                        };
                        let name_bytes = name.as_bytes();
                        // Each dirent is a 10-byte header (8-byte inode, 1-byte name length,
                        // 1-byte type) followed by the name.
                        let entry_size = 10 + name_bytes.len();
                        if buffer.len() + entry_size > max_bytes as usize {
                            break;
                        }
                        buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
                        buffer.push(name_bytes.len() as u8);
                        let entry_type = if sent_count == 0 {
                            fio::DirentType::Directory
                        } else {
                            fio::DirentType::File
                        };
                        buffer.push(entry_type.into_primitive());
                        buffer.extend_from_slice(name_bytes);
                        sent_count += 1;
                    }
                    // An empty buffer (all entries sent) tells the client it is done.
                    let _ = responder.send(0, &buffer);
                }
                fio::DirectoryRequest::Rewind { responder } => {
                    sent_count = 0;
                    let _ = responder.send(0);
                }
                fio::DirectoryRequest::Close { responder } => {
                    let _ = responder.send(Ok(()));
                }
                _ => {}
            }
        }
        assert!(num_requests > 0);
    });

    let dir = RemoteDirectory::new(client.into_channel().into());
    let count = Arc::new(AtomicU64::new(0));
    let count2 = count.clone();
    // `dir` is moved into the closure and dropped when it returns, which ends the server loop.
    fasync::unblock(move || {
        dir.readdir::<(), _>(|_ino, _type, _name| {
            count.fetch_add(1, Ordering::Relaxed);
            ControlFlow::Continue(())
        })
        .unwrap();
    })
    .await;
    // The server sent `num_entries` dirents; the client adds `..` after `.`.
    assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
    task.await;
}
1073
/// Seeking to an earlier index must rewind the directory so that reading resumes from that
/// index, including the synthesized `..` entry.
#[fuchsia::test]
async fn test_seek_backwards() {
    let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
    // Mock directory server with four fixed entries; `Rewind` restarts from the first one.
    let _server_task = fasync::Task::spawn(async move {
        let entries = vec![
            (1, fio::DirentType::Directory, "."),
            (2, fio::DirentType::File, "file_0"),
            (3, fio::DirentType::File, "file_1"),
            (4, fio::DirentType::File, "file_2"),
        ];
        let mut current_entry = 0;

        while let Some(Ok(request)) = stream.next().await {
            match request {
                fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
                    let mut buffer = Vec::new();
                    while current_entry < entries.len() {
                        let (ino, type_, name) = entries[current_entry];
                        let name_bytes = name.as_bytes();
                        // 10-byte header (8-byte inode, 1-byte name length, 1-byte type)
                        // followed by the name.
                        let entry_size = 10 + name_bytes.len();
                        if buffer.len() + entry_size > max_bytes as usize {
                            break;
                        }
                        buffer.extend_from_slice(&(ino as u64).to_le_bytes());
                        buffer.push(name_bytes.len() as u8);
                        buffer.push(type_.into_primitive());
                        buffer.extend_from_slice(name_bytes);
                        current_entry += 1;
                    }
                    responder.send(0, &buffer).unwrap();
                }
                fio::DirectoryRequest::Rewind { responder } => {
                    current_entry = 0;
                    responder.send(0).unwrap();
                }
                _ => {}
            }
        }
    });

    let dir = RemoteDirectory::new(client.into_channel().into());
    fasync::unblock(move || {
        let mut names = Vec::new();
        dir.readdir::<(), _>(|_ino, _type, name| {
            // Stop once three names have been collected.
            names.push(name.to_vec());
            if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
        })
        .unwrap();

        assert_eq!(names[0], b".");
        assert_eq!(names[1], b"..");
        assert_eq!(names[2], b"file_0");

        // Index 1 is the position right after `.`, so the next entry must be `..`.
        dir.seek(1).unwrap();

        let mut names_after_seek = Vec::new();
        dir.readdir::<(), _>(|_ino, _type, name| {
            // Stop once two names have been collected.
            names_after_seek.push(name.to_vec());
            if names_after_seek.len() == 2 {
                ControlFlow::Break(())
            } else {
                ControlFlow::Continue(())
            }
        })
        .unwrap();

        assert_eq!(names_after_seek[0], b"..");
        assert_eq!(names_after_seek[1], b"file_0");
    })
    .await;
}
1149
/// Test factory that returns the `RemoteIo` it is given, whatever the representation.
struct DummyFactory;
1151
// Every constructor ignores the representation info and passes the `RemoteIo` through.
impl Factory for DummyFactory {
    type Result = RemoteIo;
    fn create_node(self, io: RemoteIo, _info: fio::NodeInfo) -> Self::Result {
        io
    }
    fn create_directory(self, io: RemoteIo, _info: fio::DirectoryInfo) -> Self::Result {
        io
    }
    fn create_file(self, io: RemoteIo, _info: fio::FileInfo) -> Self::Result {
        io
    }
    fn create_symlink(self, io: RemoteIo, _info: fio::SymlinkInfo) -> Self::Result {
        io
    }
}
1167
/// `RemoteIo::open_pipelined` must open each path on the node opened by the previous one,
/// request a directory for every path but the last, and yield one successful result per path.
#[fuchsia::test]
async fn test_open_pipelined() {
    let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();

    // Mock directory server. Each spawned task handles a single `Open` request on its
    // stream: it checks the path and flags against the next expected path, reports the new
    // node as a directory, and then serves the new node's stream with the remaining
    // expectations.
    fn serve_mock_directory(
        mut stream: fio::DirectoryRequestStream,
        mut expected_paths: Vec<String>,
    ) {
        fasync::Task::spawn(async move {
            if let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::DirectoryRequest::Open { path, flags, options: _, object, .. } => {
                        if !expected_paths.is_empty() {
                            let expected = expected_paths.remove(0);
                            assert_eq!(path, expected);
                            // Only the intermediate path must be opened as a directory.
                            if expected == "path1" {
                                assert!(flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
                            } else if expected == "path2" {
                                assert!(!flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
                            }
                        }
                        let server_end =
                            fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
                        let dir_stream = server_end.into_stream();
                        let control_handle = dir_stream.control_handle();
                        let representation =
                            fio::Representation::Directory(fio::DirectoryInfo::default());
                        control_handle.send_on_representation(representation).unwrap();

                        serve_mock_directory(dir_stream, expected_paths);
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }

    serve_mock_directory(stream, vec!["path1".to_string(), "path2".to_string()]);

    let io = RemoteIo::new(client.into_channel().into());
    fasync::unblock(move || {
        let results = io
            .open_pipelined(
                &["path1", "path2"],
                fio::Flags::empty(),
                fio::NodeAttributesQuery::empty(),
                || DummyFactory,
            )
            .collect::<Vec<_>>();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));
    })
    .await;
}
1223
1224 #[fuchsia::test]
1225 async fn test_open_pipelined_not_found() {
1226 let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1227
1228 fn serve_mock_directory_not_found(mut stream: fio::DirectoryRequestStream) {
1229 fasync::Task::spawn(async move {
1230 if let Some(Ok(request)) = stream.next().await {
1231 match request {
1232 fio::DirectoryRequest::Open {
1233 path, flags: _, options: _, object, ..
1234 } => {
1235 let server_end =
1236 fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1237 let dir_stream = server_end.into_stream();
1238 let control_handle = dir_stream.control_handle();
1239
1240 if path == "not_found" {
1241 control_handle.shutdown_with_epitaph(zx::Status::NOT_FOUND);
1242 } else {
1243 let representation =
1244 fio::Representation::Directory(fio::DirectoryInfo::default());
1245 control_handle.send_on_representation(representation).unwrap();
1246 serve_mock_directory_not_found(dir_stream);
1247 }
1248 }
1249 _ => {}
1250 }
1251 }
1252 })
1253 .detach();
1254 }
1255
1256 serve_mock_directory_not_found(stream);
1257
1258 let io = RemoteIo::new(client.into_channel().into());
1259 fasync::unblock(move || {
1260 let results = io
1261 .open_pipelined(
1262 &["path1", "not_found", "path3"],
1263 fio::Flags::empty(),
1264 fio::NodeAttributesQuery::empty(),
1265 || DummyFactory,
1266 )
1267 .collect::<Vec<_>>();
1268 assert_eq!(results.len(), 2);
1269 assert!(results[0].is_ok());
1270 assert_eq!(results[1].as_ref().err(), Some(&zx::Status::NOT_FOUND));
1271 })
1272 .await;
1273 }
1274
1275 #[fuchsia::test]
1276 async fn test_open_pipelined_peer_closed() {
1277 let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1278
1279 fn serve_mock_directory_close_early(mut stream: fio::DirectoryRequestStream) {
1280 fasync::Task::spawn(async move {
1281 if let Some(Ok(request)) = stream.next().await {
1282 match request {
1283 fio::DirectoryRequest::Open { object, .. } => {
1284 drop(object);
1286 }
1287 _ => {}
1288 }
1289 }
1290 })
1291 .detach();
1292 }
1293
1294 serve_mock_directory_close_early(stream);
1295
1296 let io = RemoteIo::new(client.into_channel().into());
1297 fasync::unblock(move || {
1298 let results = io
1299 .open_pipelined(
1300 &["path1", "path2"],
1301 fio::Flags::empty(),
1302 fio::NodeAttributesQuery::empty(),
1303 || DummyFactory,
1304 )
1305 .collect::<Vec<_>>();
1306 assert_eq!(results.len(), 1);
1307 assert_eq!(results[0].as_ref().err(), Some(&zx::Status::PEER_CLOSED));
1308 })
1309 .await;
1310 }
1311}