1use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use std::ops::{ControlFlow, Range};
13use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
14use zerocopy::FromBytes;
15
16fn zxio_attr_from_fidl_attributes(in_attr: &fio::NodeAttributes2) -> zxio_node_attributes_t {
17 zxio_attr_from_fidl(&in_attr.mutable_attributes, &in_attr.immutable_attributes)
18}
19
20fn zxio_attr_from_fidl(
24 mutable: &fio::MutableNodeAttributes,
25 immutable: &fio::ImmutableNodeAttributes,
26) -> zxio_node_attributes_t {
27 let mut out_attr = zxio_node_attributes_t::default();
28 if let Some(protocols) = immutable.protocols {
29 out_attr.protocols = protocols.bits();
30 out_attr.has.protocols = true;
31 }
32 if let Some(abilities) = immutable.abilities {
33 out_attr.abilities = abilities.bits();
34 out_attr.has.abilities = true;
35 }
36 if let Some(id) = immutable.id {
37 out_attr.id = id;
38 out_attr.has.id = true;
39 }
40 if let Some(content_size) = immutable.content_size {
41 out_attr.content_size = content_size;
42 out_attr.has.content_size = true;
43 }
44 if let Some(storage_size) = immutable.storage_size {
45 out_attr.storage_size = storage_size;
46 out_attr.has.storage_size = true;
47 }
48 if let Some(link_count) = immutable.link_count {
49 out_attr.link_count = link_count;
50 out_attr.has.link_count = true;
51 }
52 if let Some(creation_time) = mutable.creation_time {
53 out_attr.creation_time = creation_time;
54 out_attr.has.creation_time = true;
55 }
56 if let Some(modification_time) = mutable.modification_time {
57 out_attr.modification_time = modification_time;
58 out_attr.has.modification_time = true;
59 }
60 if let Some(access_time) = mutable.access_time {
61 out_attr.access_time = access_time;
62 out_attr.has.access_time = true;
63 }
64 if let Some(mode) = mutable.mode {
65 out_attr.mode = mode;
66 out_attr.has.mode = true;
67 }
68 if let Some(uid) = mutable.uid {
69 out_attr.uid = uid;
70 out_attr.has.uid = true;
71 }
72 if let Some(gid) = mutable.gid {
73 out_attr.gid = gid;
74 out_attr.has.gid = true;
75 }
76 if let Some(rdev) = mutable.rdev {
77 out_attr.rdev = rdev;
78 out_attr.has.rdev = true;
79 }
80 if let Some(change_time) = immutable.change_time {
81 out_attr.change_time = change_time;
82 out_attr.has.change_time = true;
83 }
84 if let Some(casefold) = mutable.casefold {
85 out_attr.casefold = casefold;
86 out_attr.has.casefold = true;
87 }
88 if let Some(verity_enabled) = immutable.verity_enabled {
89 out_attr.fsverity_enabled = verity_enabled;
90 out_attr.has.fsverity_enabled = true;
91 }
92 if let Some(wrapping_key_id) = mutable.wrapping_key_id {
93 out_attr.wrapping_key_id = wrapping_key_id;
94 out_attr.has.wrapping_key_id = true;
95 }
96 out_attr
97}
98
/// Builds a caller-defined object from a [`RemoteIo`] once the kind of the
/// remote node is known.
///
/// `create_with_on_representation` calls exactly one of these methods, chosen
/// by the `fio::Representation` variant reported by the server. Every method
/// consumes the factory.
pub trait Factory {
    /// The type produced by every `create_*` method.
    type Result;

    /// Called when the server reports a `Representation::Node`.
    fn create_node(self, io: RemoteIo) -> Self::Result;
    /// Called when the server reports a `Representation::Directory`.
    fn create_directory(self, io: RemoteIo) -> Self::Result;
    /// Called when the server reports a `Representation::File`; `info` is the
    /// file information sent with the representation.
    fn create_file(self, io: RemoteIo, info: &fio::FileInfo) -> Self::Result;
    /// Called when the server reports a `Representation::Symlink`; `target` is
    /// the link target taken from the representation.
    fn create_symlink(self, io: RemoteIo, target: Vec<u8>) -> Self::Result;
}
108
/// Waits for the `OnRepresentation` event on `proxy` and uses `factory` to
/// build the matching object.
///
/// Blocks with an infinite deadline until an event arrives.
///
/// # Returns
///
/// On success, a tuple of:
/// * the object produced by `factory`,
/// * the attributes sent with the representation converted to
///   `zxio_node_attributes_t` (the default value when none were sent),
/// * the SELinux context taken from the mutable attributes, if any.
///
/// # Errors
///
/// * `zx::Status::IO` if a symlink representation carries no target, or if
///   waiting for the event fails for any reason other than the channel being
///   closed, or if a different event is received.
/// * `zx::Status::NOT_SUPPORTED` for a representation variant not handled here.
/// * The epitaph/peer-closed status when the channel was closed.
pub fn create_with_on_representation<F: Factory>(
    proxy: fio::NodeSynchronousProxy,
    factory: F,
) -> Result<(F::Result, zxio_node_attributes_t, Option<fio::SelinuxContext>), zx::Status> {
    match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
        Ok(fio::NodeEvent::OnRepresentation { mut payload }) => {
            // `attrs` keeps a mutable borrow into `payload` so the SELinux context can be
            // moved out of it below without cloning.
            let (result, attrs) = match &mut payload {
                fio::Representation::Node(info) => {
                    (factory.create_node(RemoteIo::new(proxy)), &mut info.attributes)
                }
                fio::Representation::Directory(info) => {
                    (factory.create_directory(RemoteIo::new(proxy)), &mut info.attributes)
                }
                fio::Representation::File(info) => (
                    factory.create_file(
                        RemoteIo {
                            proxy,
                            // Use the stream supplied by the server when there is one;
                            // otherwise store an invalid handle.
                            stream: info
                                .stream
                                .take()
                                .map(zx::Stream::from)
                                .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
                        },
                        info,
                    ),
                    &mut info.attributes,
                ),
                fio::Representation::Symlink(info) => {
                    // A symlink without a target is treated as an I/O error.
                    let Some(target) = info.target.take() else {
                        return Err(zx::Status::IO);
                    };
                    (factory.create_symlink(RemoteIo::new(proxy), target), &mut info.attributes)
                }
                _ => return Err(zx::Status::NOT_SUPPORTED),
            };
            // Convert the attributes (if any) and move the SELinux context out of them.
            let (attrs, context) = attrs
                .as_mut()
                .map(|a| {
                    (zxio_attr_from_fidl_attributes(a), a.mutable_attributes.selinux_context.take())
                })
                .unwrap_or_default();
            Ok((result, attrs, context))
        }
        // Surface the status the channel was closed with.
        Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
        // Any other error, or any other event, is reported as a generic I/O failure.
        _ => Err(zx::Status::IO),
    }
}
162
/// A synchronous client for a remote `fuchsia.io` node.
pub struct RemoteIo {
    /// Synchronous proxy to the remote node. Its channel is also reused to
    /// issue `File`, `Directory` and `Linkable` requests (see `cast_proxy`).
    proxy: fio::NodeSynchronousProxy,
    /// Stream used for direct reads and vectored I/O. Holds an invalid handle
    /// when no stream is available, in which case reads go through FIDL.
    stream: zx::Stream,
}
174
175impl RemoteIo {
176 pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
177 Self { proxy, stream: zx::NullableHandle::invalid().into() }
178 }
179
180 pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
181 Self { proxy, stream }
182 }
183
184 pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
185 self.proxy
186 }
187
188 fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
189 zx::Unowned::new(self.proxy.as_channel())
190 }
191
192 pub fn attr_get(
194 &self,
195 query: fio::NodeAttributesQuery,
196 ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
197 self.proxy
198 .get_attributes(query, zx::MonotonicInstant::INFINITE)
199 .map_err(|_| zx::Status::IO)?
200 .map_err(zx::Status::from_raw)
201 }
202
203 pub fn attr_get_zxio(
208 &self,
209 query: fio::NodeAttributesQuery,
210 ) -> Result<zxio_node_attributes_t, zx::Status> {
211 self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
212 }
213
214 pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
216 self.proxy
217 .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
218 .map_err(|_| zx::Status::IO)?
219 .map_err(zx::Status::from_raw)
220 }
221
222 pub fn open<F: Factory>(
224 &self,
225 path: &str,
226 flags: fio::Flags,
227 create_attributes: Option<fio::MutableNodeAttributes>,
228 query: fio::NodeAttributesQuery,
229 factory: F,
230 ) -> Result<(F::Result, zxio_node_attributes_t, Option<fio::SelinuxContext>), zx::Status> {
231 let (client_end, server_end) = zx::Channel::create();
232 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
233 dir_proxy
234 .open(
235 path,
236 flags | fio::Flags::FLAG_SEND_REPRESENTATION,
237 &fio::Options {
238 attributes: (!query.is_empty()).then_some(query),
239 create_attributes,
240 ..Default::default()
241 },
242 server_end,
243 )
244 .map_err(|_| zx::Status::IO)?;
245 create_with_on_representation(client_end.into(), factory)
246 }
247
248 pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
252 if self.stream.is_invalid() {
253 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
254 let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
255 let data = file_proxy
256 .read_at(max, offset, zx::MonotonicInstant::INFINITE)
257 .map_err(|_| zx::Status::IO)?
258 .map_err(zx::Status::from_raw)?;
259 let eof = (data.len() as u64) < max;
260 Ok((data, eof))
261 } else {
262 let bytes =
264 self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
265 let eof = bytes.len() < max;
266 Ok((bytes, eof))
267 }
268 }
269
270 pub fn read<E>(
275 &self,
276 offset: u64,
277 len: usize,
278 mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
279 map_err: impl FnOnce(zx::Status) -> E,
280 ) -> Result<usize, E> {
281 let mut total = 0;
282 while total < len {
283 match self.read_partial(offset + total as u64, len - total) {
284 Ok((data, eof)) => {
285 if data.is_empty() {
286 break;
287 }
288 let data_len = data.len();
289 let written = callback(data)?;
290 total += written;
291 if eof || written < data_len {
292 break;
293 }
294 }
295 Err(e) => {
296 if total > 0 {
297 break;
298 }
299 return Err(map_err(e));
300 }
301 }
302 }
303 Ok(total)
304 }
305
306 pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
308 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
309 let mut total_written = 0;
310 for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
311 let result = file_proxy
312 .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
313 .map_err(|_| zx::Status::IO)
314 .and_then(|res| res.map_err(zx::Status::from_raw));
315 match result {
316 Ok(actual) => {
317 let actual = actual as usize;
318 total_written += actual;
319 if actual < chunk.len() {
320 return Ok(total_written);
321 }
322 }
323 Err(e) => {
324 if total_written > 0 {
325 break;
326 }
327 return Err(e);
328 }
329 }
330 }
331 Ok(total_written)
332 }
333
334 pub fn supports_vectored(&self) -> bool {
336 !self.stream.is_invalid()
338 }
339
340 pub unsafe fn readv(
347 &self,
348 offset: u64,
349 iovecs: &mut [zx::sys::zx_iovec_t],
350 ) -> Result<usize, zx::Status> {
351 if self.stream.is_invalid() {
352 return Err(zx::Status::NOT_SUPPORTED);
353 }
354 unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
356 }
357
358 pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
361 if self.stream.is_invalid() {
362 return Err(zx::Status::NOT_SUPPORTED);
363 }
364 self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
365 }
366
367 pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
369 self.cast_proxy::<fio::FileSynchronousProxy>()
370 .resize(length, zx::MonotonicInstant::INFINITE)
371 .map_err(|_| zx::Status::IO)?
372 .map_err(zx::Status::from_raw)
373 }
374
375 pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
377 let mut fio_flags = fio::VmoFlags::empty();
378 if flags.contains(zx::VmarFlags::PERM_READ) {
379 fio_flags |= fio::VmoFlags::READ;
380 }
381 if flags.contains(zx::VmarFlags::PERM_WRITE) {
382 fio_flags |= fio::VmoFlags::WRITE;
383 }
384 if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
385 fio_flags |= fio::VmoFlags::EXECUTE;
386 }
387 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
388 let vmo = file_proxy
389 .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
390 .map_err(|_| zx::Status::IO)?
391 .map_err(zx::Status::from_raw)?;
392 Ok(vmo)
393 }
394
395 pub fn sync(&self) -> Result<(), zx::Status> {
397 self.proxy
398 .sync(zx::MonotonicInstant::INFINITE)
399 .map_err(|_| zx::Status::IO)?
400 .map_err(zx::Status::from_raw)
401 }
402
403 pub fn close_and_update_access_time(self) {
405 let _ = self.proxy.get_attributes(
406 fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
407 zx::MonotonicInstant::INFINITE_PAST,
408 );
409 }
410
411 pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
413 let (client_end, server_end) = zx::Channel::create();
414 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
415 Ok(client_end.into())
416 }
417
418 pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
420 let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
421 let (status, token) = target_dir_proxy
422 .get_token(zx::MonotonicInstant::INFINITE)
423 .map_err(|_| zx::Status::IO)?;
424 zx::Status::ok(status)?;
425 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
426
427 let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
429
430 linkable_proxy
431 .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
432 .map_err(|_| zx::Status::IO)?
433 .map_err(zx::Status::from_raw)
434 }
435
436 pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
438 let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
439 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
440 dir_proxy
441 .unlink(name, &options, zx::MonotonicInstant::INFINITE)
442 .map_err(|_| zx::Status::IO)?
443 .map_err(zx::Status::from_raw)
444 }
445
446 pub fn rename(
448 &self,
449 old_path: &str,
450 new_directory: &Self,
451 new_path: &str,
452 ) -> Result<(), zx::Status> {
453 let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
454 let (status, token) =
455 new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
456 zx::Status::ok(status)?;
457 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
458 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
459 dir_proxy
460 .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
461 .map_err(|_| zx::Status::IO)?
462 .map_err(zx::Status::from_raw)
463 }
464
465 pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
467 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
468 let (client_end, server_end) = zx::Channel::create();
469 dir_proxy
470 .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
471 .map_err(|_| zx::Status::IO)?
472 .map_err(zx::Status::from_raw)?;
473 Ok(RemoteIo::new(client_end.into()))
474 }
475
476 pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
478 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
479 let options = fio::VerificationOptions {
480 hash_algorithm: Some(match descriptor.hash_algorithm {
481 1 => fio::HashAlgorithm::Sha256,
482 2 => fio::HashAlgorithm::Sha512,
483 _ => return Err(zx::Status::INVALID_ARGS),
484 }),
485 salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
486 ..Default::default()
487 };
488 file_proxy
489 .enable_verity(&options, zx::MonotonicInstant::INFINITE)
490 .map_err(|_| zx::Status::IO)?
491 .map_err(zx::Status::from_raw)
492 }
493
494 pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
496 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
497 let mut fio_mode = fio::AllocateMode::empty();
498 if mode.contains(AllocateMode::KEEP_SIZE) {
499 fio_mode |= fio::AllocateMode::KEEP_SIZE;
500 }
501 if mode.contains(AllocateMode::UNSHARE_RANGE) {
502 fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
503 }
504 if mode.contains(AllocateMode::PUNCH_HOLE) {
505 fio_mode |= fio::AllocateMode::PUNCH_HOLE;
506 }
507 if mode.contains(AllocateMode::COLLAPSE_RANGE) {
508 fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
509 }
510 if mode.contains(AllocateMode::ZERO_RANGE) {
511 fio_mode |= fio::AllocateMode::ZERO_RANGE;
512 }
513 if mode.contains(AllocateMode::INSERT_RANGE) {
514 fio_mode |= fio::AllocateMode::INSERT_RANGE;
515 }
516 file_proxy
517 .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
518 .map_err(|_| zx::Status::IO)?
519 .map_err(zx::Status::from_raw)
520 }
521
522 pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
524 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
525 let result = self
526 .proxy
527 .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
528 .map_err(|_| zx::Status::IO)?
529 .map_err(zx::Status::from_raw)?;
530 match result {
531 fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
532 fio::ExtendedAttributeValue::Buffer(vmo) => {
533 let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
534 let mut bytes = vec![0u8; size as usize];
535 vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
536 Ok(bytes)
537 }
538 _ => Err(zx::Status::NOT_SUPPORTED),
539 }
540 }
541
542 pub fn xattr_set(
544 &self,
545 name: &[u8],
546 value: &[u8],
547 mode: syncio::XattrSetMode,
548 ) -> Result<(), zx::Status> {
549 let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
550 let fidl_mode = match mode {
551 syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
552 syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
553 syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
554 };
555 self.proxy
556 .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
557 .map_err(|_| zx::Status::IO)?
558 .map_err(zx::Status::from_raw)
559 }
560
561 pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
563 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
564 self.proxy
565 .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
566 .map_err(|_| zx::Status::IO)?
567 .map_err(zx::Status::from_raw)
568 }
569
570 pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
572 let (client_end, server_end) = zx::Channel::create();
573 self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
574 let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
575 let mut all_attrs = vec![];
576 loop {
577 let (attributes, last) = iterator
578 .get_next(zx::MonotonicInstant::INFINITE)
579 .map_err(|_| zx::Status::IO)?
580 .map_err(zx::Status::from_raw)?;
581 all_attrs.extend(attributes);
582 if last {
583 break;
584 }
585 }
586 Ok(all_attrs)
587 }
588}
589
/// A synchronous client for iterating over the entries of a remote
/// `fuchsia.io` directory.
pub struct RemoteDirectory {
    /// Synchronous proxy to the remote directory.
    proxy: fio::DirectorySynchronousProxy,
    /// Iteration state, guarded by a mutex so the methods can take `&self`.
    state: Mutex<State>,
}
598
/// Iteration state of a [`RemoteDirectory`].
#[derive(Default)]
struct State {
    /// Raw dirent bytes returned by the most recent `ReadDirents` call.
    buffer: Vec<u8>,

    /// Position in `buffer` of the next entry to decode.
    offset: usize,

    /// Entry that `next` returns before decoding anything else. It is set to
    /// `Entry::DotDot` after "." was returned, and `readdir` stores an entry
    /// here when the sink asked to stop.
    pending_entry: Entry,

    /// Incremented each time `next` returns successfully; reset to zero by
    /// `rewind`. `RemoteDirectory::seek` compares against it.
    current_index: u64,
}
614
615impl State {
616 fn name(&self, range: Range<usize>) -> &[u8] {
617 &self.buffer[range]
618 }
619
    /// Returns the next directory entry, fetching more dirents from `proxy`
    /// when the local buffer is exhausted.
    ///
    /// A pending entry, if any, is returned first. After an entry named "." is
    /// returned, `Entry::DotDot` is queued so that it is returned by the
    /// following call. `Entry::None` is returned when the server sends an empty
    /// buffer or when the remaining bytes do not hold a complete entry.
    ///
    /// `current_index` is incremented on every successful return.
    ///
    /// # Errors
    ///
    /// * `zx::Status::IO` when the FIDL call fails or the entry type is unknown.
    /// * The status returned by `ReadDirents` when it is not OK.
    fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
        // Decodes one entry from `buffer`, refilling it from the server first if needed.
        let mut next_dirent = || -> Result<Entry, zx::Status> {
            if self.offset >= self.buffer.len() {
                match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
                    Ok((status, dirents)) => {
                        zx::Status::ok(status)?;
                        // An empty reply means there are no more entries.
                        if dirents.is_empty() {
                            return Ok(Entry::None);
                        }
                        self.buffer = dirents;
                        self.offset = 0;
                    }
                    Err(_) => return Err(zx::Status::IO),
                }
            }

            // Fixed-size header that precedes each name in the dirent buffer.
            #[repr(C, packed)]
            #[derive(FromBytes)]
            struct DirectoryEntry {
                ino: u64,
                name_len: u8,
                entry_type: u8,
            }

            // Decode the header and make sure the whole name fits in what is left.
            let Some((ino, name, entry_type)) =
                DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
                    |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
                        let name_len = name_len as usize;
                        let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
                        (remainder.len() >= name_len).then_some((
                            ino,
                            name_start..name_start + name_len,
                            entry_type,
                        ))
                    },
                )
            else {
                // Truncated header or name: report no entry.
                return Ok(Entry::None);
            };

            // Advance past the name to the start of the following entry.
            self.offset = name.end;

            Ok(Entry::Some {
                ino,
                entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
                name,
            })
        };

        // A pending entry takes precedence over decoding a new one.
        let mut next = self.pending_entry.take();
        if let Entry::None = next {
            next = next_dirent()?;
        }
        match &next {
            // Queue ".." right after ".".
            Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
                self.pending_entry = Entry::DotDot;
            }
            _ => {}
        }
        self.current_index += 1;
        Ok(next)
    }
688
689 fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
690 self.pending_entry = Entry::None;
691 let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
692 zx::Status::ok(status)?;
693 self.buffer.clear();
694 self.offset = 0;
695 self.current_index = 0;
696 Ok(())
697 }
698}
699
/// One step of directory iteration.
#[derive(Default)]
enum Entry {
    /// No entry: nothing is pending, or the directory has no more entries.
    #[default]
    None,

    /// An entry decoded from the dirent buffer.
    Some {
        /// Inode number reported by the server.
        ino: u64,
        /// Type of the entry.
        entry_type: fio::DirentType,
        /// Byte range of the entry's name within `State::buffer`.
        name: Range<usize>,
    },

    /// The synthesized ".." entry that is emitted after ".".
    DotDot,
}
715
716impl Entry {
717 fn take(&mut self) -> Entry {
718 std::mem::replace(self, Entry::None)
719 }
720}
721
722impl RemoteDirectory {
723 pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
724 Self { proxy, state: Mutex::default() }
725 }
726
727 pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
729 let mut state = self.state.lock();
730
731 if new_index < state.current_index {
732 state.rewind(&self.proxy)?;
735 state.current_index = 0;
736 }
737
738 for i in state.current_index..new_index {
740 match state.next(&self.proxy) {
741 Ok(Entry::Some { .. } | Entry::DotDot) => {}
742 Ok(Entry::None) => break, Err(_) => {
744 return Ok(i);
750 }
751 }
752 }
753
754 Ok(new_index)
755 }
756
757 pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
761 &self,
762 mut sink: S,
763 ) -> Result<Option<B>, zx::Status> {
764 let mut state = self.state.lock();
765 loop {
766 let entry = state.next(&self.proxy)?;
767 if let ControlFlow::Break(b) = match &entry {
768 Entry::Some { ino, entry_type, name } => {
769 sink(*ino, *entry_type, state.name(name.clone()))
770 }
771 Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
772 Entry::None => break,
773 } {
774 state.pending_entry = entry;
775 return Ok(Some(b));
776 }
777 }
778 Ok(None)
779 }
780
781 pub fn sync(&self) -> Result<(), zx::Status> {
783 self.proxy
784 .sync(zx::MonotonicInstant::INFINITE)
785 .map_err(|_| zx::Status::IO)?
786 .map_err(zx::Status::from_raw)
787 }
788
789 pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
791 let (client_end, server_end) = zx::Channel::create();
792 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
793 Ok(client_end.into())
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use fuchsia_async as fasync;
801 use futures::StreamExt;
802 use std::sync::Arc;
803 use std::sync::atomic::{AtomicU64, Ordering};
804
    /// Checks that `RemoteIo::read` assembles content larger than
    /// `MAX_TRANSFER_SIZE` from several `ReadAt` requests.
    #[fuchsia::test]
    async fn test_read_chunking() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
        // 100 bytes more than a single transfer can carry.
        let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
        let content_clone = content.clone();

        // Fake file server: answers each `ReadAt` with the requested slice of the content.
        let _server_task = fasync::Task::spawn(async move {
            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::FileRequest::ReadAt { count, offset, responder } => {
                        let start = offset as usize;
                        let end = std::cmp::min(start + count as usize, content_clone.len());
                        let data = if start < content_clone.len() {
                            &content_clone[start..end]
                        } else {
                            &[]
                        };
                        responder.send(Ok(data)).unwrap();
                    }
                    _ => panic!("Unexpected request: {:?}", request),
                }
            }
        });

        let io = RemoteIo::new(client.into_channel().into());
        // The proxy is synchronous, so run the client side off the async executor.
        fasync::unblock(move || {
            let mut data = Vec::new();
            let actual = io
                .read(
                    0,
                    content.len(),
                    |chunk| -> Result<usize, zx::Status> {
                        let len = chunk.len();
                        data.extend(chunk);
                        Ok(len)
                    },
                    |status| status,
                )
                .unwrap();
            assert_eq!(actual, content.len());
            assert_eq!(data, content);
        })
        .await;
    }
849
    /// Checks that `RemoteIo::read` returns the data read so far when a later
    /// read fails.
    #[fuchsia::test]
    async fn test_read_error_after_data() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
        let chunk_size = 100;
        let content = vec![0xAA; chunk_size];
        let content_clone = content.clone();

        // Fake file server: the first `ReadAt` succeeds, every later one fails.
        let _server_task = fasync::Task::spawn(async move {
            let mut request_count = 0;
            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
                        request_count += 1;
                        if request_count == 1 {
                            responder.send(Ok(&content_clone)).unwrap();
                        } else {
                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
                        }
                    }
                    _ => panic!("Unexpected request: {:?}", request),
                }
            }
        });

        let io = RemoteIo::new(client.into_channel().into());
        fasync::unblock(move || {
            let mut data = Vec::new();
            // Ask for twice the available data so that a second request is issued.
            let actual = io
                .read(
                    0,
                    chunk_size * 2,
                    |chunk| -> Result<usize, zx::Status> {
                        let len = chunk.len();
                        data.extend(chunk);
                        Ok(len)
                    },
                    |status| status,
                )
                .expect("read should succeed even if later chunks fail");
            assert_eq!(actual, chunk_size);
            assert_eq!(data.len(), chunk_size);
            assert_eq!(data, content);
        })
        .await;
    }
896
    /// Checks that `RemoteIo::write` splits content larger than
    /// `MAX_TRANSFER_SIZE` into several `WriteAt` requests.
    #[fuchsia::test]
    async fn test_write_chunking() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
        // 100 bytes more than a single transfer can carry.
        let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
        let content2 = content.clone();

        // Fake file server: stores every write and, once the client is gone, verifies
        // that the complete content arrived.
        let server_task = fasync::Task::spawn(async move {
            let mut written = vec![0; content2.len()];
            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::FileRequest::WriteAt { offset, data, responder, .. } => {
                        let offset = offset as usize;
                        written[offset..offset + data.len()].copy_from_slice(&data);
                        responder.send(Ok(data.len() as u64)).unwrap();
                    }
                    _ => panic!("Unexpected request: {:?}", request),
                }
            }
            assert_eq!(written, content2);
        });

        let io = RemoteIo::new(client.into_channel().into());
        fasync::unblock(move || {
            let written = io.write(0, &content).expect("write failed");
            assert_eq!(written, content.len());
        })
        .await;

        // `io` was dropped with the closure, which ends the server loop.
        server_task.await;
    }
927
    /// Checks that `RemoteIo::write` reports the bytes written so far when a
    /// later chunk fails.
    #[fuchsia::test]
    async fn test_write_error_after_data() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
        let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
        // Two full chunks, so exactly two `WriteAt` requests are expected.
        let content = vec![0xEE; chunk_size * 2];

        // Fake file server: the first `WriteAt` succeeds, every later one fails.
        let _server_task = fasync::Task::spawn(async move {
            let mut request_count = 0;
            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
                        request_count += 1;
                        if request_count == 1 {
                            responder.send(Ok(data.len() as u64)).unwrap();
                        } else {
                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
                        }
                    }
                    _ => panic!("Unexpected request: {:?}", request),
                }
            }
        });

        let io = RemoteIo::new(client.into_channel().into());
        fasync::unblock(move || {
            let written = io.write(0, &content).expect("write should succeed partial");
            assert_eq!(written, chunk_size);
        })
        .await;
    }
958
    /// Checks that `RemoteDirectory::readdir` returns every entry of a
    /// directory whose listing is served by the fake server below.
    #[fuchsia::test]
    async fn test_large_directory() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
        let num_entries = 2000;

        // Fake directory server: serves "." followed by "file_N" entries, packing as
        // many as fit into `max_bytes` per `ReadDirents` reply.
        let task = fasync::Task::spawn(async move {
            let mut sent_count = 0;
            let mut num_requests = 0;
            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
                        num_requests += 1;
                        let mut buffer = Vec::new();
                        while sent_count < num_entries {
                            let name = if sent_count == 0 {
                                ".".to_string()
                            } else {
                                format!("file_{}", sent_count - 1)
                            };
                            let name_bytes = name.as_bytes();
                            // 8 bytes inode + 1 byte name length + 1 byte type, then the name.
                            let entry_size = 10 + name_bytes.len();
                            if buffer.len() + entry_size > max_bytes as usize {
                                break;
                            }
                            buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
                            buffer.push(name_bytes.len() as u8);
                            let entry_type = if sent_count == 0 {
                                fio::DirentType::Directory
                            } else {
                                fio::DirentType::File
                            };
                            buffer.push(entry_type.into_primitive());
                            buffer.extend_from_slice(name_bytes);
                            sent_count += 1;
                        }
                        let _ = responder.send(0, &buffer);
                    }
                    fio::DirectoryRequest::Rewind { responder } => {
                        sent_count = 0;
                        let _ = responder.send(0);
                    }
                    fio::DirectoryRequest::Close { responder } => {
                        let _ = responder.send(Ok(()));
                    }
                    _ => {}
                }
            }
            assert!(num_requests > 0);
        });

        let dir = RemoteDirectory::new(client.into_channel().into());
        let count = Arc::new(AtomicU64::new(0));
        let count2 = count.clone();
        fasync::unblock(move || {
            dir.readdir::<(), _>(|_ino, _type, _name| {
                count.fetch_add(1, Ordering::Relaxed);
                ControlFlow::Continue(())
            })
            .unwrap();
        })
        .await;
        // One more than the server sent: ".." is synthesized by the client after ".".
        assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
        task.await;
    }
1024
    /// Checks that seeking to an earlier index rewinds the directory and that
    /// iteration resumes from the requested position.
    #[fuchsia::test]
    async fn test_seek_backwards() {
        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
        // Fake directory server with a fixed listing; `Rewind` restarts it.
        let _server_task = fasync::Task::spawn(async move {
            let entries = vec![
                (1, fio::DirentType::Directory, "."),
                (2, fio::DirentType::File, "file_0"),
                (3, fio::DirentType::File, "file_1"),
                (4, fio::DirentType::File, "file_2"),
            ];
            let mut current_entry = 0;

            while let Some(Ok(request)) = stream.next().await {
                match request {
                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
                        let mut buffer = Vec::new();
                        while current_entry < entries.len() {
                            let (ino, type_, name) = entries[current_entry];
                            let name_bytes = name.as_bytes();
                            // 8 bytes inode + 1 byte name length + 1 byte type, then the name.
                            let entry_size = 10 + name_bytes.len();
                            if buffer.len() + entry_size > max_bytes as usize {
                                break;
                            }
                            buffer.extend_from_slice(&(ino as u64).to_le_bytes());
                            buffer.push(name_bytes.len() as u8);
                            buffer.push(type_.into_primitive());
                            buffer.extend_from_slice(name_bytes);
                            current_entry += 1;
                        }
                        responder.send(0, &buffer).unwrap();
                    }
                    fio::DirectoryRequest::Rewind { responder } => {
                        current_entry = 0;
                        responder.send(0).unwrap();
                    }
                    _ => {}
                }
            }
        });

        let dir = RemoteDirectory::new(client.into_channel().into());
        fasync::unblock(move || {
            let mut names = Vec::new();
            // Collect entries and stop the iteration on the third one.
            dir.readdir::<(), _>(|_ino, _type, name| {
                names.push(name.to_vec());
                if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
            })
            .unwrap();

            assert_eq!(names[0], b".");
            assert_eq!(names[1], b"..");
            assert_eq!(names[2], b"file_0");

            // Seek back to index 1, i.e. just after ".".
            dir.seek(1).unwrap();

            let mut names_after_seek = Vec::new();
            // Collect two entries starting from the new position.
            dir.readdir::<(), _>(|_ino, _type, name| {
                names_after_seek.push(name.to_vec());
                if names_after_seek.len() == 2 {
                    ControlFlow::Break(())
                } else {
                    ControlFlow::Continue(())
                }
            })
            .unwrap();

            assert_eq!(names_after_seek[0], b"..");
            assert_eq!(names_after_seek[1], b"file_0");
        })
        .await;
    }
1100}