blobfs/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! Typesafe wrappers around the /blob filesystem.
8
9use fidl::endpoints::{Proxy as _, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{stream, StreamExt as _};
12use log::{error, info, warn};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::{self as zx, AsHandleRef as _, Status};
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
21
22pub mod mock;
23pub use mock::Mock;
24
/// Blobfs client errors.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    /// Failed to open the /blob directory in the component's namespace.
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    /// Failed to create a private clone of the blobfs directory connection.
    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    /// Failed to enumerate the directory entries of blobfs.
    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    /// The filesystem returned an error status from Unlink.
    #[error("while deleting blob")]
    Unlink(#[source] Status),

    /// The filesystem returned an error status from Sync.
    #[error("while sync'ing")]
    Sync(#[source] Status),

    /// A directory entry name was not a valid merkle root hash.
    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    /// Transport-level FIDL failure.
    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    /// Could not connect to the fuchsia.fxfs/BlobCreator protocol.
    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    /// Could not connect to the fuchsia.fxfs/BlobReader protocol.
    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    /// `vmo_blob::init_vmex_resource` failed.
    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}
59
/// An error encountered while creating a blob
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    /// The blob cannot be created right now; see `From<ffxfs::CreateBlobError>` below and
    /// the ACCESS_DENIED mapping in `open_blob_proxy_from_dir_for_write`.
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    /// Opening the blob file for write failed.
    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    /// The `fio::FileProxy` could not be converted back into a channel (it had
    /// outstanding clones or was otherwise in use).
    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    /// Transport-level FIDL failure.
    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    /// fuchsia.fxfs/BlobCreator.Create returned a domain error other than AlreadyExists.
    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),
}
79
80impl From<ffxfs::CreateBlobError> for CreateError {
81    fn from(e: ffxfs::CreateBlobError) -> Self {
82        match e {
83            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
84            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
85        }
86    }
87}
88
/// A builder for [`Client`]
#[derive(Default)]
pub struct ClientBuilder {
    // Whether to route reads through fuchsia.fxfs/BlobReader (and whether to set up the
    // VmexResource while doing so).
    use_reader: Reader,
    // Whether to route writes through fuchsia.fxfs/BlobCreator.
    use_creator: bool,
    // The following three map to fio::PERM_READABLE / PERM_WRITABLE / PERM_EXECUTABLE
    // when opening /blob in `build()`.
    readable: bool,
    writable: bool,
    executable: bool,
}
98
// Whether the built [`Client`] should use fuchsia.fxfs/BlobReader for reads, and if so,
// whether `build()` should also obtain and install the VmexResource.
#[derive(Default)]
enum Reader {
    #[default]
    DontUse,
    Use {
        use_vmex: bool,
    },
}
107
108impl ClientBuilder {
109    /// Opens the /blob directory in the component's namespace with readable, writable, and/or
110    /// executable flags. Connects to the fuchsia.fxfs.BlobCreator and BlobReader if requested.
111    /// Connects to and initializes the VmexResource if `use_vmex` is set. Returns a `Client`.
112    pub async fn build(self) -> Result<Client, BlobfsError> {
113        let mut flags = fio::Flags::empty();
114        if self.readable {
115            flags |= fio::PERM_READABLE
116        }
117        if self.writable {
118            flags |= fio::PERM_WRITABLE
119        }
120        if self.executable {
121            flags |= fio::PERM_EXECUTABLE
122        }
123        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
124        let reader = match self.use_reader {
125            Reader::DontUse => None,
126            Reader::Use { use_vmex } => {
127                if use_vmex {
128                    if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
129                        fidl_fuchsia_kernel::VmexResourceMarker,
130                    >() {
131                        if let Ok(vmex) = client.get().await {
132                            info!("Got vmex resource");
133                            vmo_blob::init_vmex_resource(vmex)
134                                .map_err(BlobfsError::InitVmexResource)?;
135                        }
136                    }
137                }
138                Some(
139                    fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
140                        .map_err(BlobfsError::ConnectToBlobReader)?,
141                )
142            }
143        };
144
145        let creator = if self.use_creator {
146            Some(
147                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
148                    .map_err(BlobfsError::ConnectToBlobCreator)?,
149            )
150        } else {
151            None
152        };
153
154        Ok(Client { dir, creator, reader })
155    }
156
157    /// [`Client`] will connect to and use fuchsia.fxfs/BlobReader for reads. Sets the VmexResource
158    /// for `Client`. The VmexResource is used by `get_backing_memory` to mark blobs as executable.
159    pub fn use_reader(self) -> Self {
160        Self { use_reader: Reader::Use { use_vmex: true }, ..self }
161    }
162
163    /// [`Client`] will connect to and use fuchsia.fxfs/BlobReader for reads. Does not set the
164    /// VmexResource.
165    pub fn use_reader_no_vmex(self) -> Self {
166        Self { use_reader: Reader::Use { use_vmex: false }, ..self }
167    }
168
169    /// If set, [`Client`] will connect to and use fuchsia.fxfs/BlobCreator for writes.
170    pub fn use_creator(self) -> Self {
171        Self { use_creator: true, ..self }
172    }
173
174    /// If set, [`Client`] will connect to /blob in the current component's namespace with
175    /// [`fio::PERM_READABLE`].
176    pub fn readable(self) -> Self {
177        Self { readable: true, ..self }
178    }
179
180    /// If set, [`Client`] will connect to /blob in the current component's namespace with
181    /// [`fio::PERM_WRITABLE`] which needed so that [`Client::delete_blob`] can unlink the file.
182    pub fn writable(self) -> Self {
183        Self { writable: true, ..self }
184    }
185
186    /// If set, [`Client`] will connect to /blob in the current component's namespace with
187    /// [`fio::PERM_EXECUTABLE`].
188    pub fn executable(self) -> Self {
189        Self { executable: true, ..self }
190    }
191}
192
193impl Client {
194    /// Create an empty `ClientBuilder`
195    pub fn builder() -> ClientBuilder {
196        Default::default()
197    }
198}
/// Blobfs client
#[derive(Debug, Clone)]
pub struct Client {
    // Connection to the /blob directory; the fallback path for both reads and writes.
    dir: fio::DirectoryProxy,
    // If set, writes go through fuchsia.fxfs/BlobCreator instead of `dir`.
    creator: Option<ffxfs::BlobCreatorProxy>,
    // If set, reads go through fuchsia.fxfs/BlobReader instead of `dir`.
    reader: Option<ffxfs::BlobReaderProxy>,
}
206
impl Client {
    /// Returns a client connected to the given blob directory, BlobCreatorProxy, and
    /// BlobReaderProxy. If `vmex` is passed in, sets the VmexResource, which is used to mark blobs
    /// as executable. If `creator` or `reader` is not supplied, writes or reads respectively will
    /// be performed through the blob directory.
    pub fn new(
        dir: fio::DirectoryProxy,
        creator: Option<ffxfs::BlobCreatorProxy>,
        reader: Option<ffxfs::BlobReaderProxy>,
        vmex: Option<zx::Resource>,
    ) -> Result<Self, anyhow::Error> {
        if let Some(vmex) = vmex {
            vmo_blob::init_vmex_resource(vmex)?;
        }
        Ok(Self { dir, creator, reader })
    }

    /// Creates a new client backed by the returned request stream. This constructor should not be
    /// used outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, stream)
    }

    /// Creates a new client backed by the returned mock. This constructor should not be used
    /// outside of tests.
    ///
    /// # Panics
    ///
    /// Panics on error
    pub fn new_mock() -> (Self, mock::Mock) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, mock::Mock { stream })
    }

    /// Returns the read-only VMO backing the blob.
    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
        // Prefer the BlobReader protocol when configured; otherwise fall back to opening the
        // blob file by hash and asking for its backing memory.
        if let Some(reader) = &self.reader {
            reader
                .get_vmo(hash)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        } else {
            let blob =
                fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
                    .await
                    .map_err(GetBlobVmoError::OpenBlob)?;
            blob.get_backing_memory(fio::VmoFlags::READ)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        }
    }

    /// Open a blob for read. `scope` will only be used if the client was configured to use
    /// fuchsia.fxfs.BlobReader.
    pub fn deprecated_open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::OpenFlags,
        scope: ExecutionScope,
        server_end: ServerEnd<fio::NodeMarker>,
    ) -> Result<(), fidl::Error> {
        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
        // Reject requests that attempt to open blobs as writable.
        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
            return Ok(());
        }
        // Reject requests that attempt to create new blobs.
        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
            return Ok(());
        }
        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open.
        if let Some(reader) = &self.reader {
            let object_request = flags.to_object_request(server_end);
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
            Ok(())
        } else {
            self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
        }
    }

    /// Open a blob for read using open3. `scope` will only be used if the client was configured to
    /// use fuchsia.fxfs.BlobReader.
    pub fn open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::Flags,
        scope: ExecutionScope,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), zx::Status> {
        // Reject requests that attempt to open blobs as writable.
        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
            return Err(zx::Status::ACCESS_DENIED);
        }
        // Reject requests that attempt to create new blobs.
        if flags.creation_mode() != vfs::CreationMode::Never {
            return Err(zx::Status::NOT_SUPPORTED);
        }
        // Errors below will be communicated via the `object_request` channel.
        let object_request = object_request.take();
        // Use blob reader protocol if available, otherwise fallback to fuchsia.io/Directory.Open3.
        if let Some(reader) = &self.reader {
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
        } else {
            // A FIDL transport error here can only be logged; the caller learns of failures
            // through the object_request channel, which is already consumed.
            let _: Result<(), ()> = self
                .dir
                .open(
                    &blob.to_string(),
                    flags,
                    &object_request.options(),
                    object_request.into_channel(),
                )
                .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
        }
        Ok(())
    }

    /// Returns the list of known blobs in blobfs.
    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
        // fuchsia.io.Directory.ReadDirents uses a per-connection index into the array of
        // directory entries. To prevent contention over this index by concurrent calls (either
        // from concurrent calls to list_known_blobs on this object, or on clones of this object,
        // or other clones of the DirectoryProxy this object was made from), create a new
        // connection which will have its own index.
        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
        fuchsia_fs::directory::readdir(&private_connection)
            .await
            .map_err(BlobfsError::ReadDir)?
            .into_iter()
            // Only file entries are blobs; each file name is a merkle root hash.
            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
            .collect()
    }

    /// Delete the blob with the given merkle hash.
    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
        self.dir
            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
            .await?
            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
    }

    /// Open a new blob for write.
    pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
        Ok(if let Some(blob_creator) = &self.creator {
            // NOTE(review): the second argument to Create is presumably "allow existing";
            // confirm against the fuchsia.fxfs FIDL definition.
            fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
        } else {
            fpkg::BlobWriter::File(
                self.open_blob_proxy_from_dir_for_write(blob)
                    .await?
                    .into_channel()
                    .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
                    .into_zx_channel()
                    .into(),
            )
        })
    }

    /// Open a new blob for write, unconditionally using the blob directory.
    async fn open_blob_proxy_from_dir_for_write(
        &self,
        blob: &Hash,
    ) -> Result<fio::FileProxy, CreateError> {
        let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;

        let path = delivery_blob::delivery_blob_path(blob);
        fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
            // ACCESS_DENIED from blobfs on create is interpreted as "blob already exists or
            // is being concurrently written".
            fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
                CreateError::AlreadyExists
            }
            other => CreateError::Io(other),
        })
    }

    /// Returns whether blobfs has a blob with the given hash.
    /// On c++blobfs, this should only be called if there are no concurrent attempts to write the
    /// blob. On c++blobfs, open connections to even partially written blobs keep the blob alive,
    /// and so if this call overlaps with a concurrent attempt to create the blob that fails and
    /// is then retried, this open connection will prevent the partially written blob from being
    /// removed and block the creation of the new write connection.
    /// TODO(https://fxbug.dev/294286136) Add GetVmo support to c++blobfs.
    pub async fn has_blob(&self, blob: &Hash) -> bool {
        if let Some(reader) = &self.reader {
            // TODO(https://fxbug.dev/295552228): Use faster API for determining blob presence.
            matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
        } else {
            let file = match fuchsia_fs::directory::open_file_async(
                &self.dir,
                &blob.to_string(),
                fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
            ) {
                Ok(file) => file,
                Err(_) => return false,
            };

            let mut events = file.take_event_stream();

            // Extract the file's event/observer handle from whichever open event the server
            // sends (OnOpen_ for deprecated_open, OnRepresentation for open3); any error or
            // unexpected payload means the blob is treated as absent.
            let event = match events.next().await {
                None => return false,
                Some(event) => match event {
                    Err(_) => return false,
                    Ok(event) => match event {
                        fio::FileEvent::OnOpen_ { s, info } => {
                            if Status::from_raw(s) != Status::OK {
                                return false;
                            }

                            match info {
                                Some(info) => match *info {
                                    fio::NodeInfoDeprecated::File(fio::FileObject {
                                        event: Some(event),
                                        stream: _, // TODO(https://fxbug.dev/293606235): Use stream
                                    }) => event,
                                    _ => return false,
                                },
                                _ => return false,
                            }
                        }
                        fio::FileEvent::OnRepresentation { payload } => match payload {
                            fio::Representation::File(fio::FileInfo {
                                observer: Some(event),
                                stream: _, // TODO(https://fxbug.dev/293606235): Use stream
                                ..
                            }) => event,
                            _ => return false,
                        },
                        fio::FileEvent::_UnknownEvent { .. } => return false,
                    },
                },
            };

            // Check that the USER_0 signal has been asserted on the file's event to make sure we
            // return false on the edge case of the blob is currently being written.
            // INFINITE_PAST makes this a non-blocking poll of the signal state.
            match event
                .wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST)
                .to_result()
            {
                Ok(_) => true,
                Err(status) => {
                    if status != Status::TIMED_OUT {
                        warn!("blobfs: unknown error asserting blob existence: {}", status);
                    }
                    false
                }
            }
        }
    }

    /// Determines which blobs of `candidates` are missing from blobfs.
    /// TODO(https://fxbug.dev/338477132) This fn is used during resolves after a meta.far is
    /// fetched to determine which content blobs and subpackage meta.fars need to be fetched.
    /// On c++blobfs, opening a partially written blob keeps that blob alive, creating the
    /// following race condition:
    /// 1. blob is partially written by resolve A
    /// 2. blob is opened by this fn to check for presence by concurrent resolve B
    /// 3. resolve A encounters an error and retries the fetch, which attempts to open the blob for
    ///    write, which collides with the partially written blob from (1) that is being kept alive
    ///    by (2) and so fails
    pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
        // Attempt to open each blob instead of using ReadDirents to catch more forms of filesystem
        // metadata corruption.
        // We don't use ReadDirents even as a pre-filter because emulator testing suggests
        // ReadDirents on an fxblob with 1,000 blobs takes as long as ~60 sequential has_blob calls
        // on missing blobs, and it's about 5x worse on c++blobfs (on which both ReadDirents is
        // slower and has_blob is faster). The minor speedup on packages with a great number of
        // missing blobs is not worth a rarely taken branch deep within package resolution.
        stream::iter(candidates.clone())
            .map(move |blob| async move {
                if self.has_blob(&blob).await {
                    None
                } else {
                    Some(blob)
                }
            })
            // Emulator testing suggests both c++blobfs and fxblob show diminishing returns after
            // even three concurrent `has_blob` calls.
            .buffer_unordered(10)
            .filter_map(|blob| async move { blob })
            .collect()
            .await
    }

    /// Call fuchsia.io/Node.Sync on the blobfs directory.
    pub async fn sync(&self) -> Result<(), BlobfsError> {
        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
    }
}
504
/// Spawns a task on `scope` to attempt opening `blob` via `reader`. Creates a file connection to
/// the blob using [`vmo_blob::VmoBlob`]. Errors will be sent via `object_request` asynchronously.
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    // handle_async forwards any error returned by the closure to the object_request channel,
    // so all `?` failures below reach the client asynchronously.
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            // Distinguish a closed BlobReader channel (propagate its status) from other
            // transport errors (collapse to INTERNAL).
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}
531
/// Errors from [`Client::get_blob_vmo`].
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    /// The filesystem rejected the VMO request (e.g. blob not found).
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    /// The blob file could not be opened (directory fallback path only).
    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    /// Transport-level FIDL failure.
    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}
544
#[cfg(test)]
impl Client {
    /// Constructs a new [`Client`] connected to the provided [`BlobfsRamdisk`]. Tests in this
    /// crate should use this constructor rather than [`BlobfsRamdisk::client`], which returns
    /// the non-cfg(test) build of this crate's [`blobfs::Client`]. While tests could use the
    /// [`blobfs::Client`] returned by [`BlobfsRamdisk::client`], it will be a different type than
    /// [`super::Client`], and the tests could not access its private members or any cfg(test)
    /// specific functionality.
    ///
    /// # Panics
    ///
    /// Panics on error.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        Self::new(
            blobfs.root_dir_proxy().unwrap(),
            blobfs.blob_creator_proxy().unwrap(),
            blobfs.blob_reader_proxy().unwrap(),
            // `vmex` is None, so `vmo_blob::init_vmex_resource` is never called here.
            None,
        )
        .unwrap()
    }
}
567
568#[cfg(test)]
569#[allow(clippy::bool_assert_comparison)]
570mod tests {
571    use super::*;
572    use assert_matches::assert_matches;
573    use blobfs_ramdisk::BlobfsRamdisk;
574    use fuchsia_async as fasync;
575    use futures::stream::TryStreamExt;
576    use maplit::hashset;
577    use std::io::Write as _;
578    use std::sync::Arc;
579
    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs_empty() {
        // A freshly started blobfs should report no blobs.
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
        blobfs.stop().await.unwrap();
    }
588
    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs() {
        // Pre-populate blobfs with two blobs and verify the client enumerates exactly them.
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        // The ramdisk's own listing is the source of truth.
        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }
603
    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_and_then_list() {
        // Deleting one of two blobs should leave only the other visible.
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
        assert_matches!(client.delete_blob(&merkle).await, Ok(()));

        let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }
621
    #[fasync::run_singlethreaded(test)]
    async fn delete_non_existing_blob() {
        // Deleting an absent blob should surface NOT_FOUND via BlobfsError::Unlink.
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        assert_matches!(
            client.delete_blob(&blob_merkle).await,
            Err(BlobfsError::Unlink(Status::NOT_FOUND))
        );
        blobfs.stop().await.unwrap();
    }
634
    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_mock() {
        // Verify delete_blob issues a fuchsia.io Unlink with the blob's hash as the entry name.
        let (client, mut stream) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }
652
    #[fasync::run_singlethreaded(test)]
    async fn has_blob() {
        // Present blob is reported true; an arbitrary absent hash is reported false.
        let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
        assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);

        blobfs.stop().await.unwrap();
    }
663
    #[fasync::run_singlethreaded(test)]
    async fn has_blob_fxblob() {
        // Same as has_blob, but against an fxblob-backed ramdisk (exercises the BlobReader path).
        let blobfs =
            BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
        assert!(!client.has_blob(&Hash::from([1; 32])).await);

        blobfs.stop().await.unwrap();
    }
675
    #[fasync::run_singlethreaded(test)]
    async fn has_blob_return_false_if_blob_is_partially_written() {
        // has_blob must be false at every intermediate write stage (created, resized,
        // half-written) and only become true once the full content is written.
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let blob = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&blob).root();

        let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.set_len(blob.len() as u64).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[..512]).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[512..]).unwrap();
        assert_eq!(client.has_blob(&hash).await, true);

        blobfs.stop().await.unwrap();
    }
695
696    async fn resize(blob: &fio::FileProxy, size: usize) {
697        let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
698    }
699
700    async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
701        assert_eq!(
702            blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
703            bytes.len() as u64
704        );
705    }
706
    #[fasync::run_singlethreaded(test)]
    async fn write_delivery_blob() {
        // Writing a Type1 delivery blob through the directory path should make the blob
        // readable under its (uncompressed content) merkle root.
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&content).root();
        let delivery_content =
            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);

        let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();

        let () = resize(&proxy, delivery_content.len()).await;
        let () = write(&proxy, &delivery_content).await;

        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }
726
    /// Wrapper for a blob and its hash. This lets the tests retain ownership of the Blob,
    /// which is important because it ensures blobfs will not close partially written blobs for the
    /// duration of the test.
    struct TestBlob {
        // Held (never read) to keep the connection — and thus the partially written blob — alive.
        _blob: fio::FileProxy,
        // Merkle root of the blob's (uncompressed) content.
        hash: Hash,
    }
734
735    async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
736        let hash = fuchsia_merkle::from_slice(content).root();
737        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
738        TestBlob { _blob, hash }
739    }
740
741    async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
742        let hash = fuchsia_merkle::from_slice(content).root();
743        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
744        let () = resize(&_blob, content.len()).await;
745        TestBlob { _blob, hash }
746    }
747
748    async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
749        let hash = fuchsia_merkle::from_slice(content).root();
750        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
751        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
752        let () = resize(&_blob, content.len()).await;
753        let () = write(&_blob, &content[..content.len() / 2]).await;
754        TestBlob { _blob, hash }
755    }
756
757    async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
758        let hash = fuchsia_merkle::from_slice(content).root();
759        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
760        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
761        let () = resize(&_blob, content.len()).await;
762        let () = write(&_blob, &content).await;
763        TestBlob { _blob, hash }
764    }
765
766    #[fasync::run_singlethreaded(test)]
767    async fn filter_to_missing_blobs() {
768        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
769        let client = Client::for_ramdisk(&blobfs);
770
771        let missing_hash0 = Hash::from([0; 32]);
772        let missing_hash1 = Hash::from([1; 32]);
773
774        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
775        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
776
777        assert_eq!(
778            client
779                .filter_to_missing_blobs(
780                    // Pass in <= 20 candidates so the heuristic is not used.
781                    &hashset! { missing_hash0, missing_hash1,
782                        present_blob0.hash, present_blob1.hash
783                    },
784                )
785                .await,
786            hashset! { missing_hash0, missing_hash1 }
787        );
788
789        blobfs.stop().await.unwrap();
790    }
791
792    /// Similar to the above test, except also test that partially written blobs count as missing.
793    #[fasync::run_singlethreaded(test)]
794    async fn filter_to_missing_blobs_with_partially_written_blobs() {
795        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
796        let client = Client::for_ramdisk(&blobfs);
797
798        // Some blobs are created (but not yet truncated).
799        let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;
800
801        // Some are truncated but not written.
802        let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;
803
804        // Some are partially written.
805        let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;
806
807        // Some are fully written.
808        let present_blob = fully_write_blob(&client, &[3; 1024]).await;
809
810        assert_eq!(
811            client
812                .filter_to_missing_blobs(&hashset! {
813                    missing_blob0.hash,
814                    missing_blob1.hash,
815                    missing_blob2.hash,
816                    present_blob.hash
817                },)
818                .await,
819            // All partially written blobs should count as missing.
820            hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
821        );
822
823        blobfs.stop().await.unwrap();
824    }
825
826    #[fasync::run_singlethreaded(test)]
827    async fn sync() {
828        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
829        let counter_clone = Arc::clone(&counter);
830        let (client, mut stream) = Client::new_test();
831        fasync::Task::spawn(async move {
832            match stream.try_next().await.unwrap().unwrap() {
833                fio::DirectoryRequest::Sync { responder } => {
834                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
835                    responder.send(Ok(())).unwrap();
836                }
837                other => panic!("unexpected request: {other:?}"),
838            }
839        })
840        .detach();
841
842        assert_matches!(client.sync().await, Ok(()));
843        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
844    }
845
846    #[fasync::run_singlethreaded(test)]
847    async fn open_blob_for_write_uses_fxblob_if_configured() {
848        let (blob_creator, mut blob_creator_stream) =
849            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
850        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
851        let client = Client::new(
852            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
853            Some(blob_creator),
854            Some(blob_reader),
855            None,
856        )
857        .unwrap();
858
859        fuchsia_async::Task::spawn(async move {
860            match blob_creator_stream.next().await.unwrap().unwrap() {
861                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
862                    assert_eq!(hash, [0; 32]);
863                    assert!(!allow_existing);
864                    let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
865                }
866            }
867        })
868        .detach();
869
870        assert_matches!(
871            client.open_blob_for_write(&[0; 32].into()).await,
872            Ok(fpkg::BlobWriter::Writer(_))
873        );
874    }
875
876    #[fasync::run_singlethreaded(test)]
877    async fn open_blob_for_write_fxblob_maps_already_exists() {
878        let (blob_creator, mut blob_creator_stream) =
879            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
880        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
881
882        let client = Client::new(
883            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
884            Some(blob_creator),
885            Some(blob_reader),
886            None,
887        )
888        .unwrap();
889
890        fuchsia_async::Task::spawn(async move {
891            match blob_creator_stream.next().await.unwrap().unwrap() {
892                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
893                    assert_eq!(hash, [0; 32]);
894                    assert!(!allow_existing);
895                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
896                }
897            }
898        })
899        .detach();
900
901        assert_matches!(
902            client.open_blob_for_write(&[0; 32].into()).await,
903            Err(CreateError::AlreadyExists)
904        );
905    }
906
907    #[fasync::run_singlethreaded(test)]
908    async fn concurrent_list_known_blobs_all_return_full_contents() {
909        use futures::StreamExt;
910        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
911        let client = Client::for_ramdisk(&blobfs);
912
913        // ReadDirents returns an 8,192 byte buffer, and each entry is 74 bytes [0] (including 64
914        // bytes of filename), so use more than 110 entries to guarantee that listing all contents
915        // requires multiple ReadDirents calls. This isn't necessary to cause conflict, because
916        // each successful listing requires a call to Rewind as well, but it does make conflict
917        // more likely.
918        // [0] https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.io/directory.fidl;l=261;drc=9e84e19d3f42240c46d2b0c3c132c2f0b5a3343f
919        for i in 0..256u16 {
920            let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
921        }
922
923        let () = futures::stream::iter(0..100)
924            .for_each_concurrent(None, |_| async {
925                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
926            })
927            .await;
928    }
929}