blobfs/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::{ClientEnd, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::Status;
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
21
22pub mod mock;
23pub use mock::Mock;
24
25/// Blobfs client errors.
26#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29    #[error("while opening blobfs dir")]
30    OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32    #[error("while cloning the blobfs dir")]
33    CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35    #[error("while listing blobfs dir")]
36    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38    #[error("while deleting blob")]
39    Unlink(#[source] Status),
40
41    #[error("while sync'ing")]
42    Sync(#[source] Status),
43
44    #[error("while parsing blob merkle hash")]
45    ParseHash(#[from] ParseHashError),
46
47    #[error("FIDL error")]
48    Fidl(#[from] fidl::Error),
49
50    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51    ConnectToBlobCreator(#[source] anyhow::Error),
52
53    #[error("while connecting to fuchsia.fxfs/BlobReader")]
54    ConnectToBlobReader(#[source] anyhow::Error),
55
56    #[error("while setting the VmexResource")]
57    InitVmexResource(#[source] anyhow::Error),
58}
59
60/// An error encountered while creating a blob
61#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64    #[error("the blob already exists or is being concurrently written")]
65    AlreadyExists,
66
67    #[error("while creating the blob")]
68    Io(#[source] fuchsia_fs::node::OpenError),
69
70    #[error("while converting the proxy into a client end")]
71    ConvertToClientEnd,
72
73    #[error("FIDL error")]
74    Fidl(#[from] fidl::Error),
75
76    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77    BlobCreator(ffxfs::CreateBlobError),
78
79    #[error("this client was not created with a blob creator so it cannot write blobs")]
80    WritingNotConfigured,
81}
82
83impl From<ffxfs::CreateBlobError> for CreateError {
84    fn from(e: ffxfs::CreateBlobError) -> Self {
85        match e {
86            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
87            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
88        }
89    }
90}
91
92/// A builder for [`Client`]
93#[derive(Default)]
94pub struct ClientBuilder {
95    readable: bool,
96    writable: bool,
97    executable: bool,
98}
99
100impl ClientBuilder {
101    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
102    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
103    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
104    pub async fn build(self) -> Result<Client, BlobfsError> {
105        let mut flags = fio::Flags::empty();
106        if self.readable {
107            flags |= fio::PERM_READABLE
108        }
109        if self.writable {
110            flags |= fio::PERM_WRITABLE
111        }
112        if self.executable {
113            flags |= fio::PERM_EXECUTABLE
114        }
115        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
116        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
117            fidl_fuchsia_kernel::VmexResourceMarker,
118        >() && let Ok(vmex) = client.get().await
119        {
120            info!("Got vmex resource");
121            vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
122        }
123        let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
124            .map_err(BlobfsError::ConnectToBlobReader)?;
125        let creator = if self.writable {
126            Some(
127                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
128                    .map_err(BlobfsError::ConnectToBlobCreator)?,
129            )
130        } else {
131            None
132        };
133
134        Ok(Client { dir, creator, reader })
135    }
136
137    /// If set, [`Client`] will connect to /blob in the current component's namespace with
138    /// [`fio::PERM_READABLE`].
139    pub fn readable(self) -> Self {
140        Self { readable: true, ..self }
141    }
142
143    /// If set, [`Client`] will connect to /blob in the current component's namespace with
144    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file,
145    /// and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
146    pub fn writable(self) -> Self {
147        Self { writable: true, ..self }
148    }
149
150    /// If set, [`Client`] will connect to /blob in the current component's namespace with
151    /// [`fio::PERM_EXECUTABLE`].
152    pub fn executable(self) -> Self {
153        Self { executable: true, ..self }
154    }
155}
156
157impl Client {
158    /// Create an empty `ClientBuilder`
159    pub fn builder() -> ClientBuilder {
160        Default::default()
161    }
162}
163/// Blobfs client
164#[derive(Debug, Clone)]
165pub struct Client {
166    dir: fio::DirectoryProxy,
167    creator: Option<ffxfs::BlobCreatorProxy>,
168    reader: ffxfs::BlobReaderProxy,
169}
170
171impl Client {
172    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
173    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
174    /// as executable. If `creator` is not supplied, writes will fail.
175    pub fn new(
176        dir: fio::DirectoryProxy,
177        creator: Option<ffxfs::BlobCreatorProxy>,
178        reader: ffxfs::BlobReaderProxy,
179        vmex: Option<zx::Resource>,
180    ) -> Result<Self, anyhow::Error> {
181        if let Some(vmex) = vmex {
182            vmo_blob::init_vmex_resource(vmex)?;
183        }
184        Ok(Self { dir, creator, reader })
185    }
186
187    /// Creates a new client backed by the returned request stream. This constructor should not be
188    /// used outside of tests.
189    ///
190    /// # Panics
191    ///
192    /// Panics on error
193    pub fn new_test() -> (
194        Self,
195        fio::DirectoryRequestStream,
196        ffxfs::BlobReaderRequestStream,
197        ffxfs::BlobCreatorRequestStream,
198    ) {
199        let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
200        let (reader, reader_stream) =
201            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
202        let (creator, creator_stream) =
203            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
204
205        (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
206    }
207
208    /// Creates a new client backed by the returned mock. This constructor should not be used
209    /// outside of tests.
210    ///
211    /// # Panics
212    ///
213    /// Panics on error
214    pub fn new_mock() -> (Self, mock::Mock) {
215        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
216        let (reader, reader_stream) =
217            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
218        let (creator, creator_stream) =
219            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
220
221        (
222            Self { dir, creator: Some(creator), reader },
223            mock::Mock { stream, reader_stream, creator_stream },
224        )
225    }
226
227    /// Returns the read-only VMO backing the blob.
228    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
229        self.reader
230            .get_vmo(hash)
231            .await
232            .map_err(GetBlobVmoError::Fidl)?
233            .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
234    }
235
236    /// Open a blob for read.
237    pub fn deprecated_open_blob_for_read(
238        &self,
239        blob: &Hash,
240        flags: fio::OpenFlags,
241        scope: ExecutionScope,
242        server_end: ServerEnd<fio::NodeMarker>,
243    ) -> Result<(), fidl::Error> {
244        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
245        // Reject requests that attempt to open blobs as writable.
246        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
247            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
248            return Ok(());
249        }
250        // Reject requests that attempt to create new blobs.
251        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
252            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
253            return Ok(());
254        }
255        let object_request = flags.to_object_request(server_end);
256        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
257        Ok(())
258    }
259
260    /// Open a blob for read using open3.
261    pub fn open_blob_for_read(
262        &self,
263        blob: &Hash,
264        flags: fio::Flags,
265        scope: ExecutionScope,
266        object_request: ObjectRequestRef<'_>,
267    ) -> Result<(), zx::Status> {
268        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
269            return Err(zx::Status::ACCESS_DENIED);
270        }
271        if flags.creation_mode() != vfs::CreationMode::Never {
272            return Err(zx::Status::NOT_SUPPORTED);
273        }
274        // Errors below will be communicated via the `object_request` channel.
275        let object_request = object_request.take();
276        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
277        Ok(())
278    }
279
280    /// Returns the list of known blobs in blobfs.
281    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
282        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
283        // directory entries. To prevent contention over this index by concurrent calls (either
284        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
285        // or other clones of the DirectoryProxy this object was made from), create a new
286        // connection which will have its own index.
287        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
288        fuchsia_fs::directory::readdir(&private_connection)
289            .await
290            .map_err(BlobfsError::ReadDir)?
291            .into_iter()
292            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
293            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
294            .collect()
295    }
296
297    /// Delete the blob with the given merkle hash.
298    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
299        self.dir
300            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
301            .await?
302            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
303    }
304
305    /// Open a new blob for write.
306    pub async fn open_blob_for_write(
307        &self,
308        blob: &Hash,
309    ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
310        let Some(creator) = &self.creator else {
311            return Err(CreateError::WritingNotConfigured);
312        };
313        Ok(creator.create(blob, false).await??)
314    }
315
316    /// Returns whether blobfs has a blob with the given hash.
317    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
318    /// blob. On c++blobfs, open connections to even partially written blobs keep the blob alive,
319    /// and so if this call overlaps with a concurrent attempt to create the blob that fails and
320    /// is then retried, this open connection will prevent the partially written blob from being
321    /// removed and block the creation of the new write connection.
322    pub async fn has_blob(&self, blob: &Hash) -> bool {
323        // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
324        matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
325    }
326
327    /// Determines which blobs of `candidates` are missing from blobfs.
328    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
329    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
330    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
331    /// following race condition:
332    /// 1. blob is partially written by resolve A
333    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
334    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
335    ///    write, which collides with the partially written blob from (1) that is being kept alive
336    ///    by (2) and so fails
337    pub async fn filter_to_missing_blobs(
338        &self,
339        candidates: impl IntoIterator<Item = Hash>,
340    ) -> HashSet<Hash> {
341        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
342        // metadata corruption.
343        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
344        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
345        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
346        // slower and has_blob is faster). The minor speedup on packages with a great number of
347        // missing blobs is not worth a rarely taken branch deep within package resolution.
348        stream::iter(candidates)
349            .map(move |blob| async move {
350                if self.has_blob(&blob).await {
351                    None
352                } else {
353                    Some(blob)
354                }
355            })
356            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
357            // even three concurrent `has_blob` calls.
358            .buffer_unordered(10)
359            .filter_map(|blob| async move { blob })
360            .collect()
361            .await
362    }
363
364    /// Call fuchsia.io/Node.Sync on the blobfs directory.
365    pub async fn sync(&self) -> Result<(), BlobfsError> {
366        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
367    }
368}
369
370/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
371/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
372fn open_blob_with_reader<P: ProtocolsExt + Send>(
373    reader: ffxfs::BlobReaderProxy,
374    blob_hash: Hash,
375    scope: ExecutionScope,
376    protocols: P,
377    object_request: ObjectRequest,
378) {
379    scope.clone().spawn(object_request.handle_async(async move |object_request| {
380        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
381            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
382                error!("Blob reader channel closed: {:?}", status);
383                status
384            } else {
385                error!("Transport error on get_vmo: {:?}", fidl_error);
386                zx::Status::INTERNAL
387            }
388        })?;
389        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
390        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
391        object_request
392            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
393            .await
394    }));
395}
396
397#[derive(thiserror::Error, Debug)]
398#[allow(missing_docs)]
399pub enum GetBlobVmoError {
400    #[error("getting the vmo")]
401    GetVmo(#[source] Status),
402
403    #[error("opening the blob")]
404    OpenBlob(#[source] fuchsia_fs::node::OpenError),
405
406    #[error("making a fidl request")]
407    Fidl(#[source] fidl::Error),
408}
409
410#[cfg(test)]
411impl Client {
412    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
413    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
414    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
415    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
416    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
417    /// specific functionality.
418    ///
419    /// # Panics
420    ///
421    /// Panics on error.
422    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
423        Self::new(
424            blobfs.root_dir_proxy().unwrap(),
425            Some(blobfs.blob_creator_proxy().unwrap()),
426            blobfs.blob_reader_proxy().unwrap(),
427            None,
428        )
429        .unwrap()
430    }
431}
432
433#[cfg(test)]
434#[allow(clippy::bool_assert_comparison)]
435mod tests {
436    use super::*;
437    use assert_matches::assert_matches;
438    use blobfs_ramdisk::BlobfsRamdisk;
439    use fuchsia_async as fasync;
440    use futures::stream::TryStreamExt as _;
441    use std::sync::Arc;
442    use test_case::test_case;
443
444    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
445    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
446    #[fuchsia::test]
447    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
448        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
449        let client = Client::for_ramdisk(&blobfs);
450
451        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
452        blobfs.stop().await.unwrap();
453    }
454
455    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
456    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
457    #[fuchsia::test]
458    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
459        let blobfs = BlobfsRamdisk::builder()
460            .implementation(blob_impl)
461            .with_blob(&b"blob 1"[..])
462            .with_blob(&b"blob 2"[..])
463            .start()
464            .await
465            .unwrap();
466        let client = Client::for_ramdisk(&blobfs);
467
468        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
469        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
470        blobfs.stop().await.unwrap();
471    }
472
473    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
474    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
475    #[fuchsia::test]
476    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
477        let blobfs = BlobfsRamdisk::builder()
478            .implementation(blob_impl)
479            .with_blob(&b"blob 1"[..])
480            .with_blob(&b"blob 2"[..])
481            .start()
482            .await
483            .unwrap();
484        let client = Client::for_ramdisk(&blobfs);
485
486        let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
487        assert_matches!(client.delete_blob(&merkle).await, Ok(()));
488
489        let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
490        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
491        blobfs.stop().await.unwrap();
492    }
493
494    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
495    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
496    #[fuchsia::test]
497    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
498        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
499        let client = Client::for_ramdisk(&blobfs);
500        let blob_merkle = Hash::from([1; 32]);
501
502        assert_matches!(
503            client.delete_blob(&blob_merkle).await,
504            Err(BlobfsError::Unlink(Status::NOT_FOUND))
505        );
506        blobfs.stop().await.unwrap();
507    }
508
509    #[fuchsia::test]
510    async fn delete_blob_mock() {
511        let (client, mut stream, _, _) = Client::new_test();
512        let blob_merkle = Hash::from([1; 32]);
513        fasync::Task::spawn(async move {
514            match stream.try_next().await.unwrap().unwrap() {
515                fio::DirectoryRequest::Unlink { name, responder, .. } => {
516                    assert_eq!(name, blob_merkle.to_string());
517                    responder.send(Ok(())).unwrap();
518                }
519                other => panic!("unexpected request: {other:?}"),
520            }
521        })
522        .detach();
523
524        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
525    }
526
527    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
528    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
529    #[fuchsia::test]
530    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
531        let blobfs = BlobfsRamdisk::builder()
532            .implementation(blob_impl)
533            .with_blob(&b"blob 1"[..])
534            .start()
535            .await
536            .unwrap();
537        let client = Client::for_ramdisk(&blobfs);
538
539        assert!(client.has_blob(&fuchsia_merkle::root_from_slice(b"blob 1")).await);
540        assert!(!client.has_blob(&Hash::from([1; 32])).await);
541
542        blobfs.stop().await.unwrap();
543    }
544
545    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
546    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
547    #[fuchsia::test]
548    async fn has_blob_return_false_if_blob_is_partially_written(
549        blob_impl: blobfs_ramdisk::Implementation,
550    ) {
551        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
552        let client = Client::for_ramdisk(&blobfs);
553
554        let content = &[3; 1024];
555        let hash = fuchsia_merkle::root_from_slice(content);
556        let delivery_content =
557            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
558
559        let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
560        assert!(!client.has_blob(&hash).await);
561
562        let n = delivery_content.len();
563        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
564        assert!(!client.has_blob(&hash).await);
565
566        let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
567        let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
568        assert!(!client.has_blob(&hash).await);
569
570        let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
571        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
572        assert!(client.has_blob(&hash).await);
573
574        blobfs.stop().await.unwrap();
575    }
576
577    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
578        let hash = fuchsia_merkle::root_from_slice(content);
579        let delivery_content =
580            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
581        let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
582        let vmo = writer
583            .get_vmo(delivery_content.len().try_into().unwrap())
584            .await
585            .expect("a")
586            .map_err(zx::Status::from_raw)
587            .expect("b");
588        let () = vmo.write(&delivery_content, 0).unwrap();
589        let () =
590            writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
591        hash
592    }
593
594    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
595    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
596    #[fuchsia::test]
597    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
598        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
599        let client = Client::for_ramdisk(&blobfs);
600
601        let missing_hash0 = Hash::from([0; 32]);
602        let missing_hash1 = Hash::from([1; 32]);
603
604        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
605        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
606
607        assert_eq!(
608            client
609                .filter_to_missing_blobs([
610                    missing_hash0,
611                    missing_hash1,
612                    present_blob0,
613                    present_blob1
614                ])
615                .await,
616            HashSet::from([missing_hash0, missing_hash1])
617        );
618
619        blobfs.stop().await.unwrap();
620    }
621
622    #[fuchsia::test]
623    async fn sync() {
624        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
625        let counter_clone = Arc::clone(&counter);
626        let (client, mut stream, _, _) = Client::new_test();
627        fasync::Task::spawn(async move {
628            match stream.try_next().await.unwrap().unwrap() {
629                fio::DirectoryRequest::Sync { responder } => {
630                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
631                    responder.send(Ok(())).unwrap();
632                }
633                other => panic!("unexpected request: {other:?}"),
634            }
635        })
636        .detach();
637
638        assert_matches!(client.sync().await, Ok(()));
639        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
640    }
641
642    #[fuchsia::test]
643    async fn open_blob_for_write_maps_already_exists() {
644        let (blob_creator, mut blob_creator_stream) =
645            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
646        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
647
648        let client = Client::new(
649            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
650            Some(blob_creator),
651            blob_reader,
652            None,
653        )
654        .unwrap();
655
656        fuchsia_async::Task::spawn(async move {
657            match blob_creator_stream.next().await.unwrap().unwrap() {
658                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
659                    assert_eq!(hash, [0; 32]);
660                    assert!(!allow_existing);
661                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
662                }
663                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
664                    unreachable!("This code path is not yet exercised.");
665                }
666            }
667        })
668        .detach();
669
670        assert_matches!(
671            client.open_blob_for_write(&[0; 32].into()).await,
672            Err(CreateError::AlreadyExists)
673        );
674    }
675
676    #[fuchsia::test]
677    async fn concurrent_list_known_blobs_all_return_full_contents() {
678        use futures::StreamExt;
679        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
680        let client = Client::for_ramdisk(&blobfs);
681
682        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
683        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
684        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
685        // each successful listing requires a call to Rewind as well, but it does make conflict
686        // more likely.
687        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
688        for i in 0..256u16 {
689            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
690        }
691
692        let () = futures::stream::iter(0..100)
693            .for_each_concurrent(None, |_| async {
694                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
695            })
696            .await;
697    }
698}