blobfs/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::{ClientEnd, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::Status;
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
21
22pub mod mock;
23pub use mock::Mock;
24
25/// Blobfs client errors.
26#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29    #[error("while opening blobfs dir")]
30    OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32    #[error("while cloning the blobfs dir")]
33    CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35    #[error("while listing blobfs dir")]
36    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38    #[error("while deleting blob")]
39    Unlink(#[source] Status),
40
41    #[error("while sync'ing")]
42    Sync(#[source] Status),
43
44    #[error("while parsing blob merkle hash")]
45    ParseHash(#[from] ParseHashError),
46
47    #[error("FIDL error")]
48    Fidl(#[from] fidl::Error),
49
50    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51    ConnectToBlobCreator(#[source] anyhow::Error),
52
53    #[error("while connecting to fuchsia.fxfs/BlobReader")]
54    ConnectToBlobReader(#[source] anyhow::Error),
55
56    #[error("while setting the VmexResource")]
57    InitVmexResource(#[source] anyhow::Error),
58}
59
60/// An error encountered while creating a blob
61#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64    #[error("the blob already exists or is being concurrently written")]
65    AlreadyExists,
66
67    #[error("while creating the blob")]
68    Io(#[source] fuchsia_fs::node::OpenError),
69
70    #[error("while converting the proxy into a client end")]
71    ConvertToClientEnd,
72
73    #[error("FIDL error")]
74    Fidl(#[from] fidl::Error),
75
76    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77    BlobCreator(ffxfs::CreateBlobError),
78
79    #[error("this client was not created with a blob creator so it cannot write blobs")]
80    WritingNotConfigured,
81}
82
83impl From<ffxfs::CreateBlobError> for CreateError {
84    fn from(e: ffxfs::CreateBlobError) -> Self {
85        match e {
86            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
87            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
88        }
89    }
90}
91
/// A builder for [`Client`]
///
/// All options start out disabled (see the `Default` derive); each one is switched on by the
/// builder method of the same name.
#[derive(Debug, Default)]
pub struct ClientBuilder {
    // Request read permission when opening the blob directory.
    readable: bool,
    // Request write permission when opening the blob directory and connect to the blob creator.
    writable: bool,
    // Request execute permission when opening the blob directory.
    executable: bool,
}
99
100impl ClientBuilder {
101    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
102    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
103    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
104    pub async fn build(self) -> Result<Client, BlobfsError> {
105        let mut flags = fio::Flags::empty();
106        if self.readable {
107            flags |= fio::PERM_READABLE
108        }
109        if self.writable {
110            flags |= fio::PERM_WRITABLE
111        }
112        if self.executable {
113            flags |= fio::PERM_EXECUTABLE
114        }
115        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
116        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
117            fidl_fuchsia_kernel::VmexResourceMarker,
118        >() {
119            if let Ok(vmex) = client.get().await {
120                info!("Got vmex resource");
121                vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
122            }
123        }
124        let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
125            .map_err(BlobfsError::ConnectToBlobReader)?;
126        let creator = if self.writable {
127            Some(
128                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
129                    .map_err(BlobfsError::ConnectToBlobCreator)?,
130            )
131        } else {
132            None
133        };
134
135        Ok(Client { dir, creator, reader })
136    }
137
138    /// If set, [`Client`] will connect to /blob in the current component's namespace with
139    /// [`fio::PERM_READABLE`].
140    pub fn readable(self) -> Self {
141        Self { readable: true, ..self }
142    }
143
144    /// If set, [`Client`] will connect to /blob in the current component's namespace with
145    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file,
146    /// and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
147    pub fn writable(self) -> Self {
148        Self { writable: true, ..self }
149    }
150
151    /// If set, [`Client`] will connect to /blob in the current component's namespace with
152    /// [`fio::PERM_EXECUTABLE`].
153    pub fn executable(self) -> Self {
154        Self { executable: true, ..self }
155    }
156}
157
158impl Client {
159    /// Create an empty `ClientBuilder`
160    pub fn builder() -> ClientBuilder {
161        Default::default()
162    }
163}
164/// Blobfs client
165#[derive(Debug, Clone)]
166pub struct Client {
167    dir: fio::DirectoryProxy,
168    creator: Option<ffxfs::BlobCreatorProxy>,
169    reader: ffxfs::BlobReaderProxy,
170}
171
172impl Client {
173    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
174    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
175    /// as executable. If `creator` is not supplied, writes will fail.
176    pub fn new(
177        dir: fio::DirectoryProxy,
178        creator: Option<ffxfs::BlobCreatorProxy>,
179        reader: ffxfs::BlobReaderProxy,
180        vmex: Option<zx::Resource>,
181    ) -> Result<Self, anyhow::Error> {
182        if let Some(vmex) = vmex {
183            vmo_blob::init_vmex_resource(vmex)?;
184        }
185        Ok(Self { dir, creator, reader })
186    }
187
188    /// Creates a new client backed by the returned request stream. This constructor should not be
189    /// used outside of tests.
190    ///
191    /// # Panics
192    ///
193    /// Panics on error
194    pub fn new_test() -> (
195        Self,
196        fio::DirectoryRequestStream,
197        ffxfs::BlobReaderRequestStream,
198        ffxfs::BlobCreatorRequestStream,
199    ) {
200        let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
201        let (reader, reader_stream) =
202            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
203        let (creator, creator_stream) =
204            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
205
206        (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
207    }
208
209    /// Creates a new client backed by the returned mock. This constructor should not be used
210    /// outside of tests.
211    ///
212    /// # Panics
213    ///
214    /// Panics on error
215    pub fn new_mock() -> (Self, mock::Mock) {
216        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
217        let (reader, reader_stream) =
218            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
219        let (creator, creator_stream) =
220            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
221
222        (
223            Self { dir, creator: Some(creator), reader },
224            mock::Mock { stream, reader_stream, creator_stream },
225        )
226    }
227
228    /// Returns the read-only VMO backing the blob.
229    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
230        self.reader
231            .get_vmo(hash)
232            .await
233            .map_err(GetBlobVmoError::Fidl)?
234            .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
235    }
236
237    /// Open a blob for read.
238    pub fn deprecated_open_blob_for_read(
239        &self,
240        blob: &Hash,
241        flags: fio::OpenFlags,
242        scope: ExecutionScope,
243        server_end: ServerEnd<fio::NodeMarker>,
244    ) -> Result<(), fidl::Error> {
245        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
246        // Reject requests that attempt to open blobs as writable.
247        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
248            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
249            return Ok(());
250        }
251        // Reject requests that attempt to create new blobs.
252        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
253            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
254            return Ok(());
255        }
256        let object_request = flags.to_object_request(server_end);
257        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
258        Ok(())
259    }
260
261    /// Open a blob for read using open3.
262    pub fn open_blob_for_read(
263        &self,
264        blob: &Hash,
265        flags: fio::Flags,
266        scope: ExecutionScope,
267        object_request: ObjectRequestRef<'_>,
268    ) -> Result<(), zx::Status> {
269        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
270            return Err(zx::Status::ACCESS_DENIED);
271        }
272        if flags.creation_mode() != vfs::CreationMode::Never {
273            return Err(zx::Status::NOT_SUPPORTED);
274        }
275        // Errors below will be communicated via the `object_request` channel.
276        let object_request = object_request.take();
277        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
278        Ok(())
279    }
280
281    /// Returns the list of known blobs in blobfs.
282    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
283        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
284        // directory entries. To prevent contention over this index by concurrent calls (either
285        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
286        // or other clones of the DirectoryProxy this object was made from), create a new
287        // connection which will have its own index.
288        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
289        fuchsia_fs::directory::readdir(&private_connection)
290            .await
291            .map_err(BlobfsError::ReadDir)?
292            .into_iter()
293            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
294            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
295            .collect()
296    }
297
298    /// Delete the blob with the given merkle hash.
299    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
300        self.dir
301            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
302            .await?
303            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
304    }
305
306    /// Open a new blob for write.
307    pub async fn open_blob_for_write(
308        &self,
309        blob: &Hash,
310    ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
311        let Some(creator) = &self.creator else {
312            return Err(CreateError::WritingNotConfigured);
313        };
314        Ok(creator.create(blob, false).await??)
315    }
316
317    /// Returns whether blobfs has a blob with the given hash.
318    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
319    /// blob. On c++blobfs, open connections to even partially written blobs keep the blob alive,
320    /// and so if this call overlaps with a concurrent attempt to create the blob that fails and
321    /// is then retried, this open connection will prevent the partially written blob from being
322    /// removed and block the creation of the new write connection.
323    pub async fn has_blob(&self, blob: &Hash) -> bool {
324        // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
325        matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
326    }
327
328    /// Determines which blobs of `candidates` are missing from blobfs.
329    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
330    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
331    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
332    /// following race condition:
333    /// 1. blob is partially written by resolve A
334    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
335    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
336    ///    write, which collides with the partially written blob from (1) that is being kept alive
337    ///    by (2) and so fails
338    pub async fn filter_to_missing_blobs(
339        &self,
340        candidates: impl IntoIterator<Item = Hash>,
341    ) -> HashSet<Hash> {
342        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
343        // metadata corruption.
344        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
345        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
346        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
347        // slower and has_blob is faster). The minor speedup on packages with a great number of
348        // missing blobs is not worth a rarely taken branch deep within package resolution.
349        stream::iter(candidates)
350            .map(move |blob| async move {
351                if self.has_blob(&blob).await {
352                    None
353                } else {
354                    Some(blob)
355                }
356            })
357            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
358            // even three concurrent `has_blob` calls.
359            .buffer_unordered(10)
360            .filter_map(|blob| async move { blob })
361            .collect()
362            .await
363    }
364
365    /// Call fuchsia.io/Node.Sync on the blobfs directory.
366    pub async fn sync(&self) -> Result<(), BlobfsError> {
367        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
368    }
369}
370
371/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
372/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
373fn open_blob_with_reader<P: ProtocolsExt + Send>(
374    reader: ffxfs::BlobReaderProxy,
375    blob_hash: Hash,
376    scope: ExecutionScope,
377    protocols: P,
378    object_request: ObjectRequest,
379) {
380    scope.clone().spawn(object_request.handle_async(async move |object_request| {
381        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
382            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
383                error!("Blob reader channel closed: {:?}", status);
384                status
385            } else {
386                error!("Transport error on get_vmo: {:?}", fidl_error);
387                zx::Status::INTERNAL
388            }
389        })?;
390        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
391        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
392        object_request
393            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
394            .await
395    }));
396}
397
398#[derive(thiserror::Error, Debug)]
399#[allow(missing_docs)]
400pub enum GetBlobVmoError {
401    #[error("getting the vmo")]
402    GetVmo(#[source] Status),
403
404    #[error("opening the blob")]
405    OpenBlob(#[source] fuchsia_fs::node::OpenError),
406
407    #[error("making a fidl request")]
408    Fidl(#[source] fidl::Error),
409}
410
#[cfg(test)]
impl Client {
    /// Builds a [`Client`] from the root directory, blob creator and blob reader proxies of the
    /// given [`BlobfsRamdisk`], without a VmexResource.
    ///
    /// Tests in this crate should prefer this over [`BlobfsRamdisk::client`]. That method hands
    /// back the [`blobfs::Client`] from the non-cfg(test) build of this crate, which is a
    /// different type than [`super::Client`]; tests holding it could reach neither private
    /// members nor cfg(test)-only functionality.
    ///
    /// # Panics
    ///
    /// Panics on error.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        let dir = blobfs.root_dir_proxy().unwrap();
        let creator = blobfs.blob_creator_proxy().unwrap();
        let reader = blobfs.blob_reader_proxy().unwrap();
        Self::new(dir, Some(creator), reader, None).unwrap()
    }
}
433
434#[cfg(test)]
435#[allow(clippy::bool_assert_comparison)]
436mod tests {
437    use super::*;
438    use assert_matches::assert_matches;
439    use blobfs_ramdisk::BlobfsRamdisk;
440    use fuchsia_async as fasync;
441    use futures::stream::TryStreamExt as _;
442    use std::sync::Arc;
443    use test_case::test_case;
444
445    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
446    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
447    #[fuchsia::test]
448    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
449        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
450        let client = Client::for_ramdisk(&blobfs);
451
452        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
453        blobfs.stop().await.unwrap();
454    }
455
456    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
457    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
458    #[fuchsia::test]
459    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
460        let blobfs = BlobfsRamdisk::builder()
461            .implementation(blob_impl)
462            .with_blob(&b"blob 1"[..])
463            .with_blob(&b"blob 2"[..])
464            .start()
465            .await
466            .unwrap();
467        let client = Client::for_ramdisk(&blobfs);
468
469        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
470        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
471        blobfs.stop().await.unwrap();
472    }
473
474    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
475    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
476    #[fuchsia::test]
477    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
478        let blobfs = BlobfsRamdisk::builder()
479            .implementation(blob_impl)
480            .with_blob(&b"blob 1"[..])
481            .with_blob(&b"blob 2"[..])
482            .start()
483            .await
484            .unwrap();
485        let client = Client::for_ramdisk(&blobfs);
486
487        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
488        assert_matches!(client.delete_blob(&merkle).await, Ok(()));
489
490        let expected = HashSet::from([fuchsia_merkle::from_slice(&b"blob 2"[..]).root()]);
491        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
492        blobfs.stop().await.unwrap();
493    }
494
495    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
496    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
497    #[fuchsia::test]
498    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
499        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
500        let client = Client::for_ramdisk(&blobfs);
501        let blob_merkle = Hash::from([1; 32]);
502
503        assert_matches!(
504            client.delete_blob(&blob_merkle).await,
505            Err(BlobfsError::Unlink(Status::NOT_FOUND))
506        );
507        blobfs.stop().await.unwrap();
508    }
509
510    #[fuchsia::test]
511    async fn delete_blob_mock() {
512        let (client, mut stream, _, _) = Client::new_test();
513        let blob_merkle = Hash::from([1; 32]);
514        fasync::Task::spawn(async move {
515            match stream.try_next().await.unwrap().unwrap() {
516                fio::DirectoryRequest::Unlink { name, responder, .. } => {
517                    assert_eq!(name, blob_merkle.to_string());
518                    responder.send(Ok(())).unwrap();
519                }
520                other => panic!("unexpected request: {other:?}"),
521            }
522        })
523        .detach();
524
525        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
526    }
527
528    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
529    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
530    #[fuchsia::test]
531    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
532        let blobfs = BlobfsRamdisk::builder()
533            .implementation(blob_impl)
534            .with_blob(&b"blob 1"[..])
535            .start()
536            .await
537            .unwrap();
538        let client = Client::for_ramdisk(&blobfs);
539
540        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
541        assert!(!client.has_blob(&Hash::from([1; 32])).await);
542
543        blobfs.stop().await.unwrap();
544    }
545
546    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
547    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
548    #[fuchsia::test]
549    async fn has_blob_return_false_if_blob_is_partially_written(
550        blob_impl: blobfs_ramdisk::Implementation,
551    ) {
552        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
553        let client = Client::for_ramdisk(&blobfs);
554
555        let content = [3; 1024];
556        let hash = fuchsia_merkle::from_slice(&content).root();
557        let delivery_content =
558            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);
559
560        let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
561        assert!(!client.has_blob(&hash).await);
562
563        let n = delivery_content.len();
564        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
565        assert!(!client.has_blob(&hash).await);
566
567        let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
568        let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
569        assert!(!client.has_blob(&hash).await);
570
571        let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
572        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
573        assert!(client.has_blob(&hash).await);
574
575        blobfs.stop().await.unwrap();
576    }
577
578    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
579        let hash = fuchsia_merkle::from_slice(content).root();
580        let delivery_content =
581            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
582        let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
583        let vmo = writer
584            .get_vmo(delivery_content.len().try_into().unwrap())
585            .await
586            .expect("a")
587            .map_err(zx::Status::from_raw)
588            .expect("b");
589        let () = vmo.write(&delivery_content, 0).unwrap();
590        let () =
591            writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
592        hash
593    }
594
595    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
596    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
597    #[fuchsia::test]
598    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
599        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
600        let client = Client::for_ramdisk(&blobfs);
601
602        let missing_hash0 = Hash::from([0; 32]);
603        let missing_hash1 = Hash::from([1; 32]);
604
605        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
606        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
607
608        assert_eq!(
609            client
610                .filter_to_missing_blobs([
611                    missing_hash0,
612                    missing_hash1,
613                    present_blob0,
614                    present_blob1
615                ])
616                .await,
617            HashSet::from([missing_hash0, missing_hash1])
618        );
619
620        blobfs.stop().await.unwrap();
621    }
622
623    #[fuchsia::test]
624    async fn sync() {
625        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
626        let counter_clone = Arc::clone(&counter);
627        let (client, mut stream, _, _) = Client::new_test();
628        fasync::Task::spawn(async move {
629            match stream.try_next().await.unwrap().unwrap() {
630                fio::DirectoryRequest::Sync { responder } => {
631                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
632                    responder.send(Ok(())).unwrap();
633                }
634                other => panic!("unexpected request: {other:?}"),
635            }
636        })
637        .detach();
638
639        assert_matches!(client.sync().await, Ok(()));
640        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
641    }
642
643    #[fuchsia::test]
644    async fn open_blob_for_write_maps_already_exists() {
645        let (blob_creator, mut blob_creator_stream) =
646            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
647        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
648
649        let client = Client::new(
650            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
651            Some(blob_creator),
652            blob_reader,
653            None,
654        )
655        .unwrap();
656
657        fuchsia_async::Task::spawn(async move {
658            match blob_creator_stream.next().await.unwrap().unwrap() {
659                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
660                    assert_eq!(hash, [0; 32]);
661                    assert!(!allow_existing);
662                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
663                }
664            }
665        })
666        .detach();
667
668        assert_matches!(
669            client.open_blob_for_write(&[0; 32].into()).await,
670            Err(CreateError::AlreadyExists)
671        );
672    }
673
674    #[fuchsia::test]
675    async fn concurrent_list_known_blobs_all_return_full_contents() {
676        use futures::StreamExt;
677        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
678        let client = Client::for_ramdisk(&blobfs);
679
680        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
681        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
682        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
683        // each successful listing requires a call to Rewind as well, but it does make conflict
684        // more likely.
685        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
686        for i in 0..256u16 {
687            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
688        }
689
690        let () = futures::stream::iter(0..100)
691            .for_each_concurrent(None, |_| async {
692                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
693            })
694            .await;
695    }
696}