Skip to main content

sync_io_client/
lib.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This library provides a synchronous wrapper around fuchsia.io.  It is usually better to use the
6//! Rust standard library for this (which will use fdio and zxio).  The primary user of this library
7//! is Starnix which uses this for performance and memory reasons.
8
9use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use smallvec::SmallVec;
13use std::ops::{ControlFlow, Range};
14use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
15use zerocopy::FromBytes;
16
17/// Converts FIDL attributes to `zxio_node_attributes_t`.
18///
19/// NOTE: This does not work for _all_ attributes e.g. fsverity options and root hash.
20fn zxio_attr_from_fidl(
21    mutable: &fio::MutableNodeAttributes,
22    immutable: &fio::ImmutableNodeAttributes,
23) -> zxio_node_attributes_t {
24    let mut out_attr = zxio_node_attributes_t::default();
25    if let Some(protocols) = immutable.protocols {
26        out_attr.protocols = protocols.bits();
27        out_attr.has.protocols = true;
28    }
29    if let Some(abilities) = immutable.abilities {
30        out_attr.abilities = abilities.bits();
31        out_attr.has.abilities = true;
32    }
33    if let Some(id) = immutable.id {
34        out_attr.id = id;
35        out_attr.has.id = true;
36    }
37    if let Some(content_size) = immutable.content_size {
38        out_attr.content_size = content_size;
39        out_attr.has.content_size = true;
40    }
41    if let Some(storage_size) = immutable.storage_size {
42        out_attr.storage_size = storage_size;
43        out_attr.has.storage_size = true;
44    }
45    if let Some(link_count) = immutable.link_count {
46        out_attr.link_count = link_count;
47        out_attr.has.link_count = true;
48    }
49    if let Some(creation_time) = mutable.creation_time {
50        out_attr.creation_time = creation_time;
51        out_attr.has.creation_time = true;
52    }
53    if let Some(modification_time) = mutable.modification_time {
54        out_attr.modification_time = modification_time;
55        out_attr.has.modification_time = true;
56    }
57    if let Some(access_time) = mutable.access_time {
58        out_attr.access_time = access_time;
59        out_attr.has.access_time = true;
60    }
61    if let Some(mode) = mutable.mode {
62        out_attr.mode = mode;
63        out_attr.has.mode = true;
64    }
65    if let Some(uid) = mutable.uid {
66        out_attr.uid = uid;
67        out_attr.has.uid = true;
68    }
69    if let Some(gid) = mutable.gid {
70        out_attr.gid = gid;
71        out_attr.has.gid = true;
72    }
73    if let Some(rdev) = mutable.rdev {
74        out_attr.rdev = rdev;
75        out_attr.has.rdev = true;
76    }
77    if let Some(change_time) = immutable.change_time {
78        out_attr.change_time = change_time;
79        out_attr.has.change_time = true;
80    }
81    if let Some(casefold) = mutable.casefold {
82        out_attr.casefold = casefold;
83        out_attr.has.casefold = true;
84    }
85    if let Some(verity_enabled) = immutable.verity_enabled {
86        out_attr.fsverity_enabled = verity_enabled;
87        out_attr.has.fsverity_enabled = true;
88    }
89    if let Some(wrapping_key_id) = mutable.wrapping_key_id {
90        out_attr.wrapping_key_id = wrapping_key_id;
91        out_attr.has.wrapping_key_id = true;
92    }
93    out_attr
94}
95
/// This trait is used so that clients can create their own wrappers around types exposed here.
///
/// `create_with_on_representation` calls exactly one of these methods, chosen by the kind of
/// representation the remote end reported, and returns whatever that method returns.  The factory
/// is consumed by the call.
pub trait Factory {
    /// The type produced by every `create_*` method.
    type Result;

    /// Called when the remote end reported a `Node` representation.
    fn create_node(self, io: RemoteIo, info: fio::NodeInfo) -> Self::Result;
    /// Called when the remote end reported a `Directory` representation.
    fn create_directory(self, io: RemoteIo, info: fio::DirectoryInfo) -> Self::Result;
    /// Called when the remote end reported a `File` representation.
    fn create_file(self, io: RemoteIo, info: fio::FileInfo) -> Self::Result;
    /// Called when the remote end reported a `Symlink` representation.
    fn create_symlink(self, io: RemoteIo, info: fio::SymlinkInfo) -> Self::Result;
}
105
106/// Waits for the fuchsia.io `OnRepresentation` event and then uses the factory to create an an
107/// appropriate object.  This returns attributes (as requested by the corresponding `open` call)
108/// that are present in the `OnRepresentation` event.
109///
110/// NOTE: The attributes returned are not comprehensive.  Check `zxio_attr_from_fidl` above for
111/// supported attributes.
112pub fn create_with_on_representation<F: Factory>(
113    proxy: fio::NodeSynchronousProxy,
114    factory: F,
115) -> Result<F::Result, zx::Status> {
116    match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
117        Ok(fio::NodeEvent::OnRepresentation { payload }) => match payload {
118            fio::Representation::Node(info) => Ok(factory.create_node(RemoteIo::new(proxy), info)),
119            fio::Representation::Directory(info) => {
120                Ok(factory.create_directory(RemoteIo::new(proxy), info))
121            }
122            fio::Representation::File(mut info) => {
123                let io = RemoteIo {
124                    proxy,
125                    stream: info
126                        .stream
127                        .take()
128                        .map(zx::Stream::from)
129                        .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
130                };
131                Ok(factory.create_file(io, info))
132            }
133            fio::Representation::Symlink(info) => {
134                Ok(factory.create_symlink(RemoteIo::new(proxy), info))
135            }
136            _ => Err(zx::Status::NOT_SUPPORTED),
137        },
138        Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
139        _ => Err(zx::Status::IO),
140    }
141}
142
/// Wraps a proxy and optional stream and provides wrappers around most fuchsia.io methods.
///
/// The proxy is stored as a `fuchsia.io/Node` proxy; methods that need a different protocol
/// (e.g. `Directory` or `File`) reinterpret the same channel as that protocol.
///
/// NOTE: The caller must take care to call appropriate methods for the underlying type.  Calling
/// the wrong methods (e.g. calling file methods on a directory) will result in the connection being
/// closed.
pub struct RemoteIo {
    /// The connection to the remote node.
    proxy: fio::NodeSynchronousProxy,
    // NOTE: This can be invalid if the remote end did not return a stream in which case
    // file I/O will use FIDL (slow).
    stream: zx::Stream,
}
154
155impl RemoteIo {
156    pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
157        Self { proxy, stream: zx::NullableHandle::invalid().into() }
158    }
159
160    pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
161        Self { proxy, stream }
162    }
163
164    pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
165        self.proxy
166    }
167
168    fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
169        zx::Unowned::new(self.proxy.as_channel())
170    }
171
172    /// Returns attributes in fuchsia.io's FIDL representation.
173    pub fn attr_get(
174        &self,
175        query: fio::NodeAttributesQuery,
176    ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
177        self.proxy
178            .get_attributes(query, zx::MonotonicInstant::INFINITE)
179            .map_err(|_| zx::Status::IO)?
180            .map_err(zx::Status::from_raw)
181    }
182
183    /// Returns attributes mapped to `zxio_node_attributes_t`
184    ///
185    /// NOTE: Not all attributes are supported.  See `zxio_attr_from_fidl` above for supported
186    /// attributes.
187    pub fn attr_get_zxio(
188        &self,
189        query: fio::NodeAttributesQuery,
190    ) -> Result<zxio_node_attributes_t, zx::Status> {
191        self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
192    }
193
194    /// Sets attributes.
195    pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
196        self.proxy
197            .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
198            .map_err(|_| zx::Status::IO)?
199            .map_err(zx::Status::from_raw)
200    }
201
202    /// Wraps fuchsia.io/Directory's Open.
203    pub fn open<F: Factory>(
204        &self,
205        path: &str,
206        flags: fio::Flags,
207        create_attributes: Option<fio::MutableNodeAttributes>,
208        query: fio::NodeAttributesQuery,
209        factory: F,
210    ) -> Result<F::Result, zx::Status> {
211        let (client_end, server_end) = zx::Channel::create();
212        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
213        dir_proxy
214            .open(
215                path,
216                flags | fio::Flags::FLAG_SEND_REPRESENTATION,
217                &fio::Options {
218                    attributes: (!query.is_empty()).then_some(query),
219                    create_attributes,
220                    ..Default::default()
221                },
222                server_end,
223            )
224            .map_err(|_| zx::Status::IO)?;
225        create_with_on_representation(client_end.into(), factory)
226    }
227
228    /// Opens all nodes iteratively along the relative sub-paths given in `paths`.
229    /// Each item in `paths` is opened from the node returned by opening the previous item in
230    /// `paths`.
231    ///
232    /// `factory_fn` is used to create the `factory` argument to `create_with_on_representation`.
233    ///
234    /// The return vector can be smaller than the initial `paths` vector, and execution will
235    /// always stop with the first error appending an `Err(status)` into the results.
236    ///
237    /// NOTE: To prevent opening and writing through non-directory nodes, this function adds
238    /// `fio::Flags::PROTOCOL_DIRECTORY | fio::Flags::PROTOCOL_SYMLINK` to intermediate components.
239    pub fn open_pipelined<F: Factory>(
240        &self,
241        paths: &[&str],
242        flags: fio::Flags,
243        query: fio::NodeAttributesQuery,
244        mut factory_fn: impl FnMut() -> F,
245    ) -> impl Iterator<Item = Result<F::Result, zx::Status>> {
246        let mut proxy_to_result = move |proxy: fio::NodeSynchronousProxy| {
247            create_with_on_representation(proxy, factory_fn())
248        };
249
250        let mut proxies = SmallVec::<[fio::NodeSynchronousProxy; 8]>::new();
251        let (client_end, server_end) = zx::Channel::create();
252        proxies.push(fio::NodeSynchronousProxy::new(client_end));
253        let mut next_server_end = Some(server_end);
254
255        for (i, path) in paths.iter().enumerate().rev() {
256            let server_end = next_server_end.take().unwrap();
257            let channel = if i == 0 {
258                self.proxy.as_channel()
259            } else {
260                let (client_end, server_end) = zx::Channel::create();
261                next_server_end = Some(server_end);
262                proxies.push(fio::NodeSynchronousProxy::new(client_end));
263                proxies.last().unwrap().as_channel()
264            };
265            let dir_proxy = zx::Unowned::<fio::DirectorySynchronousProxy>::new(channel);
266
267            let mut open_flags = flags | fio::Flags::FLAG_SEND_REPRESENTATION;
268            if i < paths.len() - 1 {
269                open_flags |= fio::Flags::PROTOCOL_DIRECTORY | fio::Flags::PROTOCOL_SYMLINK;
270            }
271
272            if let Err(_) = dir_proxy.open(
273                path,
274                open_flags,
275                &fio::Options {
276                    attributes: (!query.is_empty()).then_some(query),
277                    ..Default::default()
278                },
279                server_end,
280            ) {
281                // This should only be possible for the call from self. Other channels cannot be
282                // closed as they have not been sent yet.
283                debug_assert!(i == 0);
284                // Nothing to do. The first proxy just got disconnected. `proxy_to_result` will
285                // return an error.
286            }
287        }
288
289        let mut proxies = proxies.into_iter().rev();
290        std::iter::successors(proxies.next().map(&mut proxy_to_result), move |prev| match prev {
291            Err(_) => None,
292            Ok(_) => proxies.next().map(&mut proxy_to_result),
293        })
294    }
295
296    /// Returns `(data, eof)`, where `eof` is true if we encountered the end of the file.  If `eof`
297    /// is false, then it is still possible that a subsequent read would read no more i.e. the end
298    /// of the file _might_ have been reached.  This might return fewer bytes than `max`.
299    pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
300        if self.stream.is_invalid() {
301            let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
302            let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
303            let data = file_proxy
304                .read_at(max, offset, zx::MonotonicInstant::INFINITE)
305                .map_err(|_| zx::Status::IO)?
306                .map_err(zx::Status::from_raw)?;
307            let eof = (data.len() as u64) < max;
308            Ok((data, eof))
309        } else {
310            // Use an intermediate buffer.
311            let bytes =
312                self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
313            let eof = bytes.len() < max;
314            Ok((bytes, eof))
315        }
316    }
317
318    /// Attempts to read `len` bytes and will only return fewer if it encounters the end of the
319    /// file, or an error.  `callback` will be called for each chunk.  If any bytes are successfully
320    /// passed to `callback`, `read` will return the total number of bytes successfully written and
321    /// any error encountered will be discarded.
322    pub fn read<E>(
323        &self,
324        offset: u64,
325        len: usize,
326        mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
327        map_err: impl FnOnce(zx::Status) -> E,
328    ) -> Result<usize, E> {
329        let mut total = 0;
330        while total < len {
331            match self.read_partial(offset + total as u64, len - total) {
332                Ok((data, eof)) => {
333                    if data.is_empty() {
334                        break;
335                    }
336                    let data_len = data.len();
337                    let written = callback(data)?;
338                    total += written;
339                    if eof || written < data_len {
340                        break;
341                    }
342                }
343                Err(e) => {
344                    if total > 0 {
345                        break;
346                    }
347                    return Err(map_err(e));
348                }
349            }
350        }
351        Ok(total)
352    }
353
354    /// Writes `data` at `offset`.
355    pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
356        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
357        let mut total_written = 0;
358        for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
359            let result = file_proxy
360                .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
361                .map_err(|_| zx::Status::IO)
362                .and_then(|res| res.map_err(zx::Status::from_raw));
363            match result {
364                Ok(actual) => {
365                    let actual = actual as usize;
366                    total_written += actual;
367                    if actual < chunk.len() {
368                        return Ok(total_written);
369                    }
370                }
371                Err(e) => {
372                    if total_written > 0 {
373                        break;
374                    }
375                    return Err(e);
376                }
377            }
378        }
379        Ok(total_written)
380    }
381
382    /// Returns true if vectored operations are supported.
383    pub fn supports_vectored(&self) -> bool {
384        // We only support readv and writev if we have a stream.
385        !self.stream.is_invalid()
386    }
387
388    /// Reads into `iovecs` using a vectored read.  This is only supported with a valid stream.  See
389    /// `supports_vectored` above.
390    ///
391    /// # Safety
392    ///
393    /// Same as `zx::Stream::readv`.
394    pub unsafe fn readv(
395        &self,
396        offset: u64,
397        iovecs: &mut [zx::sys::zx_iovec_t],
398    ) -> Result<usize, zx::Status> {
399        if self.stream.is_invalid() {
400            return Err(zx::Status::NOT_SUPPORTED);
401        }
402        // SAFETY: See `zx::Stream::readv`.
403        unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
404    }
405
406    /// Writes from `iovecs` using vectored write.  This is only supported with a valid stream.  See
407    /// `supports_vectored` above.
408    pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
409        if self.stream.is_invalid() {
410            return Err(zx::Status::NOT_SUPPORTED);
411        }
412        self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
413    }
414
415    /// Wraps fuchsia.io/File's Truncate.
416    pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
417        self.cast_proxy::<fio::FileSynchronousProxy>()
418            .resize(length, zx::MonotonicInstant::INFINITE)
419            .map_err(|_| zx::Status::IO)?
420            .map_err(zx::Status::from_raw)
421    }
422
423    /// Returns a VMO backing the file.
424    pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
425        let mut fio_flags = fio::VmoFlags::empty();
426        if flags.contains(zx::VmarFlags::PERM_READ) {
427            fio_flags |= fio::VmoFlags::READ;
428        }
429        if flags.contains(zx::VmarFlags::PERM_WRITE) {
430            fio_flags |= fio::VmoFlags::WRITE;
431        }
432        if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
433            fio_flags |= fio::VmoFlags::EXECUTE;
434        }
435        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
436        let vmo = file_proxy
437            .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
438            .map_err(|_| zx::Status::IO)?
439            .map_err(zx::Status::from_raw)?;
440        Ok(vmo)
441    }
442
443    /// Wraps fuchsia.io/Node's Sync.
444    pub fn sync(&self) -> Result<(), zx::Status> {
445        self.proxy
446            .sync(zx::MonotonicInstant::INFINITE)
447            .map_err(|_| zx::Status::IO)?
448            .map_err(zx::Status::from_raw)
449    }
450
451    /// Closes and updates access time asynchronously.
452    pub fn close_and_update_access_time(self) {
453        let _ = self.proxy.get_attributes(
454            fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
455            zx::MonotonicInstant::INFINITE_PAST,
456        );
457    }
458
459    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
460    pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
461        let (client_end, server_end) = zx::Channel::create();
462        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
463        Ok(client_end.into())
464    }
465
466    /// Wraps fuchsia.io/Node's LinkInto.
467    pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
468        let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
469        let (status, token) = target_dir_proxy
470            .get_token(zx::MonotonicInstant::INFINITE)
471            .map_err(|_| zx::Status::IO)?;
472        zx::Status::ok(status)?;
473        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
474
475        // Linkable::LinkInto is a separate protocol.
476        let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
477
478        linkable_proxy
479            .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
480            .map_err(|_| zx::Status::IO)?
481            .map_err(zx::Status::from_raw)
482    }
483
484    /// Wraps fuchsia.io/Directory's Unlink.
485    pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
486        let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
487        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
488        dir_proxy
489            .unlink(name, &options, zx::MonotonicInstant::INFINITE)
490            .map_err(|_| zx::Status::IO)?
491            .map_err(zx::Status::from_raw)
492    }
493
494    /// Wraps fuchsia.io/Directory's Rename.
495    pub fn rename(
496        &self,
497        old_path: &str,
498        new_directory: &Self,
499        new_path: &str,
500    ) -> Result<(), zx::Status> {
501        let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
502        let (status, token) =
503            new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
504        zx::Status::ok(status)?;
505        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
506        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
507        dir_proxy
508            .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
509            .map_err(|_| zx::Status::IO)?
510            .map_err(zx::Status::from_raw)
511    }
512
513    /// Wraps fuchsia.io/Directory's CreateSymlink.
514    pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
515        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
516        let (client_end, server_end) = zx::Channel::create();
517        dir_proxy
518            .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
519            .map_err(|_| zx::Status::IO)?
520            .map_err(zx::Status::from_raw)?;
521        Ok(RemoteIo::new(client_end.into()))
522    }
523
524    /// Wraps fuchsia.io/File's EnableVerity.
525    pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
526        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
527        let options = fio::VerificationOptions {
528            hash_algorithm: Some(match descriptor.hash_algorithm {
529                1 => fio::HashAlgorithm::Sha256,
530                2 => fio::HashAlgorithm::Sha512,
531                _ => return Err(zx::Status::INVALID_ARGS),
532            }),
533            salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
534            ..Default::default()
535        };
536        file_proxy
537            .enable_verity(&options, zx::MonotonicInstant::INFINITE)
538            .map_err(|_| zx::Status::IO)?
539            .map_err(zx::Status::from_raw)
540    }
541
542    /// Wraps fuchsia.io/File's Allocate.
543    pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
544        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
545        let mut fio_mode = fio::AllocateMode::empty();
546        if mode.contains(AllocateMode::KEEP_SIZE) {
547            fio_mode |= fio::AllocateMode::KEEP_SIZE;
548        }
549        if mode.contains(AllocateMode::UNSHARE_RANGE) {
550            fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
551        }
552        if mode.contains(AllocateMode::PUNCH_HOLE) {
553            fio_mode |= fio::AllocateMode::PUNCH_HOLE;
554        }
555        if mode.contains(AllocateMode::COLLAPSE_RANGE) {
556            fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
557        }
558        if mode.contains(AllocateMode::ZERO_RANGE) {
559            fio_mode |= fio::AllocateMode::ZERO_RANGE;
560        }
561        if mode.contains(AllocateMode::INSERT_RANGE) {
562            fio_mode |= fio::AllocateMode::INSERT_RANGE;
563        }
564        file_proxy
565            .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
566            .map_err(|_| zx::Status::IO)?
567            .map_err(zx::Status::from_raw)
568    }
569
570    /// Wraps fuchsia.io/Node's GetExtendedAttribute.
571    pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
572        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
573        let result = self
574            .proxy
575            .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
576            .map_err(|_| zx::Status::IO)?
577            .map_err(zx::Status::from_raw)?;
578        match result {
579            fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
580            fio::ExtendedAttributeValue::Buffer(vmo) => {
581                let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
582                let mut bytes = vec![0u8; size as usize];
583                vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
584                Ok(bytes)
585            }
586            _ => Err(zx::Status::NOT_SUPPORTED),
587        }
588    }
589
590    /// Wraps fuchsia.io/Node's SetExtendedAttribute.
591    pub fn xattr_set(
592        &self,
593        name: &[u8],
594        value: &[u8],
595        mode: syncio::XattrSetMode,
596    ) -> Result<(), zx::Status> {
597        let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
598        let fidl_mode = match mode {
599            syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
600            syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
601            syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
602        };
603        self.proxy
604            .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
605            .map_err(|_| zx::Status::IO)?
606            .map_err(zx::Status::from_raw)
607    }
608
609    /// Wraps fuchsia.io/Node's RenoveExtendedAttribute.
610    pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
611        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
612        self.proxy
613            .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
614            .map_err(|_| zx::Status::IO)?
615            .map_err(zx::Status::from_raw)
616    }
617
618    /// Wraps fuchsia.io/Node's ListExtendedAttributes.
619    pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
620        let (client_end, server_end) = zx::Channel::create();
621        self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
622        let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
623        let mut all_attrs = vec![];
624        loop {
625            let (attributes, last) = iterator
626                .get_next(zx::MonotonicInstant::INFINITE)
627                .map_err(|_| zx::Status::IO)?
628                .map_err(zx::Status::from_raw)?;
629            all_attrs.extend(attributes);
630            if last {
631                break;
632            }
633        }
634        Ok(all_attrs)
635    }
636}
637
/// RemoteDirectory supports iteration of directories and things that you can do to directories via
/// a file descriptor.  Other operations, such as creating children, can be done via `RemoteIo`.
/// Iteration is not safe to be done concurrently because there is a seek pointer; `readdir` will
/// resume from the seek position.
pub struct RemoteDirectory {
    /// The connection to the remote directory.
    proxy: fio::DirectorySynchronousProxy,
    /// Iteration state (read buffer and seek position), guarded by a mutex.
    state: Mutex<State>,
}
646
/// Iteration state for a `RemoteDirectory`.
#[derive(Default)]
struct State {
    /// Buffer contains the last batch of entries read from the remote end.
    buffer: Vec<u8>,

    /// Position in the buffer for the next entry.
    offset: usize,

    /// If the last attempt to write to the sink failed, this contains the entry that is pending to
    /// be added. This is also used to synthesize dot-dot.
    pending_entry: Entry,

    /// The current iterator position in the directory.
    current_index: u64,
}
662
663impl State {
664    fn name(&self, range: Range<usize>) -> &[u8] {
665        &self.buffer[range]
666    }
667
668    /// Returns the next dir entry. If no more entries are found, returns None.  Returns an error if
669    /// the iterator fails.
670    fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
671        let mut next_dirent = || -> Result<Entry, zx::Status> {
672            if self.offset >= self.buffer.len() {
673                match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
674                    Ok((status, dirents)) => {
675                        zx::Status::ok(status)?;
676                        if dirents.is_empty() {
677                            return Ok(Entry::None);
678                        }
679                        self.buffer = dirents;
680                        self.offset = 0;
681                    }
682                    Err(_) => return Err(zx::Status::IO),
683                }
684            }
685
686            #[repr(C, packed)]
687            #[derive(FromBytes)]
688            struct DirectoryEntry {
689                ino: u64,
690                name_len: u8,
691                entry_type: u8,
692            }
693
694            let Some((ino, name, entry_type)) =
695                DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
696                    |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
697                        let name_len = name_len as usize;
698                        let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
699                        (remainder.len() >= name_len).then_some((
700                            ino,
701                            name_start..name_start + name_len,
702                            entry_type,
703                        ))
704                    },
705                )
706            else {
707                // Truncated entry.
708                return Ok(Entry::None);
709            };
710
711            self.offset = name.end;
712
713            Ok(Entry::Some {
714                ino,
715                entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
716                name,
717            })
718        };
719
720        let mut next = self.pending_entry.take();
721        if let Entry::None = next {
722            next = next_dirent()?;
723        }
724        // We only want to synthesize .. if . exists because the . and .. entries get removed if the
725        // directory is unlinked, so if the remote filesystem has removed ., we know to omit the
726        // .. entry.
727        match &next {
728            Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
729                self.pending_entry = Entry::DotDot;
730            }
731            _ => {}
732        }
733        self.current_index += 1;
734        Ok(next)
735    }
736
737    fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
738        self.pending_entry = Entry::None;
739        let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
740        zx::Status::ok(status)?;
741        self.buffer.clear();
742        self.offset = 0;
743        self.current_index = 0;
744        Ok(())
745    }
746}
747
/// A directory entry as produced by `State::next`.
#[derive(Default)]
enum Entry {
    // Indicates no more entries.
    #[default]
    None,

    // An entry read from the remote end.  `name` is a range into `State::buffer`.
    Some {
        ino: u64,
        entry_type: fio::DirentType,
        name: Range<usize>,
    },

    // Indicates dot-dot should be synthesized.
    DotDot,
}
763
764impl Entry {
765    fn take(&mut self) -> Entry {
766        std::mem::replace(self, Entry::None)
767    }
768}
769
770impl RemoteDirectory {
771    pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
772        Self { proxy, state: Mutex::default() }
773    }
774
775    /// Seeks to `new_index` in the directory.
776    pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
777        let mut state = self.state.lock();
778
779        if new_index < state.current_index {
780            // Our iterator only goes forward, so reset it here.  Note: we *must* rewind it rather
781            // than just create a new iterator because the remote end maintains the offset.
782            state.rewind(&self.proxy)?;
783            state.current_index = 0;
784        }
785
786        // Advance the iterator to catch up with the offset.
787        for i in state.current_index..new_index {
788            match state.next(&self.proxy) {
789                Ok(Entry::Some { .. } | Entry::DotDot) => {}
790                Ok(Entry::None) => break, // No more entries.
791                Err(_) => {
792                    // In order to keep the offset and the iterator in sync, set the new offset
793                    // to be as far as we could get.
794                    // Note that failing the seek here would also cause the iterator and the
795                    // offset to not be in sync, because the iterator has already moved from
796                    // where it was.
797                    return Ok(i);
798                }
799            }
800        }
801
802        Ok(new_index)
803    }
804
805    /// Returns `None` if there are no more entries to be read.  `sink` can choose to return
806    /// `ControlFlow::Break(_)` in which case the entry will be returned the next time `readdir` is
807    /// called.
808    pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
809        &self,
810        mut sink: S,
811    ) -> Result<Option<B>, zx::Status> {
812        let mut state = self.state.lock();
813        loop {
814            let entry = state.next(&self.proxy)?;
815            if let ControlFlow::Break(b) = match &entry {
816                Entry::Some { ino, entry_type, name } => {
817                    sink(*ino, *entry_type, state.name(name.clone()))
818                }
819                Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
820                Entry::None => break,
821            } {
822                state.pending_entry = entry;
823                return Ok(Some(b));
824            }
825        }
826        Ok(None)
827    }
828
829    /// Wraps fuchsia.io/Node's Sync.
830    pub fn sync(&self) -> Result<(), zx::Status> {
831        self.proxy
832            .sync(zx::MonotonicInstant::INFINITE)
833            .map_err(|_| zx::Status::IO)?
834            .map_err(zx::Status::from_raw)
835    }
836
837    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
838    pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
839        let (client_end, server_end) = zx::Channel::create();
840        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
841        Ok(client_end.into())
842    }
843}
844
845#[cfg(test)]
846mod tests {
847    use super::*;
848    use fidl::endpoints::{ControlHandle, RequestStream};
849    use fuchsia_async as fasync;
850    use futures::StreamExt;
851    use std::sync::Arc;
852    use std::sync::atomic::{AtomicU64, Ordering};
853
854    #[fuchsia::test]
855    async fn test_read_chunking() {
856        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
857        let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
858        let content_clone = content.clone();
859
860        let _server_task = fasync::Task::spawn(async move {
861            while let Some(Ok(request)) = stream.next().await {
862                match request {
863                    fio::FileRequest::ReadAt { count, offset, responder } => {
864                        let start = offset as usize;
865                        let end = std::cmp::min(start + count as usize, content_clone.len());
866                        let data = if start < content_clone.len() {
867                            &content_clone[start..end]
868                        } else {
869                            &[]
870                        };
871                        responder.send(Ok(data)).unwrap();
872                    }
873                    _ => panic!("Unexpected request: {:?}", request),
874                }
875            }
876        });
877
878        let io = RemoteIo::new(client.into_channel().into());
879        fasync::unblock(move || {
880            let mut data = Vec::new();
881            let actual = io
882                .read(
883                    0,
884                    content.len(),
885                    |chunk| -> Result<usize, zx::Status> {
886                        let len = chunk.len();
887                        data.extend(chunk);
888                        Ok(len)
889                    },
890                    |status| status,
891                )
892                .unwrap();
893            assert_eq!(actual, content.len());
894            assert_eq!(data, content);
895        })
896        .await;
897    }
898
899    #[fuchsia::test]
900    async fn test_read_error_after_data() {
901        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
902        let chunk_size = 100;
903        let content = vec![0xAA; chunk_size];
904        let content_clone = content.clone();
905
906        let _server_task = fasync::Task::spawn(async move {
907            let mut request_count = 0;
908            while let Some(Ok(request)) = stream.next().await {
909                match request {
910                    fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
911                        request_count += 1;
912                        if request_count == 1 {
913                            responder.send(Ok(&content_clone)).unwrap();
914                        } else {
915                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
916                        }
917                    }
918                    _ => panic!("Unexpected request: {:?}", request),
919                }
920            }
921        });
922
923        let io = RemoteIo::new(client.into_channel().into());
924        fasync::unblock(move || {
925            let mut data = Vec::new();
926            // Ask for more than chunk_size to ensure a second request is made.
927            let actual = io
928                .read(
929                    0,
930                    chunk_size * 2,
931                    |chunk| -> Result<usize, zx::Status> {
932                        let len = chunk.len();
933                        data.extend(chunk);
934                        Ok(len)
935                    },
936                    |status| status,
937                )
938                .expect("read should succeed even if later chunks fail");
939            assert_eq!(actual, chunk_size);
940            assert_eq!(data.len(), chunk_size);
941            assert_eq!(data, content);
942        })
943        .await;
944    }
945
946    #[fuchsia::test]
947    async fn test_write_chunking() {
948        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
949        let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
950        let content2 = content.clone();
951
952        let server_task = fasync::Task::spawn(async move {
953            let mut written = vec![0; content2.len()];
954            while let Some(Ok(request)) = stream.next().await {
955                match request {
956                    fio::FileRequest::WriteAt { offset, data, responder, .. } => {
957                        let offset = offset as usize;
958                        written[offset..offset + data.len()].copy_from_slice(&data);
959                        responder.send(Ok(data.len() as u64)).unwrap();
960                    }
961                    _ => panic!("Unexpected request: {:?}", request),
962                }
963            }
964            assert_eq!(written, content2);
965        });
966
967        let io = RemoteIo::new(client.into_channel().into());
968        fasync::unblock(move || {
969            let written = io.write(0, &content).expect("write failed");
970            assert_eq!(written, content.len());
971        })
972        .await;
973
974        server_task.await;
975    }
976
977    #[fuchsia::test]
978    async fn test_write_error_after_data() {
979        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
980        let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
981        let content = vec![0xEE; chunk_size * 2];
982
983        let _server_task = fasync::Task::spawn(async move {
984            let mut request_count = 0;
985            while let Some(Ok(request)) = stream.next().await {
986                match request {
987                    fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
988                        request_count += 1;
989                        if request_count == 1 {
990                            responder.send(Ok(data.len() as u64)).unwrap();
991                        } else {
992                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
993                        }
994                    }
995                    _ => panic!("Unexpected request: {:?}", request),
996                }
997            }
998        });
999
1000        let io = RemoteIo::new(client.into_channel().into());
1001        fasync::unblock(move || {
1002            let written = io.write(0, &content).expect("write should succeed partial");
1003            assert_eq!(written, chunk_size);
1004        })
1005        .await;
1006    }
1007
1008    #[fuchsia::test]
1009    async fn test_large_directory() {
1010        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1011        let num_entries = 2000;
1012
1013        let task = fasync::Task::spawn(async move {
1014            let mut sent_count = 0;
1015            let mut num_requests = 0;
1016            while let Some(Ok(request)) = stream.next().await {
1017                match request {
1018                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1019                        num_requests += 1;
1020                        let mut buffer = Vec::new();
1021                        while sent_count < num_entries {
1022                            let name = if sent_count == 0 {
1023                                ".".to_string()
1024                            } else {
1025                                format!("file_{}", sent_count - 1)
1026                            };
1027                            let name_bytes = name.as_bytes();
1028                            let entry_size = 10 + name_bytes.len();
1029                            if buffer.len() + entry_size > max_bytes as usize {
1030                                break;
1031                            }
1032                            buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
1033                            buffer.push(name_bytes.len() as u8);
1034                            let entry_type = if sent_count == 0 {
1035                                fio::DirentType::Directory
1036                            } else {
1037                                fio::DirentType::File
1038                            };
1039                            buffer.push(entry_type.into_primitive());
1040                            buffer.extend_from_slice(name_bytes);
1041                            sent_count += 1;
1042                        }
1043                        let _ = responder.send(0, &buffer);
1044                    }
1045                    fio::DirectoryRequest::Rewind { responder } => {
1046                        sent_count = 0;
1047                        let _ = responder.send(0);
1048                    }
1049                    fio::DirectoryRequest::Close { responder } => {
1050                        let _ = responder.send(Ok(()));
1051                    }
1052                    _ => {}
1053                }
1054            }
1055            assert!(num_requests > 0);
1056        });
1057
1058        let dir = RemoteDirectory::new(client.into_channel().into());
1059        let count = Arc::new(AtomicU64::new(0));
1060        let count2 = count.clone();
1061        fasync::unblock(move || {
1062            dir.readdir::<(), _>(|_ino, _type, _name| {
1063                count.fetch_add(1, Ordering::Relaxed);
1064                ControlFlow::Continue(())
1065            })
1066            .unwrap();
1067        })
1068        .await;
1069        // Expect num_entries + 1 (for synthesized "..")
1070        assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
1071        task.await;
1072    }
1073
1074    #[fuchsia::test]
1075    async fn test_seek_backwards() {
1076        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1077        let _server_task = fasync::Task::spawn(async move {
1078            let entries = vec![
1079                (1, fio::DirentType::Directory, "."),
1080                (2, fio::DirentType::File, "file_0"),
1081                (3, fio::DirentType::File, "file_1"),
1082                (4, fio::DirentType::File, "file_2"),
1083            ];
1084            let mut current_entry = 0;
1085
1086            while let Some(Ok(request)) = stream.next().await {
1087                match request {
1088                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1089                        let mut buffer = Vec::new();
1090                        while current_entry < entries.len() {
1091                            let (ino, type_, name) = entries[current_entry];
1092                            let name_bytes = name.as_bytes();
1093                            let entry_size = 10 + name_bytes.len();
1094                            if buffer.len() + entry_size > max_bytes as usize {
1095                                break;
1096                            }
1097                            buffer.extend_from_slice(&(ino as u64).to_le_bytes());
1098                            buffer.push(name_bytes.len() as u8);
1099                            buffer.push(type_.into_primitive());
1100                            buffer.extend_from_slice(name_bytes);
1101                            current_entry += 1;
1102                        }
1103                        responder.send(0, &buffer).unwrap();
1104                    }
1105                    fio::DirectoryRequest::Rewind { responder } => {
1106                        current_entry = 0;
1107                        responder.send(0).unwrap();
1108                    }
1109                    _ => {}
1110                }
1111            }
1112        });
1113
1114        let dir = RemoteDirectory::new(client.into_channel().into());
1115        fasync::unblock(move || {
1116            let mut names = Vec::new();
1117            // Read 3 entries: ".", "..", "file_0".
1118            dir.readdir::<(), _>(|_ino, _type, name| {
1119                names.push(name.to_vec());
1120                if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
1121            })
1122            .unwrap();
1123
1124            assert_eq!(names[0], b".");
1125            assert_eq!(names[1], b"..");
1126            assert_eq!(names[2], b"file_0");
1127
1128            // Seek to 1. This triggers rewind() internally because 1 < current_index (3).
1129            // Index 1 corresponds to "..".
1130            dir.seek(1).unwrap();
1131
1132            let mut names_after_seek = Vec::new();
1133            // Read 2 entries: "..", "file_0".
1134            dir.readdir::<(), _>(|_ino, _type, name| {
1135                names_after_seek.push(name.to_vec());
1136                if names_after_seek.len() == 2 {
1137                    ControlFlow::Break(())
1138                } else {
1139                    ControlFlow::Continue(())
1140                }
1141            })
1142            .unwrap();
1143
1144            assert_eq!(names_after_seek[0], b"..");
1145            assert_eq!(names_after_seek[1], b"file_0");
1146        })
1147        .await;
1148    }
1149
1150    struct DummyFactory;
1151
1152    impl Factory for DummyFactory {
1153        type Result = RemoteIo;
1154        fn create_node(self, io: RemoteIo, _info: fio::NodeInfo) -> Self::Result {
1155            io
1156        }
1157        fn create_directory(self, io: RemoteIo, _info: fio::DirectoryInfo) -> Self::Result {
1158            io
1159        }
1160        fn create_file(self, io: RemoteIo, _info: fio::FileInfo) -> Self::Result {
1161            io
1162        }
1163        fn create_symlink(self, io: RemoteIo, _info: fio::SymlinkInfo) -> Self::Result {
1164            io
1165        }
1166    }
1167
1168    #[fuchsia::test]
1169    async fn test_open_pipelined() {
1170        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1171
1172        fn serve_mock_directory(
1173            mut stream: fio::DirectoryRequestStream,
1174            mut expected_paths: Vec<String>,
1175        ) {
1176            fasync::Task::spawn(async move {
1177                if let Some(Ok(request)) = stream.next().await {
1178                    match request {
1179                        fio::DirectoryRequest::Open { path, flags, options: _, object, .. } => {
1180                            if !expected_paths.is_empty() {
1181                                let expected = expected_paths.remove(0);
1182                                assert_eq!(path, expected);
1183                                if expected == "path1" {
1184                                    assert!(flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1185                                } else if expected == "path2" {
1186                                    assert!(!flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1187                                }
1188                            }
1189                            let server_end =
1190                                fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1191                            let dir_stream = server_end.into_stream();
1192                            let control_handle = dir_stream.control_handle();
1193                            let representation =
1194                                fio::Representation::Directory(fio::DirectoryInfo::default());
1195                            control_handle.send_on_representation(representation).unwrap();
1196
1197                            serve_mock_directory(dir_stream, expected_paths);
1198                        }
1199                        _ => {}
1200                    }
1201                }
1202            })
1203            .detach();
1204        }
1205
1206        serve_mock_directory(stream, vec!["path1".to_string(), "path2".to_string()]);
1207
1208        let io = RemoteIo::new(client.into_channel().into());
1209        fasync::unblock(move || {
1210            let results = io
1211                .open_pipelined(
1212                    &["path1", "path2"],
1213                    fio::Flags::empty(),
1214                    fio::NodeAttributesQuery::empty(),
1215                    || DummyFactory,
1216                )
1217                .collect::<Vec<_>>();
1218            assert_eq!(results.len(), 2);
1219            assert!(results.iter().all(|r| r.is_ok()));
1220        })
1221        .await;
1222    }
1223
1224    #[fuchsia::test]
1225    async fn test_open_pipelined_not_found() {
1226        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1227
1228        fn serve_mock_directory_not_found(mut stream: fio::DirectoryRequestStream) {
1229            fasync::Task::spawn(async move {
1230                if let Some(Ok(request)) = stream.next().await {
1231                    match request {
1232                        fio::DirectoryRequest::Open {
1233                            path, flags: _, options: _, object, ..
1234                        } => {
1235                            let server_end =
1236                                fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1237                            let dir_stream = server_end.into_stream();
1238                            let control_handle = dir_stream.control_handle();
1239
1240                            if path == "not_found" {
1241                                control_handle.shutdown_with_epitaph(zx::Status::NOT_FOUND);
1242                            } else {
1243                                let representation =
1244                                    fio::Representation::Directory(fio::DirectoryInfo::default());
1245                                control_handle.send_on_representation(representation).unwrap();
1246                                serve_mock_directory_not_found(dir_stream);
1247                            }
1248                        }
1249                        _ => {}
1250                    }
1251                }
1252            })
1253            .detach();
1254        }
1255
1256        serve_mock_directory_not_found(stream);
1257
1258        let io = RemoteIo::new(client.into_channel().into());
1259        fasync::unblock(move || {
1260            let results = io
1261                .open_pipelined(
1262                    &["path1", "not_found", "path3"],
1263                    fio::Flags::empty(),
1264                    fio::NodeAttributesQuery::empty(),
1265                    || DummyFactory,
1266                )
1267                .collect::<Vec<_>>();
1268            assert_eq!(results.len(), 2);
1269            assert!(results[0].is_ok());
1270            assert_eq!(results[1].as_ref().err(), Some(&zx::Status::NOT_FOUND));
1271        })
1272        .await;
1273    }
1274
1275    #[fuchsia::test]
1276    async fn test_open_pipelined_peer_closed() {
1277        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1278
1279        fn serve_mock_directory_close_early(mut stream: fio::DirectoryRequestStream) {
1280            fasync::Task::spawn(async move {
1281                if let Some(Ok(request)) = stream.next().await {
1282                    match request {
1283                        fio::DirectoryRequest::Open { object, .. } => {
1284                            // We just drop the object (the server_end of the channel), closing it.
1285                            drop(object);
1286                        }
1287                        _ => {}
1288                    }
1289                }
1290            })
1291            .detach();
1292        }
1293
1294        serve_mock_directory_close_early(stream);
1295
1296        let io = RemoteIo::new(client.into_channel().into());
1297        fasync::unblock(move || {
1298            let results = io
1299                .open_pipelined(
1300                    &["path1", "path2"],
1301                    fio::Flags::empty(),
1302                    fio::NodeAttributesQuery::empty(),
1303                    || DummyFactory,
1304                )
1305                .collect::<Vec<_>>();
1306            assert_eq!(results.len(), 1);
1307            assert_eq!(results[0].as_ref().err(), Some(&zx::Status::PEER_CLOSED));
1308        })
1309        .await;
1310    }
1311}