blobfs/lib.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![deny(missing_docs)]

//! Typesafe wrappers around the /blob filesystem.

use fidl::endpoints::ClientEnd;
use fuchsia_hash::{Hash, ParseHashError};
use futures::{StreamExt as _, stream};
use log::{error, info};
use std::collections::HashSet;
use thiserror::Error;
use vfs::execution_scope::ExecutionScope;
use vfs::file::StreamIoConnection;
use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
use zx::{self as zx, Status};
use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};

pub mod mock;
pub use mock::Mock;

/// Blobfs client errors.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    #[error("while deleting blob")]
    Unlink(#[source] Status),

    #[error("while sync'ing")]
    Sync(#[source] Status),

    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}

/// An error encountered while creating a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),

    #[error("this client was not created with a blob creator so it cannot write blobs")]
    WritingNotConfigured,
}

impl From<ffxfs::CreateBlobError> for CreateError {
    fn from(e: ffxfs::CreateBlobError) -> Self {
        match e {
            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
        }
    }
}

/// A builder for [`Client`]
#[derive(Default)]
pub struct ClientBuilder {
    readable: bool,
    writable: bool,
    executable: bool,
}

impl ClientBuilder {
    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
    /// executable flags. Always connects to fuchsia.fxfs.BlobReader, and to
    /// fuchsia.fxfs.BlobCreator if the builder is writable. Connects to and initializes the
    /// VmexResource if it is available. Returns a `Client`.
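    ///
    /// A minimal usage sketch (hypothetical caller; assumes the component's namespace routes
    /// /blob and the fuchsia.fxfs protocols):
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), blobfs::BlobfsError> {
    /// let client = blobfs::Client::builder().readable().executable().build().await?;
    /// let blobs = client.list_known_blobs().await?;
    /// println!("blobfs currently has {} blobs", blobs.len());
    /// # Ok(())
    /// # }
    /// ```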
    pub async fn build(self) -> Result<Client, BlobfsError> {
        let mut flags = fio::Flags::empty();
        if self.readable {
            flags |= fio::PERM_READABLE
        }
        if self.writable {
            flags |= fio::PERM_WRITABLE
        }
        if self.executable {
            flags |= fio::PERM_EXECUTABLE
        }
        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
            fidl_fuchsia_kernel::VmexResourceMarker,
        >() && let Ok(vmex) = client.get().await
        {
            info!("Got vmex resource");
            vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
        }
        let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
            .map_err(BlobfsError::ConnectToBlobReader)?;
        let creator = if self.writable {
            Some(
                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
                    .map_err(BlobfsError::ConnectToBlobCreator)?,
            )
        } else {
            None
        };

        Ok(Client { dir, creator, reader })
    }

    /// If set, [`Client`] will connect to /blob in the current component's namespace with
    /// [`fio::PERM_READABLE`].
    pub fn readable(self) -> Self {
        Self { readable: true, ..self }
    }

    /// If set, [`Client`] will connect to /blob in the current component's namespace with
    /// [`fio::PERM_WRITABLE`], which is needed so that [`Client::delete_blob`] can unlink the
    /// file, and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
    pub fn writable(self) -> Self {
        Self { writable: true, ..self }
    }

    /// If set, [`Client`] will connect to /blob in the current component's namespace with
    /// [`fio::PERM_EXECUTABLE`].
    pub fn executable(self) -> Self {
        Self { executable: true, ..self }
    }
}

impl Client {
    /// Create an empty `ClientBuilder`
    pub fn builder() -> ClientBuilder {
        Default::default()
    }
}

/// Blobfs client
#[derive(Debug, Clone)]
pub struct Client {
    dir: fio::DirectoryProxy,
    creator: Option<ffxfs::BlobCreatorProxy>,
    reader: ffxfs::BlobReaderProxy,
}

impl Client {
    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
    /// as executable. If `creator` is not supplied, writes will fail.
    pub fn new(
        dir: fio::DirectoryProxy,
        creator: Option<ffxfs::BlobCreatorProxy>,
        reader: ffxfs::BlobReaderProxy,
        vmex: Option<zx::Resource>,
    ) -> Result<Self, anyhow::Error> {
        if let Some(vmex) = vmex {
            vmo_blob::init_vmex_resource(vmex)?;
        }
        Ok(Self { dir, creator, reader })
    }

    /// Creates a new client backed by the returned request stream. This constructor should not be
    /// used outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_test() -> (
        Self,
        fio::DirectoryRequestStream,
        ffxfs::BlobReaderRequestStream,
        ffxfs::BlobCreatorRequestStream,
    ) {
        let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (reader, reader_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
        let (creator, creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();

        (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
    }

    /// Creates a new client backed by the returned mock. This constructor should not be used
    /// outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_mock() -> (Self, mock::Mock) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
        let (reader, reader_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
        let (creator, creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();

        (
            Self { dir, creator: Some(creator), reader },
            mock::Mock { stream, reader_stream, creator_stream },
        )
    }

    /// Returns the read-only VMO backing the blob.
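    ///
    /// A minimal read sketch (hypothetical caller; error handling elided):
    ///
    /// ```no_run
    /// # async fn example(client: &blobfs::Client, hash: &fuchsia_hash::Hash) {
    /// let vmo = client.get_blob_vmo(hash).await.unwrap();
    /// let size = vmo.get_content_size().unwrap();
    /// let mut buf = vec![0u8; size as usize];
    /// let () = vmo.read(&mut buf, 0).unwrap();
    /// # }
    /// ```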
    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
        self.reader
            .get_vmo(hash)
            .await
            .map_err(GetBlobVmoError::Fidl)?
            .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
    }

    /// Open a blob for read using open3. `scope` is used to spawn a task that serves the blob
    /// from the VMO provided by fuchsia.fxfs.BlobReader.
    pub fn open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::Flags,
        scope: ExecutionScope,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), zx::Status> {
        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
            return Err(zx::Status::ACCESS_DENIED);
        }
        if flags.creation_mode() != vfs::CreationMode::Never {
            return Err(zx::Status::NOT_SUPPORTED);
        }
        // Errors below will be communicated via the `object_request` channel.
        let object_request = object_request.take();
        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
        Ok(())
    }

    /// Returns the list of known blobs in blobfs.
    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
        // directory entries. To prevent contention over this index by concurrent calls (either
        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
        // or other clones of the DirectoryProxy this object was made from), create a new
        // connection which will have its own index.
        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
        fuchsia_fs::directory::readdir(&private_connection)
            .await
            .map_err(BlobfsError::ReadDir)?
            .into_iter()
            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
            .collect()
    }

    /// Delete the blob with the given merkle hash.
    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
        self.dir
            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
            .await?
            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
    }

    /// Open a new blob for write.
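    ///
    /// A write-flow sketch modeled on this crate's tests (hypothetical caller; the payload must
    /// be in delivery blob format, produced here with the delivery_blob crate):
    ///
    /// ```no_run
    /// # async fn example(client: &blobfs::Client, content: &[u8]) {
    /// let hash = fuchsia_merkle::root_from_slice(content);
    /// let delivery =
    ///     delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
    /// let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
    /// let vmo = writer.get_vmo(delivery.len() as u64).await.unwrap().unwrap();
    /// let () = vmo.write(&delivery, 0).unwrap();
    /// let () = writer.bytes_ready(delivery.len() as u64).await.unwrap().unwrap();
    /// # }
    /// ```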
    pub async fn open_blob_for_write(
        &self,
        blob: &Hash,
        allow_existing: bool,
    ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
        let Some(creator) = &self.creator else {
            return Err(CreateError::WritingNotConfigured);
        };
        Ok(creator.create(blob, allow_existing).await??)
    }

    /// Returns whether blobfs has a blob with the given hash.
    ///
    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
    /// blob: open connections to even partially written blobs keep the blob alive, so if this
    /// call overlaps with a concurrent attempt to create the blob that fails and is then retried,
    /// this open connection will prevent the partially written blob from being removed and will
    /// block the creation of the new write connection.
    pub async fn has_blob(&self, blob: &Hash) -> bool {
        // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
        matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
    }

    /// Determines which blobs of `candidates` are missing from blobfs.
    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
    /// following race condition:
    /// 1. blob is partially written by resolve A
    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
    ///    write, which collides with the partially written blob from (1) that is being kept alive
    ///    by (2) and so fails
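    ///
    /// A lookup sketch (hypothetical caller):
    ///
    /// ```no_run
    /// # async fn example(client: &blobfs::Client, candidates: Vec<fuchsia_hash::Hash>) {
    /// let missing = client.filter_to_missing_blobs(candidates).await;
    /// for hash in &missing {
    ///     println!("still need to fetch {hash}");
    /// }
    /// # }
    /// ```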
    pub async fn filter_to_missing_blobs(
        &self,
        candidates: impl IntoIterator<Item = Hash>,
    ) -> HashSet<Hash> {
        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
        // metadata corruption.
        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
        // slower and has_blob is faster). The minor speedup on packages with a great number of
        // missing blobs is not worth a rarely taken branch deep within package resolution.
        stream::iter(candidates)
            .map(move |blob| async move {
                if self.has_blob(&blob).await {
                    None
                } else {
                    Some(blob)
                }
            })
            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
            // even three concurrent `has_blob` calls.
            .buffer_unordered(10)
            .filter_map(|blob| async move { blob })
            .collect()
            .await
    }

    /// Call fuchsia.io/Node.Sync on the blobfs directory.
    pub async fn sync(&self) -> Result<(), BlobfsError> {
        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
    }
}

/// Spawns a task on `scope` to attempt opening `blob_hash` via `reader`. Creates a file
/// connection to the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request`
/// asynchronously.
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}

#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}

#[cfg(test)]
impl Client {
    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
    /// specific functionality.
    ///
    /// # Panics
    ///
    /// Panics on error.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        Self::new(
            blobfs.root_dir_proxy().unwrap(),
            Some(blobfs.blob_creator_proxy().unwrap()),
            blobfs.blob_reader_proxy().unwrap(),
            None,
        )
        .unwrap()
    }
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use blobfs_ramdisk::BlobfsRamdisk;
    use fuchsia_async as fasync;
    use futures::stream::TryStreamExt as _;
    use std::sync::Arc;
    use test_case::test_case;

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
        assert_matches!(client.delete_blob(&merkle).await, Ok(()));

        let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        assert_matches!(
            client.delete_blob(&blob_merkle).await,
            Err(BlobfsError::Unlink(Status::NOT_FOUND))
        );
        blobfs.stop().await.unwrap();
    }

    #[fuchsia::test]
    async fn delete_blob_mock() {
        let (client, mut stream, _, _) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert!(client.has_blob(&fuchsia_merkle::root_from_slice(b"blob 1")).await);
        assert!(!client.has_blob(&Hash::from([1; 32])).await);

        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob_return_false_if_blob_is_partially_written(
        blob_impl: blobfs_ramdisk::Implementation,
    ) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = &[3; 1024];
        let hash = fuchsia_merkle::root_from_slice(content);
        let delivery_content =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);

        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
        assert!(!client.has_blob(&hash).await);

        let n = delivery_content.len();
        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.has_blob(&hash).await);

        let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
        let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.has_blob(&hash).await);

        let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }

    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
        let hash = fuchsia_merkle::root_from_slice(content);
        let delivery_content =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
        let vmo = writer
            .get_vmo(delivery_content.len().try_into().unwrap())
            .await
            .expect("a")
            .map_err(zx::Status::from_raw)
            .expect("b");
        let () = vmo.write(&delivery_content, 0).unwrap();
        let () =
            writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
        hash
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_hash0 = Hash::from([0; 32]);
        let missing_hash1 = Hash::from([1; 32]);

        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs([
                    missing_hash0,
                    missing_hash1,
                    present_blob0,
                    present_blob1
                ])
                .await,
            HashSet::from([missing_hash0, missing_hash1])
        );

        blobfs.stop().await.unwrap();
    }

    #[fuchsia::test]
    async fn sync() {
        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = Arc::clone(&counter);
        let (client, mut stream, _, _) = Client::new_test();
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Sync { responder } => {
                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.sync().await, Ok(()));
        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
    }

    #[fuchsia::test]
    async fn open_blob_for_write_maps_already_exists() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();

        let client = Client::new(
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
            Some(blob_creator),
            blob_reader,
            None,
        )
        .unwrap();

        fuchsia_async::Task::spawn(async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
                }
                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
                    unreachable!("This code path is not yet exercised.");
                }
            }
        })
        .detach();

        assert_matches!(
            client.open_blob_for_write(&[0; 32].into(), false).await,
            Err(CreateError::AlreadyExists)
        );
    }

    #[fuchsia::test]
    async fn concurrent_list_known_blobs_all_return_full_contents() {
        use futures::StreamExt;
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
        // each successful listing requires a call to Rewind as well, but it does make conflict
        // more likely.
        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
        for i in 0..256u16 {
            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
        }

        let () = futures::stream::iter(0..100)
            .for_each_concurrent(None, |_| async {
                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
            })
            .await;
    }
}