Skip to main content

sync_io_client/
lib.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This library provides a synchronous wrapper around fuchsia.io.  It is usually better to use the
6//! Rust standard library for this (which will use fdio and zxio).  The primary user of this library
7//! is Starnix which uses this for performance and memory reasons.
8
9use fidl::endpoints::SynchronousProxy;
10use fidl_fuchsia_io as fio;
11use fuchsia_sync::Mutex;
12use std::ops::{ControlFlow, Range};
13use syncio::{AllocateMode, zxio_fsverity_descriptor_t, zxio_node_attributes_t};
14use zerocopy::FromBytes;
15
16/// Converts FIDL attributes to `zxio_node_attributes_t`.
17///
18/// NOTE: This does not work for _all_ attributes e.g. fsverity options and root hash.
19fn zxio_attr_from_fidl(
20    mutable: &fio::MutableNodeAttributes,
21    immutable: &fio::ImmutableNodeAttributes,
22) -> zxio_node_attributes_t {
23    let mut out_attr = zxio_node_attributes_t::default();
24    if let Some(protocols) = immutable.protocols {
25        out_attr.protocols = protocols.bits();
26        out_attr.has.protocols = true;
27    }
28    if let Some(abilities) = immutable.abilities {
29        out_attr.abilities = abilities.bits();
30        out_attr.has.abilities = true;
31    }
32    if let Some(id) = immutable.id {
33        out_attr.id = id;
34        out_attr.has.id = true;
35    }
36    if let Some(content_size) = immutable.content_size {
37        out_attr.content_size = content_size;
38        out_attr.has.content_size = true;
39    }
40    if let Some(storage_size) = immutable.storage_size {
41        out_attr.storage_size = storage_size;
42        out_attr.has.storage_size = true;
43    }
44    if let Some(link_count) = immutable.link_count {
45        out_attr.link_count = link_count;
46        out_attr.has.link_count = true;
47    }
48    if let Some(creation_time) = mutable.creation_time {
49        out_attr.creation_time = creation_time;
50        out_attr.has.creation_time = true;
51    }
52    if let Some(modification_time) = mutable.modification_time {
53        out_attr.modification_time = modification_time;
54        out_attr.has.modification_time = true;
55    }
56    if let Some(access_time) = mutable.access_time {
57        out_attr.access_time = access_time;
58        out_attr.has.access_time = true;
59    }
60    if let Some(mode) = mutable.mode {
61        out_attr.mode = mode;
62        out_attr.has.mode = true;
63    }
64    if let Some(uid) = mutable.uid {
65        out_attr.uid = uid;
66        out_attr.has.uid = true;
67    }
68    if let Some(gid) = mutable.gid {
69        out_attr.gid = gid;
70        out_attr.has.gid = true;
71    }
72    if let Some(rdev) = mutable.rdev {
73        out_attr.rdev = rdev;
74        out_attr.has.rdev = true;
75    }
76    if let Some(change_time) = immutable.change_time {
77        out_attr.change_time = change_time;
78        out_attr.has.change_time = true;
79    }
80    if let Some(casefold) = mutable.casefold {
81        out_attr.casefold = casefold;
82        out_attr.has.casefold = true;
83    }
84    if let Some(verity_enabled) = immutable.verity_enabled {
85        out_attr.fsverity_enabled = verity_enabled;
86        out_attr.has.fsverity_enabled = true;
87    }
88    if let Some(wrapping_key_id) = mutable.wrapping_key_id {
89        out_attr.wrapping_key_id = wrapping_key_id;
90        out_attr.has.wrapping_key_id = true;
91    }
92    out_attr
93}
94
95/// This trait is used so that clients can create their own wrappers around types exposed here.
96pub trait Factory {
97    type Result;
98
99    fn create_node(self, io: RemoteIo, info: fio::NodeInfo) -> Self::Result;
100    fn create_directory(self, io: RemoteIo, info: fio::DirectoryInfo) -> Self::Result;
101    fn create_file(self, io: RemoteIo, info: fio::FileInfo) -> Self::Result;
102    fn create_symlink(self, io: RemoteIo, info: fio::SymlinkInfo) -> Self::Result;
103}
104
105/// Waits for the fuchsia.io `OnRepresentation` event and then uses the factory to create an an
106/// appropriate object.  This returns attributes (as requested by the corresponding `open` call)
107/// that are present in the `OnRepresentation` event.
108///
109/// NOTE: The attributes returned are not comprehensive.  Check `zxio_attr_from_fidl` above for
110/// supported attributes.
111pub fn create_with_on_representation<F: Factory>(
112    proxy: fio::NodeSynchronousProxy,
113    factory: F,
114) -> Result<F::Result, zx::Status> {
115    match proxy.wait_for_event(zx::MonotonicInstant::INFINITE) {
116        Ok(fio::NodeEvent::OnRepresentation { payload }) => match payload {
117            fio::Representation::Node(info) => Ok(factory.create_node(RemoteIo::new(proxy), info)),
118            fio::Representation::Directory(info) => {
119                Ok(factory.create_directory(RemoteIo::new(proxy), info))
120            }
121            fio::Representation::File(mut info) => {
122                let io = RemoteIo {
123                    proxy,
124                    stream: info
125                        .stream
126                        .take()
127                        .map(zx::Stream::from)
128                        .unwrap_or_else(|| zx::NullableHandle::invalid().into()),
129                };
130                Ok(factory.create_file(io, info))
131            }
132            fio::Representation::Symlink(info) => {
133                Ok(factory.create_symlink(RemoteIo::new(proxy), info))
134            }
135            _ => Err(zx::Status::NOT_SUPPORTED),
136        },
137        Err(fidl::Error::ClientChannelClosed { status, .. }) => Err(status),
138        _ => Err(zx::Status::IO),
139    }
140}
141
142/// Wraps a proxy and optional stream and provides wrappers around most fuchsia.io methods.
143///
144/// NOTE: The caller must take care to call appropriate methods for the underlying type.  Calling
145/// the wrong methods (e.g. calling file methods on a directory) will result in the connection being
146/// closed.
147pub struct RemoteIo {
148    proxy: fio::NodeSynchronousProxy,
149    // NOTE: This can be invalid if the remote end did not return a stream in which case
150    // file I/O will use FIDL (slow).
151    stream: zx::Stream,
152}
153
154impl RemoteIo {
155    pub fn new(proxy: fio::NodeSynchronousProxy) -> Self {
156        Self { proxy, stream: zx::NullableHandle::invalid().into() }
157    }
158
159    pub fn with_stream(proxy: fio::NodeSynchronousProxy, stream: zx::Stream) -> Self {
160        Self { proxy, stream }
161    }
162
163    pub fn into_proxy(self) -> fio::NodeSynchronousProxy {
164        self.proxy
165    }
166
167    fn cast_proxy<T: From<zx::Channel> + Into<zx::NullableHandle>>(&self) -> zx::Unowned<'_, T> {
168        zx::Unowned::new(self.proxy.as_channel())
169    }
170
171    /// Returns attributes in fuchsia.io's FIDL representation.
172    pub fn attr_get(
173        &self,
174        query: fio::NodeAttributesQuery,
175    ) -> Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), zx::Status> {
176        self.proxy
177            .get_attributes(query, zx::MonotonicInstant::INFINITE)
178            .map_err(|_| zx::Status::IO)?
179            .map_err(zx::Status::from_raw)
180    }
181
182    /// Returns attributes mapped to `zxio_node_attributes_t`
183    ///
184    /// NOTE: Not all attributes are supported.  See `zxio_attr_from_fidl` above for supported
185    /// attributes.
186    pub fn attr_get_zxio(
187        &self,
188        query: fio::NodeAttributesQuery,
189    ) -> Result<zxio_node_attributes_t, zx::Status> {
190        self.attr_get(query).map(|(m, i)| zxio_attr_from_fidl(&m, &i))
191    }
192
193    /// Sets attributes.
194    pub fn attr_set(&self, attributes: fio::MutableNodeAttributes) -> Result<(), zx::Status> {
195        self.proxy
196            .update_attributes(&attributes, zx::MonotonicInstant::INFINITE)
197            .map_err(|_| zx::Status::IO)?
198            .map_err(zx::Status::from_raw)
199    }
200
201    /// Wraps fuchsia.io/Directory's Open.
202    pub fn open<F: Factory>(
203        &self,
204        path: &str,
205        flags: fio::Flags,
206        create_attributes: Option<fio::MutableNodeAttributes>,
207        query: fio::NodeAttributesQuery,
208        factory: F,
209    ) -> Result<F::Result, zx::Status> {
210        let (client_end, server_end) = zx::Channel::create();
211        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
212        dir_proxy
213            .open(
214                path,
215                flags | fio::Flags::FLAG_SEND_REPRESENTATION,
216                &fio::Options {
217                    attributes: (!query.is_empty()).then_some(query),
218                    create_attributes,
219                    ..Default::default()
220                },
221                server_end,
222            )
223            .map_err(|_| zx::Status::IO)?;
224        create_with_on_representation(client_end.into(), factory)
225    }
226
227    /// Opens all nodes iteratively along the relative sub-paths given in `paths`.
228    /// Each item in `paths` is opened from the node returned by opening the previous item in
229    /// `paths`.
230    ///
231    /// `factory_fn` is used to create the `factory` argument to `create_with_on_representation`.
232    ///
233    /// The return vector can be smaller than the initial `paths` vector, and execution will
234    /// always stop with the first error appending an `Err(status)` into the results.
235    ///
236    /// NOTE: To prevent opening and writing through non-directory nodes, this function adds
237    /// `fio::Flags::PROTOCOL_DIRECTORY` to intermediate components.
238    pub fn open_pipelined<F: Factory>(
239        &self,
240        paths: &[&str],
241        flags: fio::Flags,
242        query: fio::NodeAttributesQuery,
243        mut factory_fn: impl FnMut() -> F,
244    ) -> Vec<Result<F::Result, zx::Status>> {
245        let mut proxy_to_result =
246            |proxies: Vec<fio::NodeSynchronousProxy>| -> Vec<Result<F::Result, zx::Status>> {
247                let mut results = Vec::new();
248                for proxy in proxies {
249                    let result = create_with_on_representation(proxy, factory_fn());
250                    let is_err = result.is_err();
251                    results.push(result);
252                    // If the latest result is an error, return early.
253                    if is_err {
254                        break;
255                    }
256                }
257                results
258            };
259
260        let mut proxies: Vec<fio::NodeSynchronousProxy> = Vec::with_capacity(paths.len());
261
262        for (i, path) in paths.iter().enumerate() {
263            let (client_end, server_end) = zx::Channel::create();
264            let channel = proxies.last().unwrap_or(&self.proxy).as_channel();
265            let dir_proxy = zx::Unowned::<fio::DirectorySynchronousProxy>::new(channel);
266
267            let mut open_flags = flags | fio::Flags::FLAG_SEND_REPRESENTATION;
268            if i < paths.len() - 1 {
269                open_flags |= fio::Flags::PROTOCOL_DIRECTORY;
270            }
271
272            #[cfg(test)]
273            {
274                TEST_HOOK.with(|hook| {
275                    if let Some(hook) = hook.borrow_mut().as_mut() {
276                        hook();
277                    }
278                });
279            }
280
281            if let Err(err) = dir_proxy.open(
282                path,
283                open_flags,
284                &fio::Options {
285                    attributes: (!query.is_empty()).then_some(query),
286                    ..Default::default()
287                },
288                server_end,
289            ) {
290                // The open call failed: return the result up to this point.
291                let mut results = proxy_to_result(proxies);
292                // If no error happened before this open call, add the error.
293                if results.last().is_none_or(|r| r.is_ok()) {
294                    let status = match err {
295                        fidl::Error::ClientChannelClosed { status, .. } => status,
296                        fidl::Error::ClientWrite(fidl::TransportError::Status(status)) => status,
297                        _ => zx::Status::IO,
298                    };
299                    results.push(Err(status));
300                }
301                return results;
302            }
303            proxies.push(fio::NodeSynchronousProxy::new(client_end));
304        }
305
306        return proxy_to_result(proxies);
307    }
308
309    /// Returns `(data, eof)`, where `eof` is true if we encountered the end of the file.  If `eof`
310    /// is false, then it is still possible that a subsequent read would read no more i.e. the end
311    /// of the file _might_ have been reached.  This might return fewer bytes than `max`.
312    pub fn read_partial(&self, offset: u64, max: usize) -> Result<(Vec<u8>, bool), zx::Status> {
313        if self.stream.is_invalid() {
314            let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
315            let max = std::cmp::min(max as u64, fio::MAX_TRANSFER_SIZE);
316            let data = file_proxy
317                .read_at(max, offset, zx::MonotonicInstant::INFINITE)
318                .map_err(|_| zx::Status::IO)?
319                .map_err(zx::Status::from_raw)?;
320            let eof = (data.len() as u64) < max;
321            Ok((data, eof))
322        } else {
323            // Use an intermediate buffer.
324            let bytes =
325                self.stream.read_at_to_vec(zx::StreamReadOptions::empty(), offset as u64, max)?;
326            let eof = bytes.len() < max;
327            Ok((bytes, eof))
328        }
329    }
330
331    /// Attempts to read `len` bytes and will only return fewer if it encounters the end of the
332    /// file, or an error.  `callback` will be called for each chunk.  If any bytes are successfully
333    /// passed to `callback`, `read` will return the total number of bytes successfully written and
334    /// any error encountered will be discarded.
335    pub fn read<E>(
336        &self,
337        offset: u64,
338        len: usize,
339        mut callback: impl FnMut(Vec<u8>) -> Result<usize, E>,
340        map_err: impl FnOnce(zx::Status) -> E,
341    ) -> Result<usize, E> {
342        let mut total = 0;
343        while total < len {
344            match self.read_partial(offset + total as u64, len - total) {
345                Ok((data, eof)) => {
346                    if data.is_empty() {
347                        break;
348                    }
349                    let data_len = data.len();
350                    let written = callback(data)?;
351                    total += written;
352                    if eof || written < data_len {
353                        break;
354                    }
355                }
356                Err(e) => {
357                    if total > 0 {
358                        break;
359                    }
360                    return Err(map_err(e));
361                }
362            }
363        }
364        Ok(total)
365    }
366
367    /// Writes `data` at `offset`.
368    pub fn write(&self, offset: u64, data: &[u8]) -> Result<usize, zx::Status> {
369        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
370        let mut total_written = 0;
371        for chunk in data.chunks(fio::MAX_TRANSFER_SIZE as usize) {
372            let result = file_proxy
373                .write_at(chunk, offset + total_written as u64, zx::MonotonicInstant::INFINITE)
374                .map_err(|_| zx::Status::IO)
375                .and_then(|res| res.map_err(zx::Status::from_raw));
376            match result {
377                Ok(actual) => {
378                    let actual = actual as usize;
379                    total_written += actual;
380                    if actual < chunk.len() {
381                        return Ok(total_written);
382                    }
383                }
384                Err(e) => {
385                    if total_written > 0 {
386                        break;
387                    }
388                    return Err(e);
389                }
390            }
391        }
392        Ok(total_written)
393    }
394
395    /// Returns true if vectored operations are supported.
396    pub fn supports_vectored(&self) -> bool {
397        // We only support readv and writev if we have a stream.
398        !self.stream.is_invalid()
399    }
400
401    /// Reads into `iovecs` using a vectored read.  This is only supported with a valid stream.  See
402    /// `supports_vectored` above.
403    ///
404    /// # Safety
405    ///
406    /// Same as `zx::Stream::readv`.
407    pub unsafe fn readv(
408        &self,
409        offset: u64,
410        iovecs: &mut [zx::sys::zx_iovec_t],
411    ) -> Result<usize, zx::Status> {
412        if self.stream.is_invalid() {
413            return Err(zx::Status::NOT_SUPPORTED);
414        }
415        // SAFETY: See `zx::Stream::readv`.
416        unsafe { self.stream.readv_at(zx::StreamReadOptions::empty(), offset as u64, iovecs) }
417    }
418
419    /// Writes from `iovecs` using vectored write.  This is only supported with a valid stream.  See
420    /// `supports_vectored` above.
421    pub fn writev(&self, offset: u64, iovecs: &[zx::sys::zx_iovec_t]) -> Result<usize, zx::Status> {
422        if self.stream.is_invalid() {
423            return Err(zx::Status::NOT_SUPPORTED);
424        }
425        self.stream.writev_at(zx::StreamWriteOptions::empty(), offset, &iovecs)
426    }
427
428    /// Wraps fuchsia.io/File's Truncate.
429    pub fn truncate(&self, length: u64) -> Result<(), zx::Status> {
430        self.cast_proxy::<fio::FileSynchronousProxy>()
431            .resize(length, zx::MonotonicInstant::INFINITE)
432            .map_err(|_| zx::Status::IO)?
433            .map_err(zx::Status::from_raw)
434    }
435
436    /// Returns a VMO backing the file.
437    pub fn vmo_get(&self, flags: zx::VmarFlags) -> Result<zx::Vmo, zx::Status> {
438        let mut fio_flags = fio::VmoFlags::empty();
439        if flags.contains(zx::VmarFlags::PERM_READ) {
440            fio_flags |= fio::VmoFlags::READ;
441        }
442        if flags.contains(zx::VmarFlags::PERM_WRITE) {
443            fio_flags |= fio::VmoFlags::WRITE;
444        }
445        if flags.contains(zx::VmarFlags::PERM_EXECUTE) {
446            fio_flags |= fio::VmoFlags::EXECUTE;
447        }
448        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
449        let vmo = file_proxy
450            .get_backing_memory(fio_flags, zx::MonotonicInstant::INFINITE)
451            .map_err(|_| zx::Status::IO)?
452            .map_err(zx::Status::from_raw)?;
453        Ok(vmo)
454    }
455
456    /// Wraps fuchsia.io/Node's Sync.
457    pub fn sync(&self) -> Result<(), zx::Status> {
458        self.proxy
459            .sync(zx::MonotonicInstant::INFINITE)
460            .map_err(|_| zx::Status::IO)?
461            .map_err(zx::Status::from_raw)
462    }
463
464    /// Closes and updates access time asynchronously.
465    pub fn close_and_update_access_time(self) {
466        let _ = self.proxy.get_attributes(
467            fio::NodeAttributesQuery::PENDING_ACCESS_TIME_UPDATE,
468            zx::MonotonicInstant::INFINITE_PAST,
469        );
470    }
471
472    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
473    pub fn clone_proxy(&self) -> Result<fio::NodeSynchronousProxy, zx::Status> {
474        let (client_end, server_end) = zx::Channel::create();
475        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
476        Ok(client_end.into())
477    }
478
479    /// Wraps fuchsia.io/Node's LinkInto.
480    pub fn link_into(&self, target_dir: &Self, name: &str) -> Result<(), zx::Status> {
481        let target_dir_proxy = target_dir.cast_proxy::<fio::DirectorySynchronousProxy>();
482        let (status, token) = target_dir_proxy
483            .get_token(zx::MonotonicInstant::INFINITE)
484            .map_err(|_| zx::Status::IO)?;
485        zx::Status::ok(status)?;
486        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
487
488        // Linkable::LinkInto is a separate protocol.
489        let linkable_proxy = self.cast_proxy::<fio::LinkableSynchronousProxy>();
490
491        linkable_proxy
492            .link_into(token.into(), name, zx::MonotonicInstant::INFINITE)
493            .map_err(|_| zx::Status::IO)?
494            .map_err(zx::Status::from_raw)
495    }
496
497    /// Wraps fuchsia.io/Directory's Unlink.
498    pub fn unlink(&self, name: &str, flags: fio::UnlinkFlags) -> Result<(), zx::Status> {
499        let options = fio::UnlinkOptions { flags: Some(flags), ..Default::default() };
500        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
501        dir_proxy
502            .unlink(name, &options, zx::MonotonicInstant::INFINITE)
503            .map_err(|_| zx::Status::IO)?
504            .map_err(zx::Status::from_raw)
505    }
506
507    /// Wraps fuchsia.io/Directory's Rename.
508    pub fn rename(
509        &self,
510        old_path: &str,
511        new_directory: &Self,
512        new_path: &str,
513    ) -> Result<(), zx::Status> {
514        let new_dir_proxy = new_directory.cast_proxy::<fio::DirectorySynchronousProxy>();
515        let (status, token) =
516            new_dir_proxy.get_token(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
517        zx::Status::ok(status)?;
518        let token = token.ok_or(zx::Status::NOT_SUPPORTED)?;
519        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
520        dir_proxy
521            .rename(old_path, token.into(), new_path, zx::MonotonicInstant::INFINITE)
522            .map_err(|_| zx::Status::IO)?
523            .map_err(zx::Status::from_raw)
524    }
525
526    /// Wraps fuchsia.io/Directory's CreateSymlink.
527    pub fn create_symlink(&self, name: &str, target: &[u8]) -> Result<RemoteIo, zx::Status> {
528        let dir_proxy = self.cast_proxy::<fio::DirectorySynchronousProxy>();
529        let (client_end, server_end) = zx::Channel::create();
530        dir_proxy
531            .create_symlink(name, target, Some(server_end.into()), zx::MonotonicInstant::INFINITE)
532            .map_err(|_| zx::Status::IO)?
533            .map_err(zx::Status::from_raw)?;
534        Ok(RemoteIo::new(client_end.into()))
535    }
536
537    /// Wraps fuchsia.io/File's EnableVerity.
538    pub fn enable_verity(&self, descriptor: &zxio_fsverity_descriptor_t) -> Result<(), zx::Status> {
539        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
540        let options = fio::VerificationOptions {
541            hash_algorithm: Some(match descriptor.hash_algorithm {
542                1 => fio::HashAlgorithm::Sha256,
543                2 => fio::HashAlgorithm::Sha512,
544                _ => return Err(zx::Status::INVALID_ARGS),
545            }),
546            salt: Some(descriptor.salt[..descriptor.salt_size as usize].to_vec()),
547            ..Default::default()
548        };
549        file_proxy
550            .enable_verity(&options, zx::MonotonicInstant::INFINITE)
551            .map_err(|_| zx::Status::IO)?
552            .map_err(zx::Status::from_raw)
553    }
554
555    /// Wraps fuchsia.io/File's Allocate.
556    pub fn allocate(&self, offset: u64, len: u64, mode: AllocateMode) -> Result<(), zx::Status> {
557        let file_proxy = self.cast_proxy::<fio::FileSynchronousProxy>();
558        let mut fio_mode = fio::AllocateMode::empty();
559        if mode.contains(AllocateMode::KEEP_SIZE) {
560            fio_mode |= fio::AllocateMode::KEEP_SIZE;
561        }
562        if mode.contains(AllocateMode::UNSHARE_RANGE) {
563            fio_mode |= fio::AllocateMode::UNSHARE_RANGE;
564        }
565        if mode.contains(AllocateMode::PUNCH_HOLE) {
566            fio_mode |= fio::AllocateMode::PUNCH_HOLE;
567        }
568        if mode.contains(AllocateMode::COLLAPSE_RANGE) {
569            fio_mode |= fio::AllocateMode::COLLAPSE_RANGE;
570        }
571        if mode.contains(AllocateMode::ZERO_RANGE) {
572            fio_mode |= fio::AllocateMode::ZERO_RANGE;
573        }
574        if mode.contains(AllocateMode::INSERT_RANGE) {
575            fio_mode |= fio::AllocateMode::INSERT_RANGE;
576        }
577        file_proxy
578            .allocate(offset, len, fio_mode, zx::MonotonicInstant::INFINITE)
579            .map_err(|_| zx::Status::IO)?
580            .map_err(zx::Status::from_raw)
581    }
582
583    /// Wraps fuchsia.io/Node's GetExtendedAttribute.
584    pub fn xattr_get(&self, name: &[u8]) -> Result<Vec<u8>, zx::Status> {
585        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
586        let result = self
587            .proxy
588            .get_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
589            .map_err(|_| zx::Status::IO)?
590            .map_err(zx::Status::from_raw)?;
591        match result {
592            fio::ExtendedAttributeValue::Bytes(bytes) => Ok(bytes),
593            fio::ExtendedAttributeValue::Buffer(vmo) => {
594                let size = vmo.get_content_size().map_err(|_| zx::Status::IO)?;
595                let mut bytes = vec![0u8; size as usize];
596                vmo.read(&mut bytes, 0).map_err(|_| zx::Status::IO)?;
597                Ok(bytes)
598            }
599            _ => Err(zx::Status::NOT_SUPPORTED),
600        }
601    }
602
603    /// Wraps fuchsia.io/Node's SetExtendedAttribute.
604    pub fn xattr_set(
605        &self,
606        name: &[u8],
607        value: &[u8],
608        mode: syncio::XattrSetMode,
609    ) -> Result<(), zx::Status> {
610        let val = fio::ExtendedAttributeValue::Bytes(value.to_vec());
611        let fidl_mode = match mode {
612            syncio::XattrSetMode::Set => fio::SetExtendedAttributeMode::Set,
613            syncio::XattrSetMode::Create => fio::SetExtendedAttributeMode::Create,
614            syncio::XattrSetMode::Replace => fio::SetExtendedAttributeMode::Replace,
615        };
616        self.proxy
617            .set_extended_attribute(name, val, fidl_mode, zx::MonotonicInstant::INFINITE)
618            .map_err(|_| zx::Status::IO)?
619            .map_err(zx::Status::from_raw)
620    }
621
622    /// Wraps fuchsia.io/Node's RenoveExtendedAttribute.
623    pub fn xattr_remove(&self, name: &[u8]) -> Result<(), zx::Status> {
624        let name_str = std::str::from_utf8(name.as_ref()).map_err(|_| zx::Status::INVALID_ARGS)?;
625        self.proxy
626            .remove_extended_attribute(name_str.as_bytes(), zx::MonotonicInstant::INFINITE)
627            .map_err(|_| zx::Status::IO)?
628            .map_err(zx::Status::from_raw)
629    }
630
631    /// Wraps fuchsia.io/Node's ListExtendedAttributes.
632    pub fn xattr_list(&self) -> Result<Vec<Vec<u8>>, zx::Status> {
633        let (client_end, server_end) = zx::Channel::create();
634        self.proxy.list_extended_attributes(server_end.into()).map_err(|_| zx::Status::IO)?;
635        let iterator = fio::ExtendedAttributeIteratorSynchronousProxy::new(client_end);
636        let mut all_attrs = vec![];
637        loop {
638            let (attributes, last) = iterator
639                .get_next(zx::MonotonicInstant::INFINITE)
640                .map_err(|_| zx::Status::IO)?
641                .map_err(zx::Status::from_raw)?;
642            all_attrs.extend(attributes);
643            if last {
644                break;
645            }
646        }
647        Ok(all_attrs)
648    }
649}
650
651/// RemoteDirectory supports iteration of directories and things that you can do to directories via
652/// a file descriptor.  Other opterations, such as creating children, can be done via `RemoteIo`.
653/// Iteration is not safe to be done concurrently because there is a seek pointer; `readdir` will
654/// resume from the seek position.
655pub struct RemoteDirectory {
656    proxy: fio::DirectorySynchronousProxy,
657    state: Mutex<State>,
658}
659
660#[derive(Default)]
661struct State {
662    /// Buffer contains the last batch of entries read from the remote end.
663    buffer: Vec<u8>,
664
665    /// Position in the buffer for the next entry.
666    offset: usize,
667
668    /// If the last attempt to write to the sink failed, this contains the entry that is pending to
669    /// be added. This is also used to synthesize dot-dot.
670    pending_entry: Entry,
671
672    /// The current iterator position in the directory.
673    current_index: u64,
674}
675
676impl State {
677    fn name(&self, range: Range<usize>) -> &[u8] {
678        &self.buffer[range]
679    }
680
681    /// Returns the next dir entry. If no more entries are found, returns None.  Returns an error if
682    /// the iterator fails.
683    fn next(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<Entry, zx::Status> {
684        let mut next_dirent = || -> Result<Entry, zx::Status> {
685            if self.offset >= self.buffer.len() {
686                match proxy.read_dirents(fio::MAX_BUF, zx::MonotonicInstant::INFINITE) {
687                    Ok((status, dirents)) => {
688                        zx::Status::ok(status)?;
689                        if dirents.is_empty() {
690                            return Ok(Entry::None);
691                        }
692                        self.buffer = dirents;
693                        self.offset = 0;
694                    }
695                    Err(_) => return Err(zx::Status::IO),
696                }
697            }
698
699            #[repr(C, packed)]
700            #[derive(FromBytes)]
701            struct DirectoryEntry {
702                ino: u64,
703                name_len: u8,
704                entry_type: u8,
705            }
706
707            let Some((ino, name, entry_type)) =
708                DirectoryEntry::read_from_prefix(&self.buffer[self.offset..]).ok().and_then(
709                    |(DirectoryEntry { ino, name_len, entry_type }, remainder)| {
710                        let name_len = name_len as usize;
711                        let name_start = self.offset + std::mem::size_of::<DirectoryEntry>();
712                        (remainder.len() >= name_len).then_some((
713                            ino,
714                            name_start..name_start + name_len,
715                            entry_type,
716                        ))
717                    },
718                )
719            else {
720                // Truncated entry.
721                return Ok(Entry::None);
722            };
723
724            self.offset = name.end;
725
726            Ok(Entry::Some {
727                ino,
728                entry_type: fio::DirentType::from_primitive(entry_type).ok_or(zx::Status::IO)?,
729                name,
730            })
731        };
732
733        let mut next = self.pending_entry.take();
734        if let Entry::None = next {
735            next = next_dirent()?;
736        }
737        // We only want to synthesize .. if . exists because the . and .. entries get removed if the
738        // directory is unlinked, so if the remote filesystem has removed ., we know to omit the
739        // .. entry.
740        match &next {
741            Entry::Some { name, .. } if self.name(name.clone()) == b"." => {
742                self.pending_entry = Entry::DotDot;
743            }
744            _ => {}
745        }
746        self.current_index += 1;
747        Ok(next)
748    }
749
750    fn rewind(&mut self, proxy: &fio::DirectorySynchronousProxy) -> Result<(), zx::Status> {
751        self.pending_entry = Entry::None;
752        let status = proxy.rewind(zx::MonotonicInstant::INFINITE).map_err(|_| zx::Status::IO)?;
753        zx::Status::ok(status)?;
754        self.buffer.clear();
755        self.offset = 0;
756        self.current_index = 0;
757        Ok(())
758    }
759}
760
761#[derive(Default)]
762enum Entry {
763    // Indicates no more entries.
764    #[default]
765    None,
766
767    Some {
768        ino: u64,
769        entry_type: fio::DirentType,
770        name: Range<usize>,
771    },
772
773    // Indicates dot-dot should be synthesized.
774    DotDot,
775}
776
777impl Entry {
778    fn take(&mut self) -> Entry {
779        std::mem::replace(self, Entry::None)
780    }
781}
782
783impl RemoteDirectory {
784    pub fn new(proxy: fio::DirectorySynchronousProxy) -> Self {
785        Self { proxy, state: Mutex::default() }
786    }
787
788    /// Seeks to `new_index` in the directory.
789    pub fn seek(&self, new_index: u64) -> Result<u64, zx::Status> {
790        let mut state = self.state.lock();
791
792        if new_index < state.current_index {
793            // Our iterator only goes forward, so reset it here.  Note: we *must* rewind it rather
794            // than just create a new iterator because the remote end maintains the offset.
795            state.rewind(&self.proxy)?;
796            state.current_index = 0;
797        }
798
799        // Advance the iterator to catch up with the offset.
800        for i in state.current_index..new_index {
801            match state.next(&self.proxy) {
802                Ok(Entry::Some { .. } | Entry::DotDot) => {}
803                Ok(Entry::None) => break, // No more entries.
804                Err(_) => {
805                    // In order to keep the offset and the iterator in sync, set the new offset
806                    // to be as far as we could get.
807                    // Note that failing the seek here would also cause the iterator and the
808                    // offset to not be in sync, because the iterator has already moved from
809                    // where it was.
810                    return Ok(i);
811                }
812            }
813        }
814
815        Ok(new_index)
816    }
817
818    /// Returns `None` if there are no more entries to be read.  `sink` can choose to return
819    /// `ControlFlow::Break(_)` in which case the entry will be returned the next time `readdir` is
820    /// called.
821    pub fn readdir<B, S: FnMut(u64, fio::DirentType, &[u8]) -> ControlFlow<B, ()>>(
822        &self,
823        mut sink: S,
824    ) -> Result<Option<B>, zx::Status> {
825        let mut state = self.state.lock();
826        loop {
827            let entry = state.next(&self.proxy)?;
828            if let ControlFlow::Break(b) = match &entry {
829                Entry::Some { ino, entry_type, name } => {
830                    sink(*ino, *entry_type, state.name(name.clone()))
831                }
832                Entry::DotDot => sink(0, fio::DirentType::Directory, b".."),
833                Entry::None => break,
834            } {
835                state.pending_entry = entry;
836                return Ok(Some(b));
837            }
838        }
839        Ok(None)
840    }
841
842    /// Wraps fuchsia.io/Node's Sync.
843    pub fn sync(&self) -> Result<(), zx::Status> {
844        self.proxy
845            .sync(zx::MonotonicInstant::INFINITE)
846            .map_err(|_| zx::Status::IO)?
847            .map_err(zx::Status::from_raw)
848    }
849
850    /// Clones (in the fuchsia.unknown.Clonable sense) the underlying proxy.
851    pub fn clone_proxy(&self) -> Result<fio::DirectorySynchronousProxy, zx::Status> {
852        let (client_end, server_end) = zx::Channel::create();
853        self.proxy.clone(server_end.into()).map_err(|_| zx::Status::IO)?;
854        Ok(client_end.into())
855    }
856}
857
858// Hook for test.
859#[cfg(test)]
860thread_local! {
861    pub static TEST_HOOK: std::cell::RefCell<Option<Box<dyn FnMut()>>> =
862        std::cell::RefCell::new(None);
863}
864
865#[cfg(test)]
866mod tests {
867    use super::*;
868    use fidl::endpoints::{ControlHandle, RequestStream};
869    use fuchsia_async as fasync;
870    use futures::StreamExt;
871    use std::sync::Arc;
872    use std::sync::atomic::{AtomicU64, Ordering};
873
874    fn hook(f: Box<dyn FnMut()>) -> impl Drop {
875        TEST_HOOK.with(|hook| {
876            *hook.borrow_mut() = Some(f);
877        });
878        return scopeguard::guard((), |_| {
879            TEST_HOOK.with(|hook| {
880                *hook.borrow_mut() = None;
881            });
882        });
883    }
884
885    #[fuchsia::test]
886    async fn test_read_chunking() {
887        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
888        let content = vec![0xAB; (fio::MAX_TRANSFER_SIZE + 100) as usize];
889        let content_clone = content.clone();
890
891        let _server_task = fasync::Task::spawn(async move {
892            while let Some(Ok(request)) = stream.next().await {
893                match request {
894                    fio::FileRequest::ReadAt { count, offset, responder } => {
895                        let start = offset as usize;
896                        let end = std::cmp::min(start + count as usize, content_clone.len());
897                        let data = if start < content_clone.len() {
898                            &content_clone[start..end]
899                        } else {
900                            &[]
901                        };
902                        responder.send(Ok(data)).unwrap();
903                    }
904                    _ => panic!("Unexpected request: {:?}", request),
905                }
906            }
907        });
908
909        let io = RemoteIo::new(client.into_channel().into());
910        fasync::unblock(move || {
911            let mut data = Vec::new();
912            let actual = io
913                .read(
914                    0,
915                    content.len(),
916                    |chunk| -> Result<usize, zx::Status> {
917                        let len = chunk.len();
918                        data.extend(chunk);
919                        Ok(len)
920                    },
921                    |status| status,
922                )
923                .unwrap();
924            assert_eq!(actual, content.len());
925            assert_eq!(data, content);
926        })
927        .await;
928    }
929
930    #[fuchsia::test]
931    async fn test_read_error_after_data() {
932        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
933        let chunk_size = 100;
934        let content = vec![0xAA; chunk_size];
935        let content_clone = content.clone();
936
937        let _server_task = fasync::Task::spawn(async move {
938            let mut request_count = 0;
939            while let Some(Ok(request)) = stream.next().await {
940                match request {
941                    fio::FileRequest::ReadAt { count: _, offset: _, responder } => {
942                        request_count += 1;
943                        if request_count == 1 {
944                            responder.send(Ok(&content_clone)).unwrap();
945                        } else {
946                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
947                        }
948                    }
949                    _ => panic!("Unexpected request: {:?}", request),
950                }
951            }
952        });
953
954        let io = RemoteIo::new(client.into_channel().into());
955        fasync::unblock(move || {
956            let mut data = Vec::new();
957            // Ask for more than chunk_size to ensure a second request is made.
958            let actual = io
959                .read(
960                    0,
961                    chunk_size * 2,
962                    |chunk| -> Result<usize, zx::Status> {
963                        let len = chunk.len();
964                        data.extend(chunk);
965                        Ok(len)
966                    },
967                    |status| status,
968                )
969                .expect("read should succeed even if later chunks fail");
970            assert_eq!(actual, chunk_size);
971            assert_eq!(data.len(), chunk_size);
972            assert_eq!(data, content);
973        })
974        .await;
975    }
976
977    #[fuchsia::test]
978    async fn test_write_chunking() {
979        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
980        let content = vec![0xCD; (fio::MAX_TRANSFER_SIZE + 100) as usize];
981        let content2 = content.clone();
982
983        let server_task = fasync::Task::spawn(async move {
984            let mut written = vec![0; content2.len()];
985            while let Some(Ok(request)) = stream.next().await {
986                match request {
987                    fio::FileRequest::WriteAt { offset, data, responder, .. } => {
988                        let offset = offset as usize;
989                        written[offset..offset + data.len()].copy_from_slice(&data);
990                        responder.send(Ok(data.len() as u64)).unwrap();
991                    }
992                    _ => panic!("Unexpected request: {:?}", request),
993                }
994            }
995            assert_eq!(written, content2);
996        });
997
998        let io = RemoteIo::new(client.into_channel().into());
999        fasync::unblock(move || {
1000            let written = io.write(0, &content).expect("write failed");
1001            assert_eq!(written, content.len());
1002        })
1003        .await;
1004
1005        server_task.await;
1006    }
1007
1008    #[fuchsia::test]
1009    async fn test_write_error_after_data() {
1010        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::FileMarker>();
1011        let chunk_size = fio::MAX_TRANSFER_SIZE as usize;
1012        let content = vec![0xEE; chunk_size * 2];
1013
1014        let _server_task = fasync::Task::spawn(async move {
1015            let mut request_count = 0;
1016            while let Some(Ok(request)) = stream.next().await {
1017                match request {
1018                    fio::FileRequest::WriteAt { offset: _, data, responder, .. } => {
1019                        request_count += 1;
1020                        if request_count == 1 {
1021                            responder.send(Ok(data.len() as u64)).unwrap();
1022                        } else {
1023                            responder.send(Err(zx::sys::ZX_ERR_IO)).unwrap();
1024                        }
1025                    }
1026                    _ => panic!("Unexpected request: {:?}", request),
1027                }
1028            }
1029        });
1030
1031        let io = RemoteIo::new(client.into_channel().into());
1032        fasync::unblock(move || {
1033            let written = io.write(0, &content).expect("write should succeed partial");
1034            assert_eq!(written, chunk_size);
1035        })
1036        .await;
1037    }
1038
1039    #[fuchsia::test]
1040    async fn test_large_directory() {
1041        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1042        let num_entries = 2000;
1043
1044        let task = fasync::Task::spawn(async move {
1045            let mut sent_count = 0;
1046            let mut num_requests = 0;
1047            while let Some(Ok(request)) = stream.next().await {
1048                match request {
1049                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1050                        num_requests += 1;
1051                        let mut buffer = Vec::new();
1052                        while sent_count < num_entries {
1053                            let name = if sent_count == 0 {
1054                                ".".to_string()
1055                            } else {
1056                                format!("file_{}", sent_count - 1)
1057                            };
1058                            let name_bytes = name.as_bytes();
1059                            let entry_size = 10 + name_bytes.len();
1060                            if buffer.len() + entry_size > max_bytes as usize {
1061                                break;
1062                            }
1063                            buffer.extend_from_slice(&(sent_count as u64 + 1).to_le_bytes());
1064                            buffer.push(name_bytes.len() as u8);
1065                            let entry_type = if sent_count == 0 {
1066                                fio::DirentType::Directory
1067                            } else {
1068                                fio::DirentType::File
1069                            };
1070                            buffer.push(entry_type.into_primitive());
1071                            buffer.extend_from_slice(name_bytes);
1072                            sent_count += 1;
1073                        }
1074                        let _ = responder.send(0, &buffer);
1075                    }
1076                    fio::DirectoryRequest::Rewind { responder } => {
1077                        sent_count = 0;
1078                        let _ = responder.send(0);
1079                    }
1080                    fio::DirectoryRequest::Close { responder } => {
1081                        let _ = responder.send(Ok(()));
1082                    }
1083                    _ => {}
1084                }
1085            }
1086            assert!(num_requests > 0);
1087        });
1088
1089        let dir = RemoteDirectory::new(client.into_channel().into());
1090        let count = Arc::new(AtomicU64::new(0));
1091        let count2 = count.clone();
1092        fasync::unblock(move || {
1093            dir.readdir::<(), _>(|_ino, _type, _name| {
1094                count.fetch_add(1, Ordering::Relaxed);
1095                ControlFlow::Continue(())
1096            })
1097            .unwrap();
1098        })
1099        .await;
1100        // Expect num_entries + 1 (for synthesized "..")
1101        assert_eq!(count2.load(Ordering::Relaxed), num_entries + 1);
1102        task.await;
1103    }
1104
1105    #[fuchsia::test]
1106    async fn test_seek_backwards() {
1107        let (client, mut stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1108        let _server_task = fasync::Task::spawn(async move {
1109            let entries = vec![
1110                (1, fio::DirentType::Directory, "."),
1111                (2, fio::DirentType::File, "file_0"),
1112                (3, fio::DirentType::File, "file_1"),
1113                (4, fio::DirentType::File, "file_2"),
1114            ];
1115            let mut current_entry = 0;
1116
1117            while let Some(Ok(request)) = stream.next().await {
1118                match request {
1119                    fio::DirectoryRequest::ReadDirents { max_bytes, responder } => {
1120                        let mut buffer = Vec::new();
1121                        while current_entry < entries.len() {
1122                            let (ino, type_, name) = entries[current_entry];
1123                            let name_bytes = name.as_bytes();
1124                            let entry_size = 10 + name_bytes.len();
1125                            if buffer.len() + entry_size > max_bytes as usize {
1126                                break;
1127                            }
1128                            buffer.extend_from_slice(&(ino as u64).to_le_bytes());
1129                            buffer.push(name_bytes.len() as u8);
1130                            buffer.push(type_.into_primitive());
1131                            buffer.extend_from_slice(name_bytes);
1132                            current_entry += 1;
1133                        }
1134                        responder.send(0, &buffer).unwrap();
1135                    }
1136                    fio::DirectoryRequest::Rewind { responder } => {
1137                        current_entry = 0;
1138                        responder.send(0).unwrap();
1139                    }
1140                    _ => {}
1141                }
1142            }
1143        });
1144
1145        let dir = RemoteDirectory::new(client.into_channel().into());
1146        fasync::unblock(move || {
1147            let mut names = Vec::new();
1148            // Read 3 entries: ".", "..", "file_0".
1149            dir.readdir::<(), _>(|_ino, _type, name| {
1150                names.push(name.to_vec());
1151                if names.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
1152            })
1153            .unwrap();
1154
1155            assert_eq!(names[0], b".");
1156            assert_eq!(names[1], b"..");
1157            assert_eq!(names[2], b"file_0");
1158
1159            // Seek to 1. This triggers rewind() internally because 1 < current_index (3).
1160            // Index 1 corresponds to "..".
1161            dir.seek(1).unwrap();
1162
1163            let mut names_after_seek = Vec::new();
1164            // Read 2 entries: "..", "file_0".
1165            dir.readdir::<(), _>(|_ino, _type, name| {
1166                names_after_seek.push(name.to_vec());
1167                if names_after_seek.len() == 2 {
1168                    ControlFlow::Break(())
1169                } else {
1170                    ControlFlow::Continue(())
1171                }
1172            })
1173            .unwrap();
1174
1175            assert_eq!(names_after_seek[0], b"..");
1176            assert_eq!(names_after_seek[1], b"file_0");
1177        })
1178        .await;
1179    }
1180
1181    struct DummyFactory;
1182
1183    impl Factory for DummyFactory {
1184        type Result = RemoteIo;
1185        fn create_node(self, io: RemoteIo, _info: fio::NodeInfo) -> Self::Result {
1186            io
1187        }
1188        fn create_directory(self, io: RemoteIo, _info: fio::DirectoryInfo) -> Self::Result {
1189            io
1190        }
1191        fn create_file(self, io: RemoteIo, _info: fio::FileInfo) -> Self::Result {
1192            io
1193        }
1194        fn create_symlink(self, io: RemoteIo, _info: fio::SymlinkInfo) -> Self::Result {
1195            io
1196        }
1197    }
1198
1199    #[fuchsia::test]
1200    async fn test_open_pipelined() {
1201        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1202
1203        fn serve_mock_directory(
1204            mut stream: fio::DirectoryRequestStream,
1205            mut expected_paths: Vec<String>,
1206        ) {
1207            fasync::Task::spawn(async move {
1208                if let Some(Ok(request)) = stream.next().await {
1209                    match request {
1210                        fio::DirectoryRequest::Open { path, flags, options: _, object, .. } => {
1211                            if !expected_paths.is_empty() {
1212                                let expected = expected_paths.remove(0);
1213                                assert_eq!(path, expected);
1214                                if expected == "path1" {
1215                                    assert!(flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1216                                } else if expected == "path2" {
1217                                    assert!(!flags.contains(fio::Flags::PROTOCOL_DIRECTORY));
1218                                }
1219                            }
1220                            let server_end =
1221                                fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1222                            let dir_stream = server_end.into_stream();
1223                            let control_handle = dir_stream.control_handle();
1224                            let representation =
1225                                fio::Representation::Directory(fio::DirectoryInfo::default());
1226                            control_handle.send_on_representation(representation).unwrap();
1227
1228                            serve_mock_directory(dir_stream, expected_paths);
1229                        }
1230                        _ => {}
1231                    }
1232                }
1233            })
1234            .detach();
1235        }
1236
1237        serve_mock_directory(stream, vec!["path1".to_string(), "path2".to_string()]);
1238
1239        let io = RemoteIo::new(client.into_channel().into());
1240        fasync::unblock(move || {
1241            let results = io.open_pipelined(
1242                &["path1", "path2"],
1243                fio::Flags::empty(),
1244                fio::NodeAttributesQuery::empty(),
1245                || DummyFactory,
1246            );
1247            assert_eq!(results.len(), 2);
1248            assert!(results.iter().all(|r| r.is_ok()));
1249        })
1250        .await;
1251    }
1252
1253    #[fuchsia::test]
1254    async fn test_open_pipelined_not_found() {
1255        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1256
1257        fn serve_mock_directory_not_found(mut stream: fio::DirectoryRequestStream) {
1258            fasync::Task::spawn(async move {
1259                if let Some(Ok(request)) = stream.next().await {
1260                    match request {
1261                        fio::DirectoryRequest::Open {
1262                            path, flags: _, options: _, object, ..
1263                        } => {
1264                            let server_end =
1265                                fidl::endpoints::ServerEnd::<fio::DirectoryMarker>::new(object);
1266                            let dir_stream = server_end.into_stream();
1267                            let control_handle = dir_stream.control_handle();
1268
1269                            if path == "not_found" {
1270                                control_handle.shutdown_with_epitaph(zx::Status::NOT_FOUND);
1271                            } else {
1272                                let representation =
1273                                    fio::Representation::Directory(fio::DirectoryInfo::default());
1274                                control_handle.send_on_representation(representation).unwrap();
1275                                serve_mock_directory_not_found(dir_stream);
1276                            }
1277                        }
1278                        _ => {}
1279                    }
1280                }
1281            })
1282            .detach();
1283        }
1284
1285        serve_mock_directory_not_found(stream);
1286
1287        let io = RemoteIo::new(client.into_channel().into());
1288        fasync::unblock(move || {
1289            let results = io.open_pipelined(
1290                &["path1", "not_found", "path3"],
1291                fio::Flags::empty(),
1292                fio::NodeAttributesQuery::empty(),
1293                || DummyFactory,
1294            );
1295            assert_eq!(results.len(), 2);
1296            assert!(results[0].is_ok());
1297            assert_eq!(results[1].as_ref().err(), Some(&zx::Status::NOT_FOUND));
1298        })
1299        .await;
1300    }
1301
1302    #[fuchsia::test]
1303    async fn test_open_pipelined_peer_closed() {
1304        let (client, stream) = fidl::endpoints::create_request_stream::<fio::DirectoryMarker>();
1305
1306        fn serve_mock_directory_close_early(mut stream: fio::DirectoryRequestStream) {
1307            fasync::Task::spawn(async move {
1308                if let Some(Ok(request)) = stream.next().await {
1309                    match request {
1310                        fio::DirectoryRequest::Open { object, .. } => {
1311                            // We just drop the object (the server_end of the channel), closing it.
1312                            drop(object);
1313                        }
1314                        _ => {}
1315                    }
1316                }
1317            })
1318            .detach();
1319        }
1320
1321        serve_mock_directory_close_early(stream);
1322
1323        let io = RemoteIo::new(client.into_channel().into());
1324        fasync::unblock(move || {
1325            // Give the server time to drop the channel before the second open
1326            // We can't easily yield from synchronous code, but we can sleep.
1327            let _guard = hook(Box::new(|| {
1328                std::thread::sleep(std::time::Duration::from_millis(100));
1329            }));
1330            let results = io.open_pipelined(
1331                &["path1", "path2"],
1332                fio::Flags::empty(),
1333                fio::NodeAttributesQuery::empty(),
1334                || DummyFactory,
1335            );
1336            assert_eq!(results.len(), 1);
1337            assert_eq!(results[0].as_ref().err(), Some(&zx::Status::PEER_CLOSED));
1338        })
1339        .await;
1340    }
1341}