Skip to main content

sync_io_client/
lib.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This library provides a synchronous wrapper around fuchsia.io.  It is usually better to use the
6//! Rust standard library for this (which will use fdio and zxio).  The primary user of this library
7//! is Starnix which uses this for performance and memory reasons.
8
9use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use std::ops::{ControlFlow, Range};
13use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
14use zerocopy::FromBytes;
15
16fn zxio_attr_from_fidl_attributes(in_attr: &fio::NodeAttributes2) -> zxio_node_attributes_t {
17    zxio_attr_from_fidl(&in_attr.mutable_attributes, &in_attr.immutable_attributes)
18}
19
20/// Converts FIDL attributes to `zxio_node_attributes_t`.
21///
22/// NOTE: This does not work for _all_ attributes e.g. fsverity options and root hash.
23fn zxio_attr_from_fidl(
24    mutable: &fio::MutableNodeAttributes,
25    immutable: &fio::ImmutableNodeAttributes,
26) -> zxio_node_attributes_t {
27    let mut out_attr = zxio_node_attributes_t::default();
28    if let Some(protocols) = immutable.protocols {
29        out_attr.protocols = protocols.bits();
30        out_attr.has.protocols = true;
31    }
32    if let Some(abilities) = immutable.abilities {
33        out_attr.abilities = abilities.bits();
34        out_attr.has.abilities = true;
35    }
36    if let Some(id) = immutable.id {
37        out_attr.id = id;
38        out_attr.has.id = true;
39    }
40    if let Some(content_size) = immutable.content_size {
41        out_attr.content_size = content_size;
42        out_attr.has.content_size = true;
43    }
44    if let Some(storage_size) = immutable.storage_size {
45        out_attr.storage_size = storage_size;
46        out_attr.has.storage_size = true;
47    }
48    if let Some(link_count) = immutable.link_count {
49        out_attr.link_count = link_count;
50        out_attr.has.link_count = true;
51    }
52    if let Some(creation_time) = mutable.creation_time {
53        out_attr.creation_time = creation_time;
54        out_attr.has.creation_time = true;
55    }
56    if let Some(modification_time) = mutable.modification_time {
57        out_attr.modification_time = modification_time;
58        out_attr.has.modification_time = true;
59    }
60    if let Some(access_time) = mutable.access_time {
61        out_attr.access_time = access_time;
62        out_attr.has.access_time = true;
63    }
64    if let Some(mode) = mutable.mode {
65        out_attr.mode = mode;
66        out_attr.has.mode = true;
67    }
68    if let Some(uid) = mutable.uid {
69        out_attr.uid = uid;
70        out_attr.has.uid = true;
71    }
72    if let Some(gid) = mutable.gid {
73        out_attr.gid = gid;
74        out_attr.has.gid = true;
75    }
76    if let Some(rdev) = mutable.rdev {
77        out_attr.rdev = rdev;
78        out_attr.has.rdev = true;
79    }
80    if let Some(change_time) = immutable.change_time {
81        out_attr.change_time = change_time;
82        out_attr.has.change_time = true;
83    }
84    if let Some(casefold) = mutable.casefold {
85        out_attr.casefold = casefold;
86        out_attr.has.casefold = true;
87    }
88    if let Some(verity_enabled) = immutable.verity_enabled {
89        out_attr.fsverity_enabled = verity_enabled;
90        out_attr.has.fsverity_enabled = true;
91    }
92    if let Some(wrapping_key_id) = mutable.wrapping_key_id {
93        out_attr.wrapping_key_id = wrapping_key_id;
94        out_attr.has.wrapping_key_id = true;
95    }
96    out_attr
97}
98
99/// This trait is used so that clients can create their own wrappers around types exposed here.
100pub trait Factory {
101    type Result;
102
103    fn create_node(self, io: RemoteIo) -> Self::Result;
104    fn create_directory(self, io: RemoteIo) -> Self::Result;
105    fn create_file(self, io: RemoteIo, info: &fio::FileInfo) -> Self::Result;
106    fn create_symlink(self, io: RemoteIo, target: Vec<u8>) -> Self::Result;
107}
108
109/// Waits for the fuchsia.io `OnRepresentation` event and then uses the factory to create an an
110/// appropriate object.  This returns attributes (as requested by the corresponding `open` call)
111/// that are present in the `OnRepresentation` event.
112///
113/// NOTE: The attributes returned are not comprehensive.  Check `zxio_attr_from_fidl` above for
114/// supported attributes.
115pub fn create_with_on_representation<F: Factory>(
116    proxy: fio::NodeSynchronousProxy,
117    factory: F,
118) -> Result<(F::Result, zxio_node_attributes_t, Option<fio::SelinuxContext>), zx::Status> {
119    match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
120        Ok(fio::NodeEvent::OnRepresentation { mut payload }) => {
121            let (result, attrs) = match &mut payload {
122                fio::Representation::Node(info) => {
123                    (factory.create_node(RemoteIo::new(proxy)), &mut info.attributes)
124                }
125                fio::Representation::Directory(info) => {
126                    (factory.create_directory(RemoteIo::new(proxy)), &mut info.attributes)
127                }
128                fio::Representation::File(info) => (
129                    factory.create_file(
130                        RemoteIo {
131                            proxy,
132                            stream: info
133                                .stream
134                                .take()
135                                .map(zx::Stream::from)
136                                .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
137                        },
138                        info,
139                    ),
140                    &mut info.attributes,
141                ),
142                fio::Representation::Symlink(info) => {
143                    let Some(target) = info.target.take() else {
144                        return Err(zx::Status::IO);
145                    };
146                    (factory.create_symlink(RemoteIo::new(proxy), target), &mut info.attributes)
147                }
148                _ => return Err(zx::Status::NOT_SUPPORTED),
149            };
150            let (attrs, context) = attrs
151                .as_mut()
152                .map(|a| {
153                    (zxio_attr_from_fidl_attributes(a), a.mutable_attributes.selinux_context.take())
154                })
155                .unwrap_or_default();
156            Ok((result, attrs, context))
157        }
158        Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
159        _ => Err(zx::Status::IO),
160    }
161}
162
163/// Wraps a proxy and optional stream and provides wrappers around most fuchsia.io methods.
164///
165/// NOTE: The caller must take care to call appropriate methods for the underlying type.  Calling
166/// the wrong methods (e.g. calling file methods on a directory) will result in the connection being
167/// closed.
168pub struct RemoteIo {
169    proxy: fio::NodeSynchronousProxy,
170    // NOTE: This can be invalid if the remote end did not return a stream in which case
171    // file I/O will use FIDL (slow).
172    stream: zx::Stream,
173}
174
175impl RemoteIo {
176    pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
177        Self { proxy, stream: zx::NullableHandle::invalid().into() }
178    }
179
180    pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
181        Self { proxy, stream }
182    }
183
184    pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
185        self.proxy
186    }
187
188    fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
189        zx::Unowned::new(self.proxy.as_channel())
190    }
191
192    /// Returns attributes in fuchsia.io's FIDL representation.
193    pub fn attr_get(
194        &self,
195        query: fio::NodeAttributesQuery,
196    ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
197        self.proxy
198            .get_attributes(query, zx::MonotonicInstant::INFINITE)
199            .map_err(|_| zx::Status::IO)?
200            .map_err(zx::Status::from_raw)
201    }
202
203    /// Returns attributes mapped to `zxio_node_attributes_t`
204    ///
205    /// NOTE: Not all attributes are supported.  See `zxio_attr_from_fidl` above for supported
206    /// attributes.
207    pub fn attr_get_zxio(
208        &self,
209        query: fio::NodeAttributesQuery,
210    ) -> Result<zxio_node_attributes_t, zx::Status> {
211        self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
212    }
213
214    /// Sets attributes.
215    pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
216        self.proxy
217            .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
218            .map_err(|_| zx::Status::IO)?
219            .map_err(zx::Status::from_raw)
220    }
221
222    /// Wraps fuchsia.io/Directory's Open.
223    pub fn open<F: Factory>(
224        &self,
225        path: &str,
226        flags: fio::Flags,
227        create_attributes: Option<fio::MutableNodeAttributes>,
228        query: fio::NodeAttributesQuery,
229        factory: F,
230    ) -> Result<(F::Result, zxio_node_attributes_t, Option<fio::SelinuxContext>), zx::Status> {
231        let (client_end, server_end) = zx::Channel::create();
232        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
233        dir_proxy
234            .open(
235                path,
236                flags | fio::Flags::FLAG_SEND_REPRESENTATION,
237                &fio::Options {
238                    attributes: (!query.is_empty()).then_some(query),
239                    create_attributes,
240                    ..Default::default()
241                },
242                server_end,
243            )
244            .map_err(|_| zx::Status::IO)?;
245        create_with_on_representation(client_end.into(), factory)
246    }
247
248    /// Returns `(data, eof)`, where `eof` is true if we encountered the end of the file.  If `eof`
249    /// is false, then it is still possible that a subsequent read would read no more i.e. the end
250    /// of the file _might_ have been reached.  This might return fewer bytes than `max`.
251    pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
252        if self.stream.is_invalid() {
253            let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
254            let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
255            let data = file_proxy
256                .read_at(max, offset, zx::MonotonicInstant::INFINITE)
257                .map_err(|_| zx::Status::IO)?
258                .map_err(zx::Status::from_raw)?;
259            let eof = (data.len() as u64) < max;
260            Ok((data, eof))
261        } else {
262            // Use an intermediate buffer.
263            let bytes =
264                self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
265            let eof = bytes.len() < max;
266            Ok((bytes, eof))
267        }
268    }
269
270    /// Attempts to read `len` bytes and will only return fewer if it encounters the end of the
271    /// file, or an error.  `callback` will be called for each chunk.  If any bytes are successfully
272    /// passed to `callback`, `read` will return the total number of bytes successfully written and
273    /// any error encountered will be discarded.
274    pub fn read<E>(
275        &self,
276        offset: u64,
277        len: usize,
278        mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
279        map_err: impl FnOnce(zx::Status) -> E,
280    ) -> Result<usize, E> {
281        let mut total = 0;
282        while total < len {
283            match self.read_partial(offset + total as u64, len - total) {
284                Ok((data, eof)) => {
285                    if data.is_empty() {
286                        break;
287                    }
288                    let data_len = data.len();
289                    let written = callback(data)?;
290                    total += written;
291                    if eof || written < data_len {
292                        break;
293                    }
294                }
295                Err(e) => {
296                    if total > 0 {
297                        break;
298                    }
299                    return Err(map_err(e));
300                }
301            }
302        }
303        Ok(total)
304    }
305
306    /// Writes `data` at `offset`.
307    pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
308        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
309        let mut total_written = 0;
310        for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
311            let result = file_proxy
312                .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
313                .map_err(|_| zx::Status::IO)
314                .and_then(|res| res.map_err(zx::Status::from_raw));
315            match result {
316                Ok(actual) => {
317                    let actual = actual as usize;
318                    total_written += actual;
319                    if actual < chunk.len() {
320                        return Ok(total_written);
321                    }
322                }
323                Err(e) => {
324                    if total_written > 0 {
325                        break;
326                    }
327                    return Err(e);
328                }
329            }
330        }
331        Ok(total_written)
332    }
333
334    /// Returns true if vectored operations are supported.
335    pub fn supports_vectored(&self) -> bool {
336        // We only support readv and writev if we have a stream.
337        !self.stream.is_invalid()
338    }
339
340    /// Reads into `iovecs` using a vectored read.  This is only supported with a valid stream.  See
341    /// `supports_vectored` above.
342    ///
343    /// # Safety
344    ///
345    /// Same as `zx::Stream::readv`.
346    pub unsafe fn readv(
347        &self,
348        offset: u64,
349        iovecs: &mut [zx::sys::zx_iovec_t],
350    ) -> Result<usize, zx::Status> {
351        if self.stream.is_invalid() {
352            return Err(zx::Status::NOT_SUPPORTED);
353        }
354        // SAFETY: See `zx::Stream::readv`.
355        unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
356    }
357
358    /// Writes from `iovecs` using vectored write.  This is only supported with a valid stream.  See
359    /// `supports_vectored` above.
360    pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
361        if self.stream.is_invalid() {
362            return Err(zx::Status::NOT_SUPPORTED);
363        }
364        self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
365    }
366
367    /// Wraps fuchsia.io/File's Truncate.
368    pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
369        self.cast_proxy::<fio::FileSynchronousProxy>()
370            .resize(length, zx::MonotonicInstant::INFINITE)
371            .map_err(|_| zx::Status::IO)?
372            .map_err(zx::Status::from_raw)
373    }
374
375    /// Returns a VMO backing the file.
376    pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
377        let mut fio_flags = fio::VmoFlags::empty();
378        if flags.contains(zx::VmarFlags::PERM_READ) {
379            fio_flags |= fio::VmoFlags::READ;
380        }
381        if flags.contains(zx::VmarFlags::PERM_WRITE) {
382            fio_flags |= fio::VmoFlags::WRITE;
383        }
384        if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
385            fio_flags |= fio::VmoFlags::EXECUTE;
386        }
387        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
388        let vmo = file_proxy
389            .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
390            .map_err(|_| zx::Status::IO)?
391            .map_err(zx::Status::from_raw)?;
392        Ok(vmo)
393    }
394
395    /// Wraps fuchsia.io/Node's Sync.
396    pub fn sync(&self) -> Result<(), zx::Status> {
397        self.proxy
398            .sync(zx::MonotonicInstant::INFINITE)
399            .map_err(|_| zx::Status::IO)?
400            .map_err(zx::Status::from_raw)
401    }
402
403    /// Closes and updates access time asynchronously.
404    pub fn close_and_update_access_time(self) {
405        let _ = self.proxy.get_attributes(
406            fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
407            zx::MonotonicInstant::INFINITE_PAST,
408        );
409    }
410
411    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
412    pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
413        let (client_end, server_end) = zx::Channel::create();
414        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
415        Ok(client_end.into())
416    }
417
418    /// Wraps fuchsia.io/Node's LinkInto.
419    pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
420        let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
421        let (status, token) = target_dir_proxy
422            .get_token(zx::MonotonicInstant::INFINITE)
423            .map_err(|_| zx::Status::IO)?;
424        zx::Status::ok(status)?;
425        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
426
427        // Linkable::LinkInto is a separate protocol.
428        let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
429
430        linkable_proxy
431            .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
432            .map_err(|_| zx::Status::IO)?
433            .map_err(zx::Status::from_raw)
434    }
435
436    /// Wraps fuchsia.io/Directory's Unlink.
437    pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
438        let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
439        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
440        dir_proxy
441            .unlink(name, &options, zx::MonotonicInstant::INFINITE)
442            .map_err(|_| zx::Status::IO)?
443            .map_err(zx::Status::from_raw)
444    }
445
446    /// Wraps fuchsia.io/Directory's Rename.
447    pub fn rename(
448        &self,
449        old_path: &str,
450        new_directory: &Self,
451        new_path: &str,
452    ) -> Result<(), zx::Status> {
453        let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
454        let (status, token) =
455            new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
456        zx::Status::ok(status)?;
457        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
458        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
459        dir_proxy
460            .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
461            .map_err(|_| zx::Status::IO)?
462            .map_err(zx::Status::from_raw)
463    }
464
465    /// Wraps fuchsia.io/Directory's CreateSymlink.
466    pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
467        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
468        let (client_end, server_end) = zx::Channel::create();
469        dir_proxy
470            .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
471            .map_err(|_| zx::Status::IO)?
472            .map_err(zx::Status::from_raw)?;
473        Ok(RemoteIo::new(client_end.into()))
474    }
475
476    /// Wraps fuchsia.io/File's EnableVerity.
477    pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
478        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
479        let options = fio::VerificationOptions {
480            hash_algorithm: Some(match descriptor.hash_algorithm {
481                1 => fio::HashAlgorithm::Sha256,
482                2 => fio::HashAlgorithm::Sha512,
483                _ => return Err(zx::Status::INVALID_ARGS),
484            }),
485            salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
486            ..Default::default()
487        };
488        file_proxy
489            .enable_verity(&options, zx::MonotonicInstant::INFINITE)
490            .map_err(|_| zx::Status::IO)?
491            .map_err(zx::Status::from_raw)
492    }
493
494    /// Wraps fuchsia.io/File's Allocate.
495    pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
496        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
497        let mut fio_mode = fio::AllocateMode::empty();
498        if mode.contains(AllocateMode::KEEP_SIZE) {
499            fio_mode |= fio::AllocateMode::KEEP_SIZE;
500        }
501        if mode.contains(AllocateMode::UNSHARE_RANGE) {
502            fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
503        }
504        if mode.contains(AllocateMode::PUNCH_HOLE) {
505            fio_mode |= fio::AllocateMode::PUNCH_HOLE;
506        }
507        if mode.contains(AllocateMode::COLLAPSE_RANGE) {
508            fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
509        }
510        if mode.contains(AllocateMode::ZERO_RANGE) {
511            fio_mode |= fio::AllocateMode::ZERO_RANGE;
512        }
513        if mode.contains(AllocateMode::INSERT_RANGE) {
514            fio_mode |= fio::AllocateMode::INSERT_RANGE;
515        }
516        file_proxy
517            .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
518            .map_err(|_| zx::Status::IO)?
519            .map_err(zx::Status::from_raw)
520    }
521
522    /// Wraps fuchsia.io/Node's GetExtendedAttribute.
523    pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
524        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
525        let result = self
526            .proxy
527            .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
528            .map_err(|_| zx::Status::IO)?
529            .map_err(zx::Status::from_raw)?;
530        match result {
531            fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
532            fio::ExtendedAttributeValue::Buffer(vmo) => {
533                let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
534                let mut bytes = vec![0u8; size as usize];
535                vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
536                Ok(bytes)
537            }
538            _ => Err(zx::Status::NOT_SUPPORTED),
539        }
540    }
541
542    /// Wraps fuchsia.io/Node's SetExtendedAttribute.
543    pub fn xattr_set(
544        &self,
545        name: &[u8],
546        value: &[u8],
547        mode: syncio::XattrSetMode,
548    ) -> Result<(), zx::Status> {
549        let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
550        let fidl_mode = match mode {
551            syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
552            syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
553            syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
554        };
555        self.proxy
556            .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
557            .map_err(|_| zx::Status::IO)?
558            .map_err(zx::Status::from_raw)
559    }
560
561    /// Wraps fuchsia.io/Node's RenoveExtendedAttribute.
562    pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
563        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
564        self.proxy
565            .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
566            .map_err(|_| zx::Status::IO)?
567            .map_err(zx::Status::from_raw)
568    }
569
570    /// Wraps fuchsia.io/Node's ListExtendedAttributes.
571    pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
572        let (client_end, server_end) = zx::Channel::create();
573        self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
574        let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
575        let mut all_attrs = vec![];
576        loop {
577            let (attributes, last) = iterator
578                .get_next(zx::MonotonicInstant::INFINITE)
579                .map_err(|_| zx::Status::IO)?
580                .map_err(zx::Status::from_raw)?;
581            all_attrs.extend(attributes);
582            if last {
583                break;
584            }
585        }
586        Ok(all_attrs)
587    }
588}
589
590/// RemoteDirectory supports iteration of directories and things that you can do to directories via
591/// a file descriptor.  Other opterations, such as creating children, can be done via `RemoteIo`.
592/// Iteration is not safe to be done concurrently because there is a seek pointer; `readdir` will
593/// resume from the seek position.
594pub struct RemoteDirectory {
595    proxy: fio::DirectorySynchronousProxy,
596    state: Mutex<State>,
597}
598
599#[derive(Default)]
600struct State {
601    /// Buffer contains the last batch of entries read from the remote end.
602    buffer: Vec<u8>,
603
604    /// Position in the buffer for the next entry.
605    offset: usize,
606
607    /// If the last attempt to write to the sink failed, this contains the entry that is pending to
608    /// be added. This is also used to synthesize dot-dot.
609    pending_entry: Entry,
610
611    /// The current iterator position in the directory.
612    current_index: u64,
613}
614
615impl State {
616    fn name(&self, range: Range<usize>) -> &[u8] {
617        &self.buffer[range]
618    }
619
620    /// Returns the next dir entry. If no more entries are found, returns None.  Returns an error if
621    /// the iterator fails.
622    fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
623        let mut next_dirent = || -> Result<Entry, zx::Status> {
624            if self.offset >= self.buffer.len() {
625                match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
626                    Ok((status, dirents)) => {
627                        zx::Status::ok(status)?;
628                        if dirents.is_empty() {
629                            return Ok(Entry::None);
630                        }
631                        self.buffer = dirents;
632                        self.offset = 0;
633                    }
634                    Err(_) => return Err(zx::Status::IO),
635                }
636            }
637
638            #[repr(C, packed)]
639            #[derive(FromBytes)]
640            struct DirectoryEntry {
641                ino: u64,
642                name_len: u8,
643                entry_type: u8,
644            }
645
646            let Some((ino, name, entry_type)) =
647                DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
648                    |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
649                        let name_len = name_len as usize;
650                        let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
651                        (remainder.len() >= name_len).then_some((
652                            ino,
653                            name_start..name_start + name_len,
654                            entry_type,
655                        ))
656                    },
657                )
658            else {
659                // Truncated entry.
660                return Ok(Entry::None);
661            };
662
663            self.offset = name.end;
664
665            Ok(Entry::Some {
666                ino,
667                entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
668                name,
669            })
670        };
671
672        let mut next = self.pending_entry.take();
673        if let Entry::None = next {
674            next = next_dirent()?;
675        }
676        // We only want to synthesize .. if . exists because the . and .. entries get removed if the
677        // directory is unlinked, so if the remote filesystem has removed ., we know to omit the
678        // .. entry.
679        match &next {
680            Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
681                self.pending_entry = Entry::DotDot;
682            }
683            _ => {}
684        }
685        self.current_index += 1;
686        Ok(next)
687    }
688
689    fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
690        self.pending_entry = Entry::None;
691        let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
692        zx::Status::ok(status)?;
693        self.buffer.clear();
694        self.offset = 0;
695        self.current_index = 0;
696        Ok(())
697    }
698}
699
700#[derive(Default)]
701enum Entry {
702    // Indicates no more entries.
703    #[default]
704    None,
705
706    Some {
707        ino: u64,
708        entry_type: fio::DirentType,
709        name: Range<usize>,
710    },
711
712    // Indicates dot-dot should be synthesized.
713    DotDot,
714}
715
716impl Entry {
717    fn take(&mut self) -> Entry {
718        std::mem::replace(self, Entry::None)
719    }
720}
721
722impl RemoteDirectory {
723    pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
724        Self { proxy, state: Mutex::default() }
725    }
726
727    /// Seeks to `new_index` in the directory.
728    pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
729        let mut state = self.state.lock();
730
731        if new_index < state.current_index {
732            // Our iterator only goes forward, so reset it here.  Note: we *must* rewind it rather
733            // than just create a new iterator because the remote end maintains the offset.
734            state.rewind(&self.proxy)?;
735            state.current_index = 0;
736        }
737
738        // Advance the iterator to catch up with the offset.
739        for i in state.current_index..new_index {
740            match state.next(&self.proxy) {
741                Ok(Entry::Some { .. } | Entry::DotDot) => {}
742                Ok(Entry::None) => break, // No more entries.
743                Err(_) => {
744                    // In order to keep the offset and the iterator in sync, set the new offset
745                    // to be as far as we could get.
746                    // Note that failing the seek here would also cause the iterator and the
747                    // offset to not be in sync, because the iterator has already moved from
748                    // where it was.
749                    return Ok(i);
750                }
751            }
752        }
753
754        Ok(new_index)
755    }
756
757    /// Returns `None` if there are no more entries to be read.  `sink` can choose to return
758    /// `ControlFlow::Break(_)` in which case the entry will be returned the next time `readdir` is
759    /// called.
760    pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
761        &self,
762        mut sink: S,
763    ) -> Result<Option<B>, zx::Status> {
764        let mut state = self.state.lock();
765        loop {
766            let entry = state.next(&self.proxy)?;
767            if let ControlFlow::Break(b) = match &entry {
768                Entry::Some { ino, entry_type, name } => {
769                    sink(*ino, *entry_type, state.name(name.clone()))
770                }
771                Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
772                Entry::None => break,
773            } {
774                state.pending_entry = entry;
775                return Ok(Some(b));
776            }
777        }
778        Ok(None)
779    }
780
781    /// Wraps fuchsia.io/Node's Sync.
782    pub fn sync(&self) -> Result<(), zx::Status> {
783        self.proxy
784            .sync(zx::MonotonicInstant::INFINITE)
785            .map_err(|_| zx::Status::IO)?
786            .map_err(zx::Status::from_raw)
787    }
788
789    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
790    pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
791        let (client_end, server_end) = zx::Channel::create();
792        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
793        Ok(client_end.into())
794    }
795}
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800    use fuchsia_async as fasync;
801    use futures::StreamExt;
802    use std::sync::Arc;
803    use std::sync::atomic::{AtomicU64, Ordering};
804
805    #[fuchsia::test]
806    async fn test_read_chunking() {
807        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
808        let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
809        let content_clone = content.clone();
810
811        let _server_task = fasync::Task::spawn(async move {
812            while let Some(Ok(request)) = stream.next().await {
813                match request {
814                    fio::FileRequest::ReadAt { count, offset, responder } => {
815                        let start = offset as usize;
816                        let end = std::cmp::min(start + count as usize, content_clone.len());
817                        let data = if start < content_clone.len() {
818                            &content_clone[start..end]
819                        } else {
820                            &[]
821                        };
822                        responder.send(Ok(data)).unwrap();
823                    }
824                    _ => panic!("Unexpected request: {:?}", request),
825                }
826            }
827        });
828
829        let io = RemoteIo::new(client.into_channel().into());
830        fasync::unblock(move || {
831            let mut data = Vec::new();
832            let actual = io
833                .read(
834                    0,
835                    content.len(),
836                    |chunk| -> Result<usize, zx::Status> {
837                        let len = chunk.len();
838                        data.extend(chunk);
839                        Ok(len)
840                    },
841                    |status| status,
842                )
843                .unwrap();
844            assert_eq!(actual, content.len());
845            assert_eq!(data, content);
846        })
847        .await;
848    }
849
850    #[fuchsia::test]
851    async fn test_read_error_after_data() {
852        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
853        let chunk_size = 100;
854        let content = vec![0xAA; chunk_size];
855        let content_clone = content.clone();
856
857        let _server_task = fasync::Task::spawn(async move {
858            let mut request_count = 0;
859            while let Some(Ok(request)) = stream.next().await {
860                match request {
861                    fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
862                        request_count += 1;
863                        if request_count == 1 {
864                            responder.send(Ok(&content_clone)).unwrap();
865                        } else {
866                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
867                        }
868                    }
869                    _ => panic!("Unexpected request: {:?}", request),
870                }
871            }
872        });
873
874        let io = RemoteIo::new(client.into_channel().into());
875        fasync::unblock(move || {
876            let mut data = Vec::new();
877            // Ask for more than chunk_size to ensure a second request is made.
878            let actual = io
879                .read(
880                    0,
881                    chunk_size * 2,
882                    |chunk| -> Result<usize, zx::Status> {
883                        let len = chunk.len();
884                        data.extend(chunk);
885                        Ok(len)
886                    },
887                    |status| status,
888                )
889                .expect("read should succeed even if later chunks fail");
890            assert_eq!(actual, chunk_size);
891            assert_eq!(data.len(), chunk_size);
892            assert_eq!(data, content);
893        })
894        .await;
895    }
896
897    #[fuchsia::test]
898    async fn test_write_chunking() {
899        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
900        let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
901        let content2 = content.clone();
902
903        let server_task = fasync::Task::spawn(async move {
904            let mut written = vec![0; content2.len()];
905            while let Some(Ok(request)) = stream.next().await {
906                match request {
907                    fio::FileRequest::WriteAt { offset, data, responder, .. } => {
908                        let offset = offset as usize;
909                        written[offset..offset + data.len()].copy_from_slice(&data);
910                        responder.send(Ok(data.len() as u64)).unwrap();
911                    }
912                    _ => panic!("Unexpected request: {:?}", request),
913                }
914            }
915            assert_eq!(written, content2);
916        });
917
918        let io = RemoteIo::new(client.into_channel().into());
919        fasync::unblock(move || {
920            let written = io.write(0, &content).expect("write failed");
921            assert_eq!(written, content.len());
922        })
923        .await;
924
925        server_task.await;
926    }
927
928    #[fuchsia::test]
929    async fn test_write_error_after_data() {
930        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
931        let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
932        let content = vec![0xEE; chunk_size * 2];
933
934        let _server_task = fasync::Task::spawn(async move {
935            let mut request_count = 0;
936            while let Some(Ok(request)) = stream.next().await {
937                match request {
938                    fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
939                        request_count += 1;
940                        if request_count == 1 {
941                            responder.send(Ok(data.len() as u64)).unwrap();
942                        } else {
943                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
944                        }
945                    }
946                    _ => panic!("Unexpected request: {:?}", request),
947                }
948            }
949        });
950
951        let io = RemoteIo::new(client.into_channel().into());
952        fasync::unblock(move || {
953            let written = io.write(0, &content).expect("write should succeed partial");
954            assert_eq!(written, chunk_size);
955        })
956        .await;
957    }
958
959    #[fuchsia::test]
960    async fn test_large_directory() {
961        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
962        let num_entries = 2000;
963
964        let task = fasync::Task::spawn(async move {
965            let mut sent_count = 0;
966            let mut num_requests = 0;
967            while let Some(Ok(request)) = stream.next().await {
968                match request {
969                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
970                        num_requests += 1;
971                        let mut buffer = Vec::new();
972                        while sent_count < num_entries {
973                            let name = if sent_count == 0 {
974                                ".".to_string()
975                            } else {
976                                format!("file_{}", sent_count - 1)
977                            };
978                            let name_bytes = name.as_bytes();
979                            let entry_size = 10 + name_bytes.len();
980                            if buffer.len() + entry_size > max_bytes as usize {
981                                break;
982                            }
983                            buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
984                            buffer.push(name_bytes.len() as u8);
985                            let entry_type = if sent_count == 0 {
986                                fio::DirentType::Directory
987                            } else {
988                                fio::DirentType::File
989                            };
990                            buffer.push(entry_type.into_primitive());
991                            buffer.extend_from_slice(name_bytes);
992                            sent_count += 1;
993                        }
994                        let _ = responder.send(0, &buffer);
995                    }
996                    fio::DirectoryRequest::Rewind { responder } => {
997                        sent_count = 0;
998                        let _ = responder.send(0);
999                    }
1000                    fio::DirectoryRequest::Close { responder } => {
1001                        let _ = responder.send(Ok(()));
1002                    }
1003                    _ => {}
1004                }
1005            }
1006            assert!(num_requests > 0);
1007        });
1008
1009        let dir = RemoteDirectory::new(client.into_channel().into());
1010        let count = Arc::new(AtomicU64::new(0));
1011        let count2 = count.clone();
1012        fasync::unblock(move || {
1013            dir.readdir::<(), _>(|_ino, _type, _name| {
1014                count.fetch_add(1, Ordering::Relaxed);
1015                ControlFlow::Continue(())
1016            })
1017            .unwrap();
1018        })
1019        .await;
1020        // Expect num_entries + 1 (for synthesized "..")
1021        assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
1022        task.await;
1023    }
1024
1025    #[fuchsia::test]
1026    async fn test_seek_backwards() {
1027        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1028        let _server_task = fasync::Task::spawn(async move {
1029            let entries = vec![
1030                (1, fio::DirentType::Directory, "."),
1031                (2, fio::DirentType::File, "file_0"),
1032                (3, fio::DirentType::File, "file_1"),
1033                (4, fio::DirentType::File, "file_2"),
1034            ];
1035            let mut current_entry = 0;
1036
1037            while let Some(Ok(request)) = stream.next().await {
1038                match request {
1039                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1040                        let mut buffer = Vec::new();
1041                        while current_entry < entries.len() {
1042                            let (ino, type_, name) = entries[current_entry];
1043                            let name_bytes = name.as_bytes();
1044                            let entry_size = 10 + name_bytes.len();
1045                            if buffer.len() + entry_size > max_bytes as usize {
1046                                break;
1047                            }
1048                            buffer.extend_from_slice(&(ino as u64).to_le_bytes());
1049                            buffer.push(name_bytes.len() as u8);
1050                            buffer.push(type_.into_primitive());
1051                            buffer.extend_from_slice(name_bytes);
1052                            current_entry += 1;
1053                        }
1054                        responder.send(0, &buffer).unwrap();
1055                    }
1056                    fio::DirectoryRequest::Rewind { responder } => {
1057                        current_entry = 0;
1058                        responder.send(0).unwrap();
1059                    }
1060                    _ => {}
1061                }
1062            }
1063        });
1064
1065        let dir = RemoteDirectory::new(client.into_channel().into());
1066        fasync::unblock(move || {
1067            let mut names = Vec::new();
1068            // Read 3 entries: ".", "..", "file_0".
1069            dir.readdir::<(), _>(|_ino, _type, name| {
1070                names.push(name.to_vec());
1071                if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
1072            })
1073            .unwrap();
1074
1075            assert_eq!(names[0], b".");
1076            assert_eq!(names[1], b"..");
1077            assert_eq!(names[2], b"file_0");
1078
1079            // Seek to 1. This triggers rewind() internally because 1 < current_index (3).
1080            // Index 1 corresponds to "..".
1081            dir.seek(1).unwrap();
1082
1083            let mut names_after_seek = Vec::new();
1084            // Read 2 entries: "..", "file_0".
1085            dir.readdir::<(), _>(|_ino, _type, name| {
1086                names_after_seek.push(name.to_vec());
1087                if names_after_seek.len() == 2 {
1088                    ControlFlow::Break(())
1089                } else {
1090                    ControlFlow::Continue(())
1091                }
1092            })
1093            .unwrap();
1094
1095            assert_eq!(names_after_seek[0], b"..");
1096            assert_eq!(names_after_seek[1], b"file_0");
1097        })
1098        .await;
1099    }
1100}