Skip to main content

blobfs/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::ClientEnd;
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::execution_scope::ExecutionScope;
16use vfs::file::StreamIoConnection;
17use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
18use zx::{self as zx, Status};
19use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
20
21pub mod mock;
22pub use mock::Mock;
23
24#[derive(Debug, Error)]
25#[allow(missing_docs)]
26pub enum BlobStatusError {
27    #[error("this client was not created with a blob creator so it cannot write blobs")]
28    WritingNotConfigured,
29
30    #[error("the fidl call returned an unexpected error")]
31    NeedsOverwrite(#[source] Status),
32}
33
34/// Blobfs client errors.
35#[derive(Debug, Error)]
36#[allow(missing_docs)]
37pub enum BlobfsError {
38    #[error("while opening blobfs dir")]
39    OpenDir(#[from] fuchsia_fs::node::OpenError),
40
41    #[error("while cloning the blobfs dir")]
42    CloneDir(#[from] fuchsia_fs::node::CloneError),
43
44    #[error("while listing blobfs dir")]
45    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
46
47    #[error("while deleting blob")]
48    Unlink(#[source] Status),
49
50    #[error("while sync'ing")]
51    Sync(#[source] Status),
52
53    #[error("while parsing blob merkle hash")]
54    ParseHash(#[from] ParseHashError),
55
56    #[error("FIDL error")]
57    Fidl(#[from] fidl::Error),
58
59    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
60    ConnectToBlobCreator(#[source] anyhow::Error),
61
62    #[error("while connecting to fuchsia.fxfs/BlobReader")]
63    ConnectToBlobReader(#[source] anyhow::Error),
64
65    #[error("while setting the VmexResource")]
66    InitVmexResource(#[source] anyhow::Error),
67
68    #[error("while checking NeedsOverwrite for blob status")]
69    BlobStatus(BlobStatusError),
70}
71
72/// An error encountered while creating a blob
73#[derive(Debug, Error)]
74#[allow(missing_docs)]
75pub enum CreateError {
76    #[error("the blob already exists or is being concurrently written")]
77    AlreadyExists,
78
79    #[error("while creating the blob")]
80    Io(#[source] fuchsia_fs::node::OpenError),
81
82    #[error("while converting the proxy into a client end")]
83    ConvertToClientEnd,
84
85    #[error("FIDL error")]
86    Fidl(#[from] fidl::Error),
87
88    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
89    BlobCreator(ffxfs::CreateBlobError),
90
91    #[error("this client was not created with a blob creator so it cannot write blobs")]
92    WritingNotConfigured,
93}
94
95/// The response to a `BlobCreator.NeedsOverwrite` call, excepting unexpected internal errors.
96pub enum BlobStatus {
97    /// The blob is present and considered up to date.
98    UpToDate,
99
100    /// The blob is present, but should be overwritten.
101    NeedsOverwrite,
102
103    /// The blob is not present.
104    Absent,
105}
106
107impl From<ffxfs::CreateBlobError> for CreateError {
108    fn from(e: ffxfs::CreateBlobError) -> Self {
109        match e {
110            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
111            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
112        }
113    }
114}
115
116/// A builder for [`Client`]
117#[derive(Default)]
118pub struct ClientBuilder {
119    readable: bool,
120    writable: bool,
121    executable: bool,
122}
123
124impl ClientBuilder {
125    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
126    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
127    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
128    pub async fn build(self) -> Result<Client, BlobfsError> {
129        let mut flags = fio::Flags::empty();
130        if self.readable {
131            flags |= fio::PERM_READABLE
132        }
133        if self.writable {
134            flags |= fio::PERM_WRITABLE
135        }
136        if self.executable {
137            flags |= fio::PERM_EXECUTABLE
138        }
139        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
140        if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
141            fidl_fuchsia_kernel::VmexResourceMarker,
142        >() && let Ok(vmex) = client.get().await
143        {
144            info!("Got vmex resource");
145            vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
146        }
147        let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
148            .map_err(BlobfsError::ConnectToBlobReader)?;
149        let creator = if self.writable {
150            Some(
151                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
152                    .map_err(BlobfsError::ConnectToBlobCreator)?,
153            )
154        } else {
155            None
156        };
157
158        Ok(Client { dir, creator, reader })
159    }
160
161    /// If set, [`Client`] will connect to /blob in the current component's namespace with
162    /// [`fio::PERM_READABLE`].
163    pub fn readable(self) -> Self {
164        Self { readable: true, ..self }
165    }
166
167    /// If set, [`Client`] will connect to /blob in the current component's namespace with
168    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file,
169    /// and [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
170    pub fn writable(self) -> Self {
171        Self { writable: true, ..self }
172    }
173
174    /// If set, [`Client`] will connect to /blob in the current component's namespace with
175    /// [`fio::PERM_EXECUTABLE`].
176    pub fn executable(self) -> Self {
177        Self { executable: true, ..self }
178    }
179}
180
181impl Client {
182    /// Create an empty `ClientBuilder`
183    pub fn builder() -> ClientBuilder {
184        Default::default()
185    }
186}
187/// Blobfs client
188#[derive(Debug, Clone)]
189pub struct Client {
190    dir: fio::DirectoryProxy,
191    creator: Option<ffxfs::BlobCreatorProxy>,
192    reader: ffxfs::BlobReaderProxy,
193}
194
195impl Client {
196    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
197    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
198    /// as executable. If `creator` is not supplied, writes will fail.
199    pub fn new(
200        dir: fio::DirectoryProxy,
201        creator: Option<ffxfs::BlobCreatorProxy>,
202        reader: ffxfs::BlobReaderProxy,
203        vmex: Option<zx::Resource>,
204    ) -> Result<Self, anyhow::Error> {
205        if let Some(vmex) = vmex {
206            vmo_blob::init_vmex_resource(vmex)?;
207        }
208        Ok(Self { dir, creator, reader })
209    }
210
211    /// Creates a new client backed by the returned request stream. This constructor should not be
212    /// used outside of tests.
213    ///
214    /// # Panics
215    ///
216    /// Panics on error
217    pub fn new_test() -> (
218        Self,
219        fio::DirectoryRequestStream,
220        ffxfs::BlobReaderRequestStream,
221        ffxfs::BlobCreatorRequestStream,
222    ) {
223        let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
224        let (reader, reader_stream) =
225            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
226        let (creator, creator_stream) =
227            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
228
229        (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
230    }
231
232    /// Creates a new client backed by the returned mock. This constructor should not be used
233    /// outside of tests.
234    ///
235    /// # Panics
236    ///
237    /// Panics on error
238    pub fn new_mock() -> (Self, mock::Mock) {
239        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
240        let (reader, reader_stream) =
241            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
242        let (creator, creator_stream) =
243            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
244
245        (
246            Self { dir, creator: Some(creator), reader },
247            mock::Mock { stream, reader_stream, creator_stream },
248        )
249    }
250
251    /// Returns the read-only VMO backing the blob.
252    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
253        self.reader
254            .get_vmo(hash)
255            .await
256            .map_err(GetBlobVmoError::Fidl)?
257            .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
258    }
259
260    /// Open a blob for read using open3. `scope` will only be used if the client was configured to
261    /// use fuchsia.fxfs.BlobReader.
262    pub fn open_blob_for_read(
263        &self,
264        blob: &Hash,
265        flags: fio::Flags,
266        scope: ExecutionScope,
267        object_request: ObjectRequestRef<'_>,
268    ) -> Result<(), zx::Status> {
269        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
270            return Err(zx::Status::ACCESS_DENIED);
271        }
272        if flags.creation_mode() != vfs::CreationMode::Never {
273            return Err(zx::Status::NOT_SUPPORTED);
274        }
275        // Errors below will be communicated via the `object_request` channel.
276        let object_request = object_request.take();
277        let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
278        Ok(())
279    }
280
281    /// Returns the list of known blobs in blobfs.
282    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
283        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
284        // directory entries. To prevent contention over this index by concurrent calls (either
285        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
286        // or other clones of the DirectoryProxy this object was made from), create a new
287        // connection which will have its own index.
288        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
289        fuchsia_fs::directory::readdir(&private_connection)
290            .await
291            .map_err(BlobfsError::ReadDir)?
292            .into_iter()
293            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
294            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
295            .collect()
296    }
297
298    /// Delete the blob with the given merkle hash.
299    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
300        self.dir
301            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
302            .await?
303            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
304    }
305
306    /// Open a new blob for write.
307    pub async fn open_blob_for_write(
308        &self,
309        blob: &Hash,
310        allow_existing: bool,
311    ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
312        let Some(creator) = &self.creator else {
313            return Err(CreateError::WritingNotConfigured);
314        };
315        Ok(creator.create(blob, allow_existing).await??)
316    }
317
318    /// Returns whether blobfs has a blob with the given hash and blobfs considers it up to date.
319    pub async fn blob_present_and_up_to_date(&self, blob: &Hash) -> bool {
320        // This call is only used when we're considering writing a blob, so we should have a
321        // creator.
322        matches!(
323            self.creator.as_ref().expect("Missing BlobCreator access").needs_overwrite(blob).await,
324            Ok(Ok(false))
325        )
326    }
327
328    /// Looks up the current status of a blob using `BlobCreator.NeedsOverwrite`.
329    pub async fn blob_status(&self, blob: &Hash) -> Result<BlobStatus, BlobfsError> {
330        let Some(creator) = &self.creator else {
331            return Err(BlobfsError::BlobStatus(BlobStatusError::WritingNotConfigured));
332        };
333        match creator.needs_overwrite(blob).await? {
334            Ok(true) => Ok(BlobStatus::NeedsOverwrite),
335            Ok(false) => Ok(BlobStatus::UpToDate),
336            Err(status) if status == Status::NOT_FOUND.into_raw() => Ok(BlobStatus::Absent),
337            Err(s) => {
338                Err(BlobfsError::BlobStatus(BlobStatusError::NeedsOverwrite(Status::from_raw(s))))
339            }
340        }
341    }
342
343    /// Determines which blobs of `candidates` are missing from blobfs.
344    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
345    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
346    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
347    /// following race condition:
348    /// 1. blob is partially written by resolve A
349    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
350    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
351    ///    write, which collides with the partially written blob from (1) that is being kept alive
352    ///    by (2) and so fails
353    pub async fn filter_to_missing_blobs(
354        &self,
355        candidates: impl IntoIterator<Item = Hash>,
356    ) -> HashSet<Hash> {
357        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
358        // metadata corruption.
359        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
360        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
361        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
362        // slower and has_blob is faster). The minor speedup on packages with a great number of
363        // missing blobs is not worth a rarely taken branch deep within package resolution.
364        stream::iter(candidates)
365            .map(move |blob| async move {
366                if self.blob_present_and_up_to_date(&blob).await { None } else { Some(blob) }
367            })
368            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
369            // even three concurrent `has_blob` calls.
370            .buffer_unordered(10)
371            .filter_map(|blob| async move { blob })
372            .collect()
373            .await
374    }
375
376    /// Call fuchsia.io/Node.Sync on the blobfs directory.
377    pub async fn sync(&self) -> Result<(), BlobfsError> {
378        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
379    }
380}
381
382/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
383/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
384fn open_blob_with_reader<P: ProtocolsExt + Send>(
385    reader: ffxfs::BlobReaderProxy,
386    blob_hash: Hash,
387    scope: ExecutionScope,
388    protocols: P,
389    object_request: ObjectRequest,
390) {
391    scope.clone().spawn(object_request.handle_async(async move |object_request| {
392        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
393            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
394                error!("Blob reader channel closed: {:?}", status);
395                status
396            } else {
397                error!("Transport error on get_vmo: {:?}", fidl_error);
398                zx::Status::INTERNAL
399            }
400        })?;
401        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
402        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
403        object_request
404            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
405            .await
406    }));
407}
408
409#[derive(thiserror::Error, Debug)]
410#[allow(missing_docs)]
411pub enum GetBlobVmoError {
412    #[error("getting the vmo")]
413    GetVmo(#[source] Status),
414
415    #[error("opening the blob")]
416    OpenBlob(#[source] fuchsia_fs::node::OpenError),
417
418    #[error("making a fidl request")]
419    Fidl(#[source] fidl::Error),
420}
421
422#[cfg(test)]
423impl Client {
424    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
425    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
426    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
427    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
428    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
429    /// specific functionality.
430    ///
431    /// # Panics
432    ///
433    /// Panics on error.
434    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
435        Self::new(
436            blobfs.root_dir_proxy().unwrap(),
437            Some(blobfs.blob_creator_proxy().unwrap()),
438            blobfs.blob_reader_proxy().unwrap(),
439            None,
440        )
441        .unwrap()
442    }
443}
444
445#[cfg(test)]
446#[allow(clippy::bool_assert_comparison)]
447mod tests {
448    use super::*;
449    use assert_matches::assert_matches;
450    use blobfs_ramdisk::BlobfsRamdisk;
451    use fuchsia_async as fasync;
452    use futures::stream::TryStreamExt as _;
453    use std::sync::Arc;
454    use test_case::test_case;
455
456    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
457    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
458    #[fuchsia::test]
459    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
460        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
461        let client = Client::for_ramdisk(&blobfs);
462
463        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
464        blobfs.stop().await.unwrap();
465    }
466
467    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
468    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
469    #[fuchsia::test]
470    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
471        let blobfs = BlobfsRamdisk::builder()
472            .implementation(blob_impl)
473            .with_blob(&b"blob 1"[..])
474            .with_blob(&b"blob 2"[..])
475            .start()
476            .await
477            .unwrap();
478        let client = Client::for_ramdisk(&blobfs);
479
480        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
481        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
482        blobfs.stop().await.unwrap();
483    }
484
485    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
486    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
487    #[fuchsia::test]
488    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
489        let blobfs = BlobfsRamdisk::builder()
490            .implementation(blob_impl)
491            .with_blob(&b"blob 1"[..])
492            .with_blob(&b"blob 2"[..])
493            .start()
494            .await
495            .unwrap();
496        let client = Client::for_ramdisk(&blobfs);
497
498        let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
499        assert_matches!(client.delete_blob(&merkle).await, Ok(()));
500
501        let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
502        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
503        blobfs.stop().await.unwrap();
504    }
505
506    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
507    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
508    #[fuchsia::test]
509    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
510        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
511        let client = Client::for_ramdisk(&blobfs);
512        let blob_merkle = Hash::from([1; 32]);
513
514        assert_matches!(
515            client.delete_blob(&blob_merkle).await,
516            Err(BlobfsError::Unlink(Status::NOT_FOUND))
517        );
518        blobfs.stop().await.unwrap();
519    }
520
521    #[fuchsia::test]
522    async fn delete_blob_mock() {
523        let (client, mut stream, _, _) = Client::new_test();
524        let blob_merkle = Hash::from([1; 32]);
525        fasync::Task::spawn(async move {
526            match stream.try_next().await.unwrap().unwrap() {
527                fio::DirectoryRequest::Unlink { name, responder, .. } => {
528                    assert_eq!(name, blob_merkle.to_string());
529                    responder.send(Ok(())).unwrap();
530                }
531                other => panic!("unexpected request: {other:?}"),
532            }
533        })
534        .detach();
535
536        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
537    }
538
539    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
540    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
541    #[fuchsia::test]
542    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
543        let blobfs = BlobfsRamdisk::builder()
544            .implementation(blob_impl)
545            .with_blob(&b"blob 1"[..])
546            .start()
547            .await
548            .unwrap();
549        let client = Client::for_ramdisk(&blobfs);
550
551        assert!(
552            client.blob_present_and_up_to_date(&fuchsia_merkle::root_from_slice(b"blob 1")).await
553        );
554        assert!(!client.blob_present_and_up_to_date(&Hash::from([1; 32])).await);
555
556        blobfs.stop().await.unwrap();
557    }
558
559    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
560    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
561    #[fuchsia::test]
562    async fn has_blob_return_false_if_blob_is_partially_written(
563        blob_impl: blobfs_ramdisk::Implementation,
564    ) {
565        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
566        let client = Client::for_ramdisk(&blobfs);
567
568        let content = &[3; 1024];
569        let hash = fuchsia_merkle::root_from_slice(content);
570        let delivery_content =
571            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
572
573        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
574        assert!(!client.blob_present_and_up_to_date(&hash).await);
575
576        let n = delivery_content.len();
577        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
578        assert!(!client.blob_present_and_up_to_date(&hash).await);
579
580        let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
581        let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
582        assert!(!client.blob_present_and_up_to_date(&hash).await);
583
584        let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
585        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
586        assert!(client.blob_present_and_up_to_date(&hash).await);
587
588        blobfs.stop().await.unwrap();
589    }
590
591    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
592        let hash = fuchsia_merkle::root_from_slice(content);
593        let delivery_content =
594            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
595        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
596        let vmo = writer
597            .get_vmo(delivery_content.len().try_into().unwrap())
598            .await
599            .expect("a")
600            .map_err(zx::Status::from_raw)
601            .expect("b");
602        let () = vmo.write(&delivery_content, 0).unwrap();
603        let () =
604            writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
605        hash
606    }
607
608    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
609    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
610    #[fuchsia::test]
611    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
612        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
613        let client = Client::for_ramdisk(&blobfs);
614
615        let missing_hash0 = Hash::from([0; 32]);
616        let missing_hash1 = Hash::from([1; 32]);
617
618        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
619        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
620
621        assert_eq!(
622            client
623                .filter_to_missing_blobs([
624                    missing_hash0,
625                    missing_hash1,
626                    present_blob0,
627                    present_blob1
628                ])
629                .await,
630            HashSet::from([missing_hash0, missing_hash1])
631        );
632
633        blobfs.stop().await.unwrap();
634    }
635
636    #[fuchsia::test]
637    async fn sync() {
638        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
639        let counter_clone = Arc::clone(&counter);
640        let (client, mut stream, _, _) = Client::new_test();
641        fasync::Task::spawn(async move {
642            match stream.try_next().await.unwrap().unwrap() {
643                fio::DirectoryRequest::Sync { responder } => {
644                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
645                    responder.send(Ok(())).unwrap();
646                }
647                other => panic!("unexpected request: {other:?}"),
648            }
649        })
650        .detach();
651
652        assert_matches!(client.sync().await, Ok(()));
653        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
654    }
655
656    #[fuchsia::test]
657    async fn open_blob_for_write_maps_already_exists() {
658        let (blob_creator, mut blob_creator_stream) =
659            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
660        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
661
662        let client = Client::new(
663            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
664            Some(blob_creator),
665            blob_reader,
666            None,
667        )
668        .unwrap();
669
670        fuchsia_async::Task::spawn(async move {
671            match blob_creator_stream.next().await.unwrap().unwrap() {
672                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
673                    assert_eq!(hash, [0; 32]);
674                    assert!(!allow_existing);
675                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
676                }
677                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
678                    unreachable!("This code path is not yet exercised.");
679                }
680            }
681        })
682        .detach();
683
684        assert_matches!(
685            client.open_blob_for_write(&[0; 32].into(), false).await,
686            Err(CreateError::AlreadyExists)
687        );
688    }
689
690    #[fuchsia::test]
691    async fn concurrent_list_known_blobs_all_return_full_contents() {
692        use futures::StreamExt;
693        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
694        let client = Client::for_ramdisk(&blobfs);
695
696        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
697        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
698        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
699        // each successful listing requires a call to Rewind as well, but it does make conflict
700        // more likely.
701        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
702        for i in 0..256u16 {
703            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
704        }
705
706        let () = futures::stream::iter(0..100)
707            .for_each_concurrent(None, |_| async {
708                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
709            })
710            .await;
711    }
712}