1use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use std::ops::{ControlFlow, Range};
13use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
14use zerocopy::FromBytes;
15
16fn zxio_attr_from_fidl(
20 mutable: &fio::MutableNodeAttributes,
21 immutable: &fio::ImmutableNodeAttributes,
22) -> zxio_node_attributes_t {
23 let mut out_attr = zxio_node_attributes_t::default();
24 if let Some(protocols) = immutable.protocols {
25 out_attr.protocols = protocols.bits();
26 out_attr.has.protocols = true;
27 }
28 if let Some(abilities) = immutable.abilities {
29 out_attr.abilities = abilities.bits();
30 out_attr.has.abilities = true;
31 }
32 if let Some(id) = immutable.id {
33 out_attr.id = id;
34 out_attr.has.id = true;
35 }
36 if let Some(content_size) = immutable.content_size {
37 out_attr.content_size = content_size;
38 out_attr.has.content_size = true;
39 }
40 if let Some(storage_size) = immutable.storage_size {
41 out_attr.storage_size = storage_size;
42 out_attr.has.storage_size = true;
43 }
44 if let Some(link_count) = immutable.link_count {
45 out_attr.link_count = link_count;
46 out_attr.has.link_count = true;
47 }
48 if let Some(creation_time) = mutable.creation_time {
49 out_attr.creation_time = creation_time;
50 out_attr.has.creation_time = true;
51 }
52 if let Some(modification_time) = mutable.modification_time {
53 out_attr.modification_time = modification_time;
54 out_attr.has.modification_time = true;
55 }
56 if let Some(access_time) = mutable.access_time {
57 out_attr.access_time = access_time;
58 out_attr.has.access_time = true;
59 }
60 if let Some(mode) = mutable.mode {
61 out_attr.mode = mode;
62 out_attr.has.mode = true;
63 }
64 if let Some(uid) = mutable.uid {
65 out_attr.uid = uid;
66 out_attr.has.uid = true;
67 }
68 if let Some(gid) = mutable.gid {
69 out_attr.gid = gid;
70 out_attr.has.gid = true;
71 }
72 if let Some(rdev) = mutable.rdev {
73 out_attr.rdev = rdev;
74 out_attr.has.rdev = true;
75 }
76 if let Some(change_time) = immutable.change_time {
77 out_attr.change_time = change_time;
78 out_attr.has.change_time = true;
79 }
80 if let Some(casefold) = mutable.casefold {
81 out_attr.casefold = casefold;
82 out_attr.has.casefold = true;
83 }
84 if let Some(verity_enabled) = immutable.verity_enabled {
85 out_attr.fsverity_enabled = verity_enabled;
86 out_attr.has.fsverity_enabled = true;
87 }
88 if let Some(wrapping_key_id) = mutable.wrapping_key_id {
89 out_attr.wrapping_key_id = wrapping_key_id;
90 out_attr.has.wrapping_key_id = true;
91 }
92 out_attr
93}
94
95pub trait Factory {
97 type Result;
98
99 fn create_node(self, io: RemoteIo, info: fio::NodeInfo) -> Self::Result;
100 fn create_directory(self, io: RemoteIo, info: fio::DirectoryInfo) -> Self::Result;
101 fn create_file(self, io: RemoteIo, info: fio::FileInfo) -> Self::Result;
102 fn create_symlink(self, io: RemoteIo, info: fio::SymlinkInfo) -> Self::Result;
103}
104
105pub fn create_with_on_representation<F: Factory>(
112 proxy: fio::NodeSynchronousProxy,
113 factory: F,
114) -> Result<F::Result, zx::Status> {
115 match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
116 Ok(fio::NodeEvent::OnRepresentation { payload }) => match payload {
117 fio::Representation::Node(info) => Ok(factory.create_node(RemoteIo::new(proxy), info)),
118 fio::Representation::Directory(info) => {
119 Ok(factory.create_directory(RemoteIo::new(proxy), info))
120 }
121 fio::Representation::File(mut info) => {
122 let io = RemoteIo {
123 proxy,
124 stream: info
125 .stream
126 .take()
127 .map(zx::Stream::from)
128 .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
129 };
130 Ok(factory.create_file(io, info))
131 }
132 fio::Representation::Symlink(info) => {
133 Ok(factory.create_symlink(RemoteIo::new(proxy), info))
134 }
135 _ => Err(zx::Status::NOT_SUPPORTED),
136 },
137 Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
138 _ => Err(zx::Status::IO),
139 }
140}
141
142pub struct RemoteIo {
148 proxy: fio::NodeSynchronousProxy,
149 stream: zx::Stream,
152}
153
154impl RemoteIo {
155 pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
156 Self { proxy, stream: zx::NullableHandle::invalid().into() }
157 }
158
159 pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
160 Self { proxy, stream }
161 }
162
163 pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
164 self.proxy
165 }
166
167 fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
168 zx::Unowned::new(self.proxy.as_channel())
169 }
170
171 pub fn attr_get(
173 &self,
174 query: fio::NodeAttributesQuery,
175 ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
176 self.proxy
177 .get_attributes(query, zx::MonotonicInstant::INFINITE)
178 .map_err(|_| zx::Status::IO)?
179 .map_err(zx::Status::from_raw)
180 }
181
182 pub fn attr_get_zxio(
187 &self,
188 query: fio::NodeAttributesQuery,
189 ) -> Result<zxio_node_attributes_t, zx::Status> {
190 self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
191 }
192
193 pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
195 self.proxy
196 .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
197 .map_err(|_| zx::Status::IO)?
198 .map_err(zx::Status::from_raw)
199 }
200
201 pub fn open<F: Factory>(
203 &self,
204 path: &str,
205 flags: fio::Flags,
206 create_attributes: Option<fio::MutableNodeAttributes>,
207 query: fio::NodeAttributesQuery,
208 factory: F,
209 ) -> Result<F::Result, zx::Status> {
210 let (client_end, server_end) = zx::Channel::create();
211 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
212 dir_proxy
213 .open(
214 path,
215 flags | fio::Flags::FLAG_SEND_REPRESENTATION,
216 &fio::Options {
217 attributes: (!query.is_empty()).then_some(query),
218 create_attributes,
219 ..Default::default()
220 },
221 server_end,
222 )
223 .map_err(|_| zx::Status::IO)?;
224 create_with_on_representation(client_end.into(), factory)
225 }
226
227 pub fn open_pipelined<F: Factory>(
239 &self,
240 paths: &[&str],
241 flags: fio::Flags,
242 query: fio::NodeAttributesQuery,
243 mut factory_fn: impl FnMut() -> F,
244 ) -> Vec<Result<F::Result, zx::Status>> {
245 let mut proxy_to_result =
246 |proxies: Vec<fio::NodeSynchronousProxy>| -> Vec<Result<F::Result, zx::Status>> {
247 let mut results = Vec::new();
248 for proxy in proxies {
249 let result = create_with_on_representation(proxy, factory_fn());
250 let is_err = result.is_err();
251 results.push(result);
252 if is_err {
254 break;
255 }
256 }
257 results
258 };
259
260 let mut proxies: Vec<fio::NodeSynchronousProxy> = Vec::with_capacity(paths.len());
261
262 for (i, path) in paths.iter().enumerate() {
263 let (client_end, server_end) = zx::Channel::create();
264 let channel = proxies.last().unwrap_or(&self.proxy).as_channel();
265 let dir_proxy = zx::Unowned::<fio::DirectorySynchronousProxy>::new(channel);
266
267 let mut open_flags = flags | fio::Flags::FLAG_SEND_REPRESENTATION;
268 if i < paths.len() - 1 {
269 open_flags |= fio::Flags::PROTOCOL_DIRECTORY;
270 }
271
272 #[cfg(test)]
273 {
274 TEST_HOOK.with(|hook| {
275 if let Some(hook) = hook.borrow_mut().as_mut() {
276 hook();
277 }
278 });
279 }
280
281 if let Err(err) = dir_proxy.open(
282 path,
283 open_flags,
284 &fio::Options {
285 attributes: (!query.is_empty()).then_some(query),
286 ..Default::default()
287 },
288 server_end,
289 ) {
290 let mut results = proxy_to_result(proxies);
292 if results.last().is_none_or(|r| r.is_ok()) {
294 let status = match err {
295 fidl::Error::ClientChannelClosed { status, .. } => status,
296 fidl::Error::ClientWrite(fidl::TransportError::Status(status)) => status,
297 _ => zx::Status::IO,
298 };
299 results.push(Err(status));
300 }
301 return results;
302 }
303 proxies.push(fio::NodeSynchronousProxy::new(client_end));
304 }
305
306 return proxy_to_result(proxies);
307 }
308
309 pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
313 if self.stream.is_invalid() {
314 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
315 let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
316 let data = file_proxy
317 .read_at(max, offset, zx::MonotonicInstant::INFINITE)
318 .map_err(|_| zx::Status::IO)?
319 .map_err(zx::Status::from_raw)?;
320 let eof = (data.len() as u64) < max;
321 Ok((data, eof))
322 } else {
323 let bytes =
325 self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
326 let eof = bytes.len() < max;
327 Ok((bytes, eof))
328 }
329 }
330
331 pub fn read<E>(
336 &self,
337 offset: u64,
338 len: usize,
339 mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
340 map_err: impl FnOnce(zx::Status) -> E,
341 ) -> Result<usize, E> {
342 let mut total = 0;
343 while total < len {
344 match self.read_partial(offset + total as u64, len - total) {
345 Ok((data, eof)) => {
346 if data.is_empty() {
347 break;
348 }
349 let data_len = data.len();
350 let written = callback(data)?;
351 total += written;
352 if eof || written < data_len {
353 break;
354 }
355 }
356 Err(e) => {
357 if total > 0 {
358 break;
359 }
360 return Err(map_err(e));
361 }
362 }
363 }
364 Ok(total)
365 }
366
367 pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
369 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
370 let mut total_written = 0;
371 for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
372 let result = file_proxy
373 .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
374 .map_err(|_| zx::Status::IO)
375 .and_then(|res| res.map_err(zx::Status::from_raw));
376 match result {
377 Ok(actual) => {
378 let actual = actual as usize;
379 total_written += actual;
380 if actual < chunk.len() {
381 return Ok(total_written);
382 }
383 }
384 Err(e) => {
385 if total_written > 0 {
386 break;
387 }
388 return Err(e);
389 }
390 }
391 }
392 Ok(total_written)
393 }
394
395 pub fn supports_vectored(&self) -> bool {
397 !self.stream.is_invalid()
399 }
400
401 pub unsafe fn readv(
408 &self,
409 offset: u64,
410 iovecs: &mut [zx::sys::zx_iovec_t],
411 ) -> Result<usize, zx::Status> {
412 if self.stream.is_invalid() {
413 return Err(zx::Status::NOT_SUPPORTED);
414 }
415 unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
417 }
418
419 pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
422 if self.stream.is_invalid() {
423 return Err(zx::Status::NOT_SUPPORTED);
424 }
425 self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
426 }
427
428 pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
430 self.cast_proxy::<fio::FileSynchronousProxy>()
431 .resize(length, zx::MonotonicInstant::INFINITE)
432 .map_err(|_| zx::Status::IO)?
433 .map_err(zx::Status::from_raw)
434 }
435
436 pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
438 let mut fio_flags = fio::VmoFlags::empty();
439 if flags.contains(zx::VmarFlags::PERM_READ) {
440 fio_flags |= fio::VmoFlags::READ;
441 }
442 if flags.contains(zx::VmarFlags::PERM_WRITE) {
443 fio_flags |= fio::VmoFlags::WRITE;
444 }
445 if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
446 fio_flags |= fio::VmoFlags::EXECUTE;
447 }
448 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
449 let vmo = file_proxy
450 .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
451 .map_err(|_| zx::Status::IO)?
452 .map_err(zx::Status::from_raw)?;
453 Ok(vmo)
454 }
455
456 pub fn sync(&self) -> Result<(), zx::Status> {
458 self.proxy
459 .sync(zx::MonotonicInstant::INFINITE)
460 .map_err(|_| zx::Status::IO)?
461 .map_err(zx::Status::from_raw)
462 }
463
464 pub fn close_and_update_access_time(self) {
466 let _ = self.proxy.get_attributes(
467 fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
468 zx::MonotonicInstant::INFINITE_PAST,
469 );
470 }
471
472 pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
474 let (client_end, server_end) = zx::Channel::create();
475 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
476 Ok(client_end.into())
477 }
478
479 pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
481 let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
482 let (status, token) = target_dir_proxy
483 .get_token(zx::MonotonicInstant::INFINITE)
484 .map_err(|_| zx::Status::IO)?;
485 zx::Status::ok(status)?;
486 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
487
488 let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
490
491 linkable_proxy
492 .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
493 .map_err(|_| zx::Status::IO)?
494 .map_err(zx::Status::from_raw)
495 }
496
497 pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
499 let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
500 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
501 dir_proxy
502 .unlink(name, &options, zx::MonotonicInstant::INFINITE)
503 .map_err(|_| zx::Status::IO)?
504 .map_err(zx::Status::from_raw)
505 }
506
507 pub fn rename(
509 &self,
510 old_path: &str,
511 new_directory: &Self,
512 new_path: &str,
513 ) -> Result<(), zx::Status> {
514 let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
515 let (status, token) =
516 new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
517 zx::Status::ok(status)?;
518 let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
519 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
520 dir_proxy
521 .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
522 .map_err(|_| zx::Status::IO)?
523 .map_err(zx::Status::from_raw)
524 }
525
526 pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
528 let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
529 let (client_end, server_end) = zx::Channel::create();
530 dir_proxy
531 .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
532 .map_err(|_| zx::Status::IO)?
533 .map_err(zx::Status::from_raw)?;
534 Ok(RemoteIo::new(client_end.into()))
535 }
536
537 pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
539 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
540 let options = fio::VerificationOptions {
541 hash_algorithm: Some(match descriptor.hash_algorithm {
542 1 => fio::HashAlgorithm::Sha256,
543 2 => fio::HashAlgorithm::Sha512,
544 _ => return Err(zx::Status::INVALID_ARGS),
545 }),
546 salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
547 ..Default::default()
548 };
549 file_proxy
550 .enable_verity(&options, zx::MonotonicInstant::INFINITE)
551 .map_err(|_| zx::Status::IO)?
552 .map_err(zx::Status::from_raw)
553 }
554
555 pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
557 let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
558 let mut fio_mode = fio::AllocateMode::empty();
559 if mode.contains(AllocateMode::KEEP_SIZE) {
560 fio_mode |= fio::AllocateMode::KEEP_SIZE;
561 }
562 if mode.contains(AllocateMode::UNSHARE_RANGE) {
563 fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
564 }
565 if mode.contains(AllocateMode::PUNCH_HOLE) {
566 fio_mode |= fio::AllocateMode::PUNCH_HOLE;
567 }
568 if mode.contains(AllocateMode::COLLAPSE_RANGE) {
569 fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
570 }
571 if mode.contains(AllocateMode::ZERO_RANGE) {
572 fio_mode |= fio::AllocateMode::ZERO_RANGE;
573 }
574 if mode.contains(AllocateMode::INSERT_RANGE) {
575 fio_mode |= fio::AllocateMode::INSERT_RANGE;
576 }
577 file_proxy
578 .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
579 .map_err(|_| zx::Status::IO)?
580 .map_err(zx::Status::from_raw)
581 }
582
583 pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
585 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
586 let result = self
587 .proxy
588 .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
589 .map_err(|_| zx::Status::IO)?
590 .map_err(zx::Status::from_raw)?;
591 match result {
592 fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
593 fio::ExtendedAttributeValue::Buffer(vmo) => {
594 let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
595 let mut bytes = vec![0u8; size as usize];
596 vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
597 Ok(bytes)
598 }
599 _ => Err(zx::Status::NOT_SUPPORTED),
600 }
601 }
602
603 pub fn xattr_set(
605 &self,
606 name: &[u8],
607 value: &[u8],
608 mode: syncio::XattrSetMode,
609 ) -> Result<(), zx::Status> {
610 let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
611 let fidl_mode = match mode {
612 syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
613 syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
614 syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
615 };
616 self.proxy
617 .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
618 .map_err(|_| zx::Status::IO)?
619 .map_err(zx::Status::from_raw)
620 }
621
622 pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
624 let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
625 self.proxy
626 .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
627 .map_err(|_| zx::Status::IO)?
628 .map_err(zx::Status::from_raw)
629 }
630
631 pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
633 let (client_end, server_end) = zx::Channel::create();
634 self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
635 let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
636 let mut all_attrs = vec![];
637 loop {
638 let (attributes, last) = iterator
639 .get_next(zx::MonotonicInstant::INFINITE)
640 .map_err(|_| zx::Status::IO)?
641 .map_err(zx::Status::from_raw)?;
642 all_attrs.extend(attributes);
643 if last {
644 break;
645 }
646 }
647 Ok(all_attrs)
648 }
649}
650
651pub struct RemoteDirectory {
656 proxy: fio::DirectorySynchronousProxy,
657 state: Mutex<State>,
658}
659
660#[derive(Default)]
661struct State {
662 buffer: Vec<u8>,
664
665 offset: usize,
667
668 pending_entry: Entry,
671
672 current_index: u64,
674}
675
676impl State {
677 fn name(&self, range: Range<usize>) -> &[u8] {
678 &self.buffer[range]
679 }
680
681 fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
684 let mut next_dirent = || -> Result<Entry, zx::Status> {
685 if self.offset >= self.buffer.len() {
686 match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
687 Ok((status, dirents)) => {
688 zx::Status::ok(status)?;
689 if dirents.is_empty() {
690 return Ok(Entry::None);
691 }
692 self.buffer = dirents;
693 self.offset = 0;
694 }
695 Err(_) => return Err(zx::Status::IO),
696 }
697 }
698
699 #[repr(C, packed)]
700 #[derive(FromBytes)]
701 struct DirectoryEntry {
702 ino: u64,
703 name_len: u8,
704 entry_type: u8,
705 }
706
707 let Some((ino, name, entry_type)) =
708 DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
709 |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
710 let name_len = name_len as usize;
711 let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
712 (remainder.len() >= name_len).then_some((
713 ino,
714 name_start..name_start + name_len,
715 entry_type,
716 ))
717 },
718 )
719 else {
720 return Ok(Entry::None);
722 };
723
724 self.offset = name.end;
725
726 Ok(Entry::Some {
727 ino,
728 entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
729 name,
730 })
731 };
732
733 let mut next = self.pending_entry.take();
734 if let Entry::None = next {
735 next = next_dirent()?;
736 }
737 match &next {
741 Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
742 self.pending_entry = Entry::DotDot;
743 }
744 _ => {}
745 }
746 self.current_index += 1;
747 Ok(next)
748 }
749
750 fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
751 self.pending_entry = Entry::None;
752 let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
753 zx::Status::ok(status)?;
754 self.buffer.clear();
755 self.offset = 0;
756 self.current_index = 0;
757 Ok(())
758 }
759}
760
761#[derive(Default)]
762enum Entry {
763 #[default]
765 None,
766
767 Some {
768 ino: u64,
769 entry_type: fio::DirentType,
770 name: Range<usize>,
771 },
772
773 DotDot,
775}
776
777impl Entry {
778 fn take(&mut self) -> Entry {
779 std::mem::replace(self, Entry::None)
780 }
781}
782
783impl RemoteDirectory {
784 pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
785 Self { proxy, state: Mutex::default() }
786 }
787
788 pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
790 let mut state = self.state.lock();
791
792 if new_index < state.current_index {
793 state.rewind(&self.proxy)?;
796 state.current_index = 0;
797 }
798
799 for i in state.current_index..new_index {
801 match state.next(&self.proxy) {
802 Ok(Entry::Some { .. } | Entry::DotDot) => {}
803 Ok(Entry::None) => break, Err(_) => {
805 return Ok(i);
811 }
812 }
813 }
814
815 Ok(new_index)
816 }
817
818 pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
822 &self,
823 mut sink: S,
824 ) -> Result<Option<B>, zx::Status> {
825 let mut state = self.state.lock();
826 loop {
827 let entry = state.next(&self.proxy)?;
828 if let ControlFlow::Break(b) = match &entry {
829 Entry::Some { ino, entry_type, name } => {
830 sink(*ino, *entry_type, state.name(name.clone()))
831 }
832 Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
833 Entry::None => break,
834 } {
835 state.pending_entry = entry;
836 return Ok(Some(b));
837 }
838 }
839 Ok(None)
840 }
841
842 pub fn sync(&self) -> Result<(), zx::Status> {
844 self.proxy
845 .sync(zx::MonotonicInstant::INFINITE)
846 .map_err(|_| zx::Status::IO)?
847 .map_err(zx::Status::from_raw)
848 }
849
850 pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
852 let (client_end, server_end) = zx::Channel::create();
853 self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
854 Ok(client_end.into())
855 }
856}
857
858#[cfg(test)]
860thread_local! {
861 pub static TEST_HOOK: std::cell::RefCell<Option<Box<dyn FnMut()>>> =
862 std::cell::RefCell::new(None);
863}
864
865#[cfg(test)]
866mod tests {
867 use super::*;
868 use fidl::endpoints::{ControlHandle, RequestStream};
869 use fuchsia_async as fasync;
870 use futures::StreamExt;
871 use std::sync::Arc;
872 use std::sync::atomic::{AtomicU64, Ordering};
873
874 fn hook(f: Box<dyn FnMut()>) -> impl Drop {
875 TEST_HOOK.with(|hook| {
876 *hook.borrow_mut() = Some(f);
877 });
878 return scopeguard::guard((), |_| {
879 TEST_HOOK.with(|hook| {
880 *hook.borrow_mut() = None;
881 });
882 });
883 }
884
885 #[fuchsia::test]
886 async fn test_read_chunking() {
887 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
888 let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
889 let content_clone = content.clone();
890
891 let _server_task = fasync::Task::spawn(async move {
892 while let Some(Ok(request)) = stream.next().await {
893 match request {
894 fio::FileRequest::ReadAt { count, offset, responder } => {
895 let start = offset as usize;
896 let end = std::cmp::min(start + count as usize, content_clone.len());
897 let data = if start < content_clone.len() {
898 &content_clone[start..end]
899 } else {
900 &[]
901 };
902 responder.send(Ok(data)).unwrap();
903 }
904 _ => panic!("Unexpected request: {:?}", request),
905 }
906 }
907 });
908
909 let io = RemoteIo::new(client.into_channel().into());
910 fasync::unblock(move || {
911 let mut data = Vec::new();
912 let actual = io
913 .read(
914 0,
915 content.len(),
916 |chunk| -> Result<usize, zx::Status> {
917 let len = chunk.len();
918 data.extend(chunk);
919 Ok(len)
920 },
921 |status| status,
922 )
923 .unwrap();
924 assert_eq!(actual, content.len());
925 assert_eq!(data, content);
926 })
927 .await;
928 }
929
930 #[fuchsia::test]
931 async fn test_read_error_after_data() {
932 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
933 let chunk_size = 100;
934 let content = vec![0xAA; chunk_size];
935 let content_clone = content.clone();
936
937 let _server_task = fasync::Task::spawn(async move {
938 let mut request_count = 0;
939 while let Some(Ok(request)) = stream.next().await {
940 match request {
941 fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
942 request_count += 1;
943 if request_count == 1 {
944 responder.send(Ok(&content_clone)).unwrap();
945 } else {
946 responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
947 }
948 }
949 _ => panic!("Unexpected request: {:?}", request),
950 }
951 }
952 });
953
954 let io = RemoteIo::new(client.into_channel().into());
955 fasync::unblock(move || {
956 let mut data = Vec::new();
957 let actual = io
959 .read(
960 0,
961 chunk_size * 2,
962 |chunk| -> Result<usize, zx::Status> {
963 let len = chunk.len();
964 data.extend(chunk);
965 Ok(len)
966 },
967 |status| status,
968 )
969 .expect("read should succeed even if later chunks fail");
970 assert_eq!(actual, chunk_size);
971 assert_eq!(data.len(), chunk_size);
972 assert_eq!(data, content);
973 })
974 .await;
975 }
976
977 #[fuchsia::test]
978 async fn test_write_chunking() {
979 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
980 let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
981 let content2 = content.clone();
982
983 let server_task = fasync::Task::spawn(async move {
984 let mut written = vec![0; content2.len()];
985 while let Some(Ok(request)) = stream.next().await {
986 match request {
987 fio::FileRequest::WriteAt { offset, data, responder, .. } => {
988 let offset = offset as usize;
989 written[offset..offset + data.len()].copy_from_slice(&data);
990 responder.send(Ok(data.len() as u64)).unwrap();
991 }
992 _ => panic!("Unexpected request: {:?}", request),
993 }
994 }
995 assert_eq!(written, content2);
996 });
997
998 let io = RemoteIo::new(client.into_channel().into());
999 fasync::unblock(move || {
1000 let written = io.write(0, &content).expect("write failed");
1001 assert_eq!(written, content.len());
1002 })
1003 .await;
1004
1005 server_task.await;
1006 }
1007
1008 #[fuchsia::test]
1009 async fn test_write_error_after_data() {
1010 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
1011 let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
1012 let content = vec![0xEE; chunk_size * 2];
1013
1014 let _server_task = fasync::Task::spawn(async move {
1015 let mut request_count = 0;
1016 while let Some(Ok(request)) = stream.next().await {
1017 match request {
1018 fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
1019 request_count += 1;
1020 if request_count == 1 {
1021 responder.send(Ok(data.len() as u64)).unwrap();
1022 } else {
1023 responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
1024 }
1025 }
1026 _ => panic!("Unexpected request: {:?}", request),
1027 }
1028 }
1029 });
1030
1031 let io = RemoteIo::new(client.into_channel().into());
1032 fasync::unblock(move || {
1033 let written = io.write(0, &content).expect("write should succeed partial");
1034 assert_eq!(written, chunk_size);
1035 })
1036 .await;
1037 }
1038
1039 #[fuchsia::test]
1040 async fn test_large_directory() {
1041 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1042 let num_entries = 2000;
1043
1044 let task = fasync::Task::spawn(async move {
1045 let mut sent_count = 0;
1046 let mut num_requests = 0;
1047 while let Some(Ok(request)) = stream.next().await {
1048 match request {
1049 fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1050 num_requests += 1;
1051 let mut buffer = Vec::new();
1052 while sent_count < num_entries {
1053 let name = if sent_count == 0 {
1054 ".".to_string()
1055 } else {
1056 format!("file_{}", sent_count - 1)
1057 };
1058 let name_bytes = name.as_bytes();
1059 let entry_size = 10 + name_bytes.len();
1060 if buffer.len() + entry_size > max_bytes as usize {
1061 break;
1062 }
1063 buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
1064 buffer.push(name_bytes.len() as u8);
1065 let entry_type = if sent_count == 0 {
1066 fio::DirentType::Directory
1067 } else {
1068 fio::DirentType::File
1069 };
1070 buffer.push(entry_type.into_primitive());
1071 buffer.extend_from_slice(name_bytes);
1072 sent_count += 1;
1073 }
1074 let _ = responder.send(0, &buffer);
1075 }
1076 fio::DirectoryRequest::Rewind { responder } => {
1077 sent_count = 0;
1078 let _ = responder.send(0);
1079 }
1080 fio::DirectoryRequest::Close { responder } => {
1081 let _ = responder.send(Ok(()));
1082 }
1083 _ => {}
1084 }
1085 }
1086 assert!(num_requests > 0);
1087 });
1088
1089 let dir = RemoteDirectory::new(client.into_channel().into());
1090 let count = Arc::new(AtomicU64::new(0));
1091 let count2 = count.clone();
1092 fasync::unblock(move || {
1093 dir.readdir::<(), _>(|_ino, _type, _name| {
1094 count.fetch_add(1, Ordering::Relaxed);
1095 ControlFlow::Continue(())
1096 })
1097 .unwrap();
1098 })
1099 .await;
1100 assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
1102 task.await;
1103 }
1104
1105 #[fuchsia::test]
1106 async fn test_seek_backwards() {
1107 let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1108 let _server_task = fasync::Task::spawn(async move {
1109 let entries = vec![
1110 (1, fio::DirentType::Directory, "."),
1111 (2, fio::DirentType::File, "file_0"),
1112 (3, fio::DirentType::File, "file_1"),
1113 (4, fio::DirentType::File, "file_2"),
1114 ];
1115 let mut current_entry = 0;
1116
1117 while let Some(Ok(request)) = stream.next().await {
1118 match request {
1119 fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1120 let mut buffer = Vec::new();
1121 while current_entry < entries.len() {
1122 let (ino, type_, name) = entries[current_entry];
1123 let name_bytes = name.as_bytes();
1124 let entry_size = 10 + name_bytes.len();
1125 if buffer.len() + entry_size > max_bytes as usize {
1126 break;
1127 }
1128 buffer.extend_from_slice(&(ino as u64).to_le_bytes());
1129 buffer.push(name_bytes.len() as u8);
1130 buffer.push(type_.into_primitive());
1131 buffer.extend_from_slice(name_bytes);
1132 current_entry += 1;
1133 }
1134 responder.send(0, &buffer).unwrap();
1135 }
1136 fio::DirectoryRequest::Rewind { responder } => {
1137 current_entry = 0;
1138 responder.send(0).unwrap();
1139 }
1140 _ => {}
1141 }
1142 }
1143 });
1144
1145 let dir = RemoteDirectory::new(client.into_channel().into());
1146 fasync::unblock(move || {
1147 let mut names = Vec::new();
1148 dir.readdir::<(), _>(|_ino, _type, name| {
1150 names.push(name.to_vec());
1151 if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
1152 })
1153 .unwrap();
1154
1155 assert_eq!(names[0], b".");
1156 assert_eq!(names[1], b"..");
1157 assert_eq!(names[2], b"file_0");
1158
1159 dir.seek(1).unwrap();
1162
1163 let mut names_after_seek = Vec::new();
1164 dir.readdir::<(), _>(|_ino, _type, name| {
1166 names_after_seek.push(name.to_vec());
1167 if names_after_seek.len() == 2 {
1168 ControlFlow::Break(())
1169 } else {
1170 ControlFlow::Continue(())
1171 }
1172 })
1173 .unwrap();
1174
1175 assert_eq!(names_after_seek[0], b"..");
1176 assert_eq!(names_after_seek[1], b"file_0");
1177 })
1178 .await;
1179 }
1180
1181 struct DummyFactory;
1182
1183 impl Factory for DummyFactory {
1184 type Result = RemoteIo;
1185 fn create_node(self, io: RemoteIo, _info: fio::NodeInfo) -> Self::Result {
1186 io
1187 }
1188 fn create_directory(self, io: RemoteIo, _info: fio::DirectoryInfo) -> Self::Result {
1189 io
1190 }
1191 fn create_file(self, io: RemoteIo, _info: fio::FileInfo) -> Self::Result {
1192 io
1193 }
1194 fn create_symlink(self, io: RemoteIo, _info: fio::SymlinkInfo) -> Self::Result {
1195 io
1196 }
1197 }
1198
1199 #[fuchsia::test]
1200 async fn test_open_pipelined() {
1201 let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1202
1203 fn serve_mock_directory(
1204 mut stream: fio::DirectoryRequestStream,
1205 mut expected_paths: Vec<String>,
1206 ) {
1207 fasync::Task::spawn(async move {
1208 if let Some(Ok(request)) = stream.next().await {
1209 match request {
1210 fio::DirectoryRequest::Open { path, flags, options: _, object, .. } => {
1211 if !expected_paths.is_empty() {
1212 let expected = expected_paths.remove(0);
1213 assert_eq!(path, expected);
1214 if expected == "path1" {
1215 assert!(flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1216 } else if expected == "path2" {
1217 assert!(!flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1218 }
1219 }
1220 let server_end =
1221 fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1222 let dir_stream = server_end.into_stream();
1223 let control_handle = dir_stream.control_handle();
1224 let representation =
1225 fio::Representation::Directory(fio::DirectoryInfo::default());
1226 control_handle.send_on_representation(representation).unwrap();
1227
1228 serve_mock_directory(dir_stream, expected_paths);
1229 }
1230 _ => {}
1231 }
1232 }
1233 })
1234 .detach();
1235 }
1236
1237 serve_mock_directory(stream, vec!["path1".to_string(), "path2".to_string()]);
1238
1239 let io = RemoteIo::new(client.into_channel().into());
1240 fasync::unblock(move || {
1241 let results = io.open_pipelined(
1242 &["path1", "path2"],
1243 fio::Flags::empty(),
1244 fio::NodeAttributesQuery::empty(),
1245 || DummyFactory,
1246 );
1247 assert_eq!(results.len(), 2);
1248 assert!(results.iter().all(|r| r.is_ok()));
1249 })
1250 .await;
1251 }
1252
1253 #[fuchsia::test]
1254 async fn test_open_pipelined_not_found() {
1255 let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1256
1257 fn serve_mock_directory_not_found(mut stream: fio::DirectoryRequestStream) {
1258 fasync::Task::spawn(async move {
1259 if let Some(Ok(request)) = stream.next().await {
1260 match request {
1261 fio::DirectoryRequest::Open {
1262 path, flags: _, options: _, object, ..
1263 } => {
1264 let server_end =
1265 fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1266 let dir_stream = server_end.into_stream();
1267 let control_handle = dir_stream.control_handle();
1268
1269 if path == "not_found" {
1270 control_handle.shutdown_with_epitaph(zx::Status::NOT_FOUND);
1271 } else {
1272 let representation =
1273 fio::Representation::Directory(fio::DirectoryInfo::default());
1274 control_handle.send_on_representation(representation).unwrap();
1275 serve_mock_directory_not_found(dir_stream);
1276 }
1277 }
1278 _ => {}
1279 }
1280 }
1281 })
1282 .detach();
1283 }
1284
1285 serve_mock_directory_not_found(stream);
1286
1287 let io = RemoteIo::new(client.into_channel().into());
1288 fasync::unblock(move || {
1289 let results = io.open_pipelined(
1290 &["path1", "not_found", "path3"],
1291 fio::Flags::empty(),
1292 fio::NodeAttributesQuery::empty(),
1293 || DummyFactory,
1294 );
1295 assert_eq!(results.len(), 2);
1296 assert!(results[0].is_ok());
1297 assert_eq!(results[1].as_ref().err(), Some(&zx::Status::NOT_FOUND));
1298 })
1299 .await;
1300 }
1301
1302 #[fuchsia::test]
1303 async fn test_open_pipelined_peer_closed() {
1304 let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1305
1306 fn serve_mock_directory_close_early(mut stream: fio::DirectoryRequestStream) {
1307 fasync::Task::spawn(async move {
1308 if let Some(Ok(request)) = stream.next().await {
1309 match request {
1310 fio::DirectoryRequest::Open { object, .. } => {
1311 drop(object);
1313 }
1314 _ => {}
1315 }
1316 }
1317 })
1318 .detach();
1319 }
1320
1321 serve_mock_directory_close_early(stream);
1322
1323 let io = RemoteIo::new(client.into_channel().into());
1324 fasync::unblock(move || {
1325 let _guard = hook(Box::new(|| {
1328 std::thread::sleep(std::time::Duration::from_millis(100));
1329 }));
1330 let results = io.open_pipelined(
1331 &["path1", "path2"],
1332 fio::Flags::empty(),
1333 fio::NodeAttributesQuery::empty(),
1334 || DummyFactory,
1335 );
1336 assert_eq!(results.len(), 1);
1337 assert_eq!(results[0].as_ref().err(), Some(&zx::Status::PEER_CLOSED));
1338 })
1339 .await;
1340 }
1341}