// fxfs_platform/fuchsia/fxblob/directory.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`BlobDirectory`] node type used to represent a directory of immutable
6//! content-addressable blobs.
7
8use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::fxblob::blob::{CompressionInfo, FxBlob};
11use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
12use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
13use crate::fuchsia::volume::{FxVolume, RootDir};
14use anyhow::{Context as _, Error, anyhow, ensure};
15use delivery_blob::compression::CompressionAlgorithm;
16use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
17use fidl_fuchsia_fxfs::{
18    BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
19    BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
20};
21use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
22use fuchsia_hash::Hash;
23use futures::TryStreamExt;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::object_handle::ReadObjectHandle;
27use fxfs::object_store::transaction::{LockKey, lock_keys};
28use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore};
29use fxfs_macros::ToWeakNode;
30use fxfs_trace::{TraceFutureExt, trace_future_args};
31use std::str::FromStr;
32use std::sync::Arc;
33use vfs::directory::dirents_sink;
34use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
35use vfs::directory::entry_container::{
36    Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
37};
38use vfs::directory::helper::DirectlyMutable;
39use vfs::directory::mutable::connection::MutableConnection;
40use vfs::directory::simple::Simple;
41use vfs::directory::traversal_position::TraversalPosition;
42use vfs::execution_scope::ExecutionScope;
43use vfs::path::Path;
44use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
45use zx::Status;
46
/// A flat directory containing content-addressable blobs (names are their hashes).
/// It is not possible to create sub-directories.
/// It is not possible to write to an existing blob.
/// It is not possible to open or read a blob until it is written and verified.
#[derive(ToWeakNode)]
pub struct BlobDirectory {
    /// The underlying on-disk directory that this blob directory wraps and delegates to.
    directory: Arc<FxDirectory>,
}
55
/// A blob identifier carrying both the string and parsed forms of a merkle root hash.
/// Instead of constantly switching back and forth between strings and hashes, do the conversion
/// once and then just pass around a reference to that.
pub(crate) struct Identifier {
    /// Hex string form of the hash; used for directory entry names and dirent-cache keys.
    pub string: String,
    /// Parsed merkle root hash; used for content verification.
    pub hash: Hash,
}
62
63impl TryFrom<&str> for Identifier {
64    type Error = FxfsError;
65    fn try_from(value: &str) -> Result<Self, Self::Error> {
66        Ok(Self {
67            string: value.to_owned(),
68            hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
69        })
70    }
71}
72
73impl From<Hash> for Identifier {
74    fn from(hash: Hash) -> Self {
75        Self { string: hash.to_string(), hash }
76    }
77}
78
impl RootDir for BlobDirectory {
    /// Returns this directory as a [`DirectoryEntry`] trait object.
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
        self
    }

    /// Serves this directory on `server_end`, running the connection on this volume's execution
    /// scope.
    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
        let scope = self.volume().scope().clone();
        vfs::directory::serve_on(self, flags, scope, server_end);
    }

    /// Returns this directory as a generic [`FxNode`] trait object.
    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
        self as Arc<dyn FxNode>
    }

    /// Registers the fuchsia.fxfs BlobCreator and BlobReader protocols in `svc_dir`; each
    /// connection is serviced by this directory's request-stream handlers.
    fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
        let this = self.clone();
        svc_dir.add_entry(
            BlobCreatorMarker::PROTOCOL_NAME,
            vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
        )?;

        svc_dir.add_entry(
            BlobReaderMarker::PROTOCOL_NAME,
            vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
        )?;

        Ok(())
    }
}
108
109impl BlobDirectory {
    /// Wraps `directory` in a new [`BlobDirectory`].
    fn new(directory: FxDirectory) -> Self {
        Self { directory: Arc::new(directory) }
    }
113
    /// Returns the wrapped [`FxDirectory`].
    pub fn directory(&self) -> &Arc<FxDirectory> {
        &self.directory
    }
117
    /// Returns the volume this directory lives on.
    pub fn volume(&self) -> &Arc<FxVolume> {
        self.directory.volume()
    }
121
    /// Returns the object store backing this directory.
    fn store(&self) -> &ObjectStore {
        self.directory.store()
    }
125
    /// Open blob and get the child vmo. This allows the creation of the child vmo to be atomic with
    /// the open.
    ///
    /// Fails with [`FxfsError::NotFound`] if no blob matching `id` exists in this directory.
    pub(crate) async fn open_blob_get_vmo(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
        let store = self.store();
        let fs = store.filesystem();
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
        // A lock needs to be held over searching the directory and incrementing the open count.
        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
        let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
        let vmo = blob.create_child_vmo()?;
        // `blob.clone()` clones the inner `Arc<FxBlob>`, downgrading from the `OpenedNode<FxBlob>`
        // guard (dropped at end of scope) to a plain node reference.
        Ok((blob.clone(), vmo))
    }
142
    /// Wraps [`Self::open_blob_locked`] while taking the locks: a read lock on this directory is
    /// held across the lookup so the open count is incremented atomically with the search.
    pub(crate) async fn open_blob(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
        let store = self.store();
        let fs = store.filesystem();
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
        // A lock needs to be held over searching the directory and incrementing the open count.
        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
        self.open_blob_locked(id).await
    }
155
    /// Attempt to open and cache the blob with `id` in this directory. Returns `Ok(None)` if no
    /// blob matching `id` was found. Requires holding locks for at least the object store and
    /// directory object.
    ///
    /// Fails with [`FxfsError::Inconsistent`] if a matching entry exists but is not a file backed
    /// by an [`FxBlob`] node.
    async fn open_blob_locked(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
        // Fast path: consult the dirent cache, keyed by (directory object id, entry name).
        let node = match self
            .directory
            .directory()
            .owner()
            .dirent_cache()
            .lookup(&(self.directory.object_id(), &id.string))
        {
            Some(node) => Some(node),
            None => {
                // Cache miss: look the name up on disk, load (or fetch) the node, and populate
                // the dirent cache for subsequent opens.
                if let Some((object_id, _, _)) =
                    self.directory.directory().lookup(&id.string).await?
                {
                    let node = self.get_or_load_node(object_id, &id).await?;
                    self.directory.directory().owner().dirent_cache().insert(
                        self.directory.object_id(),
                        id.string.clone(),
                        node.clone(),
                    );
                    Some(node)
                } else {
                    None
                }
            }
        };
        let Some(node) = node else {
            return Ok(None);
        };
        // A blob directory should only ever contain files; anything else indicates corruption.
        if node.object_descriptor() != ObjectDescriptor::File {
            return Err(FxfsError::Inconsistent)
                .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
        }
        node.into_any()
            .downcast::<FxBlob>()
            .map(|node| Some(OpenedNode::new(node)))
            .map_err(|_| FxfsError::Inconsistent)
            .with_context(|| format!("Blob {} has incorrect node type!", id.string))
    }
200
    // Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
    // the object from the object store, installing the returned node into the cache and returns the
    // newly created FxNode backed by the loaded object.
    //
    // `object_id` identifies the on-disk object to load; `id` is the hash the caller looked up. In
    // both the cached and freshly-loaded cases the node is tied to `id.hash` (checked directly for
    // cached nodes, and passed to the merkle verifier for loaded ones).
    async fn get_or_load_node(
        self: &Arc<Self>,
        object_id: u64,
        id: &Identifier,
    ) -> Result<Arc<dyn FxNode>, Error> {
        let volume = self.volume();
        match volume.cache().get_or_reserve(object_id).await {
            GetResult::Node(node) => {
                // Protecting against the scenario where a directory entry points to another node
                // which has already been loaded and verified with the correct hash. We need to
                // verify that the hash for the blob that is cached here matches the requested hash.
                let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
                    anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
                })?;
                ensure!(
                    blob.root() == id.hash,
                    anyhow!(FxfsError::Inconsistent)
                        .context("Loaded blob by node that did not match the given hash")
                );
                Ok(blob as Arc<dyn FxNode>)
            }
            GetResult::Placeholder(placeholder) => {
                // Cache miss: fill the placeholder by opening the object and decoding its stored
                // metadata.
                let object =
                    ObjectStore::open_object(volume, object_id, HandleOptions::default(), None)
                        .await?;
                let metadata = BlobMetadata::read_from(&object).await?;
                // Determine the uncompressed size and, for compressed formats, the parameters
                // needed to decompress individual chunks.
                let (uncompressed_size, compression_info) = match &metadata.format {
                    BlobFormat::Uncompressed => (object.get_size(), None),
                    BlobFormat::ChunkedZstd {
                        uncompressed_size,
                        chunk_size,
                        compressed_offsets,
                    } => (
                        *uncompressed_size,
                        Some(CompressionInfo::new(
                            *chunk_size,
                            compressed_offsets,
                            CompressionAlgorithm::Zstd,
                        )?),
                    ),
                    BlobFormat::ChunkedLz4 {
                        uncompressed_size,
                        chunk_size,
                        compressed_offsets,
                    } => (
                        *uncompressed_size,
                        Some(CompressionInfo::new(
                            *chunk_size,
                            compressed_offsets,
                            CompressionAlgorithm::Lz4,
                        )?),
                    ),
                };
                let merkle_verifier = metadata.into_merkle_verifier(id.hash)?;

                let node = FxBlob::new(
                    object,
                    id.hash,
                    merkle_verifier,
                    compression_info,
                    uncompressed_size,
                )? as Arc<dyn FxNode>;
                // Publish the node into the cache so concurrent lookups observe it.
                placeholder.commit(&node);
                Ok(node)
            }
        }
    }
271
272    /// Creates a [`ClientEnd<BlobWriterMarker>`] to write the delivery blob identified by `hash`.
273    /// It is safe to create multiple writers for a given `hash`, however only one will succeed.
274    /// Requests are handled asynchronously on this volume's execution scope.
275    async fn create_blob_writer(
276        self: &Arc<Self>,
277        hash: Hash,
278        allow_existing: bool,
279    ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
280        let id = hash.into();
281        let blob_exists = self
282            .open_blob(&id)
283            .await
284            .map_err(|e| {
285                log::error!("Failed to lookup blob: {:?}", e);
286                CreateBlobError::Internal
287            })?
288            .is_some();
289        if blob_exists && !allow_existing {
290            return Err(CreateBlobError::AlreadyExists);
291        }
292        let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
293        let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
294            log::error!("Failed to create blob writer: {:?}", e);
295            CreateBlobError::Internal
296        })?;
297        self.volume().scope().spawn(async move {
298            if let Err(e) = writer.handle_requests(request_stream).await {
299                log::error!("Failed to handle BlobWriter requests: {}", e);
300            }
301        });
302        return Ok(client_end);
303    }
304
    /// Reports whether the blob named by `blob_hash` needs to be overwritten. Currently this only
    /// verifies existence: returns `Ok(false)` if the blob is present (in the dirent cache or on
    /// disk) and `Err(FxfsError::NotFound)` if it is absent. As written it never returns
    /// `Ok(true)`.
    async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
        // We don't take a lock here because this will only look up existence for now. If we
        // actually start fetching the blob or info about it after looking it up this will need to
        // take a reader lock on the directory and maybe also the object.
        if self.volume().dirent_cache().lookup(&(self.object_id(), &blob_hash.string)).is_some() {
            return Ok(false);
        }
        match self.directory.directory().lookup(&blob_hash.string).await? {
            Some(_) => Ok(false),
            None => Err(FxfsError::NotFound.into()),
        }
    }
317
    /// Services a fuchsia.fxfs BlobCreator request stream until the stream ends or yields an
    /// error. Each request is wrapped in a trace future; failures to send a response are logged
    /// (or, for NeedsOverwrite, ignored) rather than propagated.
    async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
        while let Ok(Some(request)) = requests.try_next().await {
            match request {
                BlobCreatorRequest::Create { responder, hash, allow_existing } => {
                    async {
                        responder
                            .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
                            .unwrap_or_else(|error| {
                                log::error!(error:?; "failed to send Create response");
                            });
                    }
                    .trace(trace_future_args!("BlobCreator::Create"))
                    .await;
                }
                BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
                    async {
                        // Send failures are deliberately ignored here.
                        let _ = responder.send(
                            self.needs_overwrite(Hash::from(blob_hash).into())
                                .await
                                .map_err(map_to_raw_status),
                        );
                    }
                    .trace(trace_future_args!("BlobCreator::NeedsOverwrite"))
                    .await;
                }
            }
        }
    }
346
347    async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
348        while let Ok(Some(request)) = requests.try_next().await {
349            match request {
350                BlobReaderRequest::GetVmo { blob_hash, responder } => {
351                    async {
352                        responder
353                            .send(
354                                self.get_blob_vmo(blob_hash.into())
355                                    .await
356                                    .map_err(map_to_raw_status),
357                            )
358                            .unwrap_or_else(|error| {
359                                log::error!(error:?; "failed to send GetVmo response");
360                            });
361                    }
362                    .trace(trace_future_args!("BlobReader::GetVmo"))
363                    .await;
364                }
365            };
366        }
367    }
368
369    async fn open_impl(
370        self: Arc<Self>,
371        scope: ExecutionScope,
372        path: Path,
373        flags: impl ProtocolsExt,
374        object_request: ObjectRequestRef<'_>,
375    ) -> Result<(), Status> {
376        if path.is_empty() {
377            object_request
378                .create_connection::<MutableConnection<_>, _>(
379                    scope,
380                    OpenedNode::new(self).take(),
381                    flags,
382                )
383                .await
384        } else {
385            Err(Status::NOT_SUPPORTED)
386        }
387    }
388}
389
impl FxNode for BlobDirectory {
    /// Object id of the underlying directory.
    fn object_id(&self) -> u64 {
        self.directory.object_id()
    }

    /// Parent of the underlying directory, if any.
    fn parent(&self) -> Option<Arc<FxDirectory>> {
        self.directory.parent()
    }

    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        // This directory can't be renamed.
        unreachable!();
    }

    // Open counts are deliberately not tracked for the blob root directory; these are no-ops.
    fn open_count_add_one(&self) {}
    fn open_count_sub_one(self: Arc<Self>) {}

    fn object_descriptor(&self) -> ObjectDescriptor {
        ObjectDescriptor::Directory
    }
}
411
impl MutableDirectory for BlobDirectory {
    /// Removes the blob named `name`. Fails with `INVALID_ARGS` if the caller required a
    /// directory, since blob directories contain only files.
    async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
        if must_be_directory {
            return Err(Status::INVALID_ARGS);
        }
        self.directory.clone().unlink(name, must_be_directory).await
    }

    /// Delegates attribute updates to the underlying directory.
    async fn update_attributes(
        &self,
        attributes: fio::MutableNodeAttributes,
    ) -> Result<(), Status> {
        self.directory.update_attributes(attributes).await
    }

    /// Delegates sync to the underlying directory.
    async fn sync(&self) -> Result<(), Status> {
        self.directory.sync().await
    }
}
431
/// Implementation of VFS pseudo-directory for blobs. Forks a task per connection.
impl DirectoryEntry for BlobDirectory {
    /// Routes an open request to this node as a directory.
    fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
        request.open_dir(self)
    }

    /// Connections to this directory run on the volume's execution scope.
    fn scope(&self) -> Option<ExecutionScope> {
        Some(self.volume().scope().clone())
    }
}
442
impl GetEntryInfo for BlobDirectory {
    /// Delegates entry info to the underlying directory.
    fn entry_info(&self) -> EntryInfo {
        self.directory.entry_info()
    }
}
448
impl vfs::node::Node for BlobDirectory {
    /// Delegates attribute queries to the underlying directory.
    async fn get_attributes(
        &self,
        requested_attributes: fio::NodeAttributesQuery,
    ) -> Result<fio::NodeAttributes2, Status> {
        self.directory.get_attributes(requested_attributes).await
    }

    /// Delegates filesystem info queries to the underlying directory.
    fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
        self.directory.query_filesystem()
    }
}
461
/// Implements VFS entry container trait for directories, allowing manipulation of their contents.
impl VfsDirectory for BlobDirectory {
    /// Legacy open path: spawns `open_impl` on the provided scope.
    fn deprecated_open(
        self: Arc<Self>,
        scope: ExecutionScope,
        flags: fio::OpenFlags,
        path: Path,
        server_end: ServerEnd<NodeMarker>,
    ) {
        scope.clone().spawn(flags.to_object_request(server_end).handle_async(
            async move |object_request| self.open_impl(scope, path, flags, object_request).await,
        ));
    }

    /// Current open path: spawns `open_impl` on the provided scope.
    fn open(
        self: Arc<Self>,
        scope: ExecutionScope,
        path: Path,
        flags: fio::Flags,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), Status> {
        scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
            self.open_impl(scope, path, flags, object_request).await
        }));
        Ok(())
    }

    /// Async open variant: runs `open_impl` inline instead of spawning a task.
    async fn open_async(
        self: Arc<Self>,
        scope: ExecutionScope,
        path: Path,
        flags: fio::Flags,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), Status> {
        self.open_impl(scope, path, flags, object_request).await
    }

    /// Delegates directory enumeration to the underlying directory.
    async fn read_dirents(
        &self,
        pos: &TraversalPosition,
        sink: Box<dyn dirents_sink::Sink>,
    ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
        self.directory.read_dirents(pos, sink).await
    }

    /// Delegates watcher registration to the underlying directory.
    fn register_watcher(
        self: Arc<Self>,
        scope: ExecutionScope,
        mask: WatchMask,
        watcher: DirectoryWatcher,
    ) -> Result<(), Status> {
        self.directory.clone().register_watcher(scope, mask, watcher)
    }

    /// Delegates watcher removal to the underlying directory.
    fn unregister_watcher(self: Arc<Self>, key: usize) {
        self.directory.clone().unregister_watcher(key)
    }
}
520
521impl From<object_store::Directory<FxVolume>> for BlobDirectory {
522    fn from(dir: object_store::Directory<FxVolume>) -> Self {
523        Self::new(dir.into())
524    }
525}
526
527#[cfg(test)]
528mod tests {
529    use super::*;
530    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
531    use assert_matches::assert_matches;
532    use blob_writer::BlobWriter;
533    use delivery_blob::{CompressionMode, Type1Blob, delivery_blob_path};
534    use fidl_fuchsia_fxfs::BlobReaderMarker;
535    use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
536    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
537    use fuchsia_fs::directory::{
538        DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
539    };
540
541    use futures::StreamExt as _;
542    use std::path::PathBuf;
543
    /// A written blob can be read back and then removed via fuchsia.io unlink.
    #[fasync::run(10, test)]
    async fn test_unlink() {
        let fixture = new_blob_fixture().await;

        let data = [1; 1000];

        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        assert_eq!(fixture.read_blob(hash).await, data);

        fixture
            .root()
            .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
            .await
            .expect("FIDL failed")
            .expect("unlink failed");

        fixture.close().await;
    }
563
    /// A blob only becomes visible in readdir once it has been fully written.
    #[fasync::run(10, test)]
    async fn test_readdir() {
        let fixture = new_blob_fixture().await;

        let data = [0xab; 2];
        let hash;
        {
            hash = fuchsia_merkle::root_from_slice(&data);
            let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);

            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    fixture.volume_out_dir(),
                )
                .expect("failed to connect to the Blob service");

            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");

            let writer = blob_writer_client_end.into_proxy();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            blob_writer.write(&compressed_data[..1]).await.unwrap();

            // Before the blob is finished writing, it shouldn't appear in the directory.
            assert_eq!(
                readdir_inclusive(fixture.root()).await.ok(),
                Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
            );

            blob_writer.write(&compressed_data[1..]).await.unwrap();
        }

        // Now that the blob is complete, it is listed alongside ".".
        assert_eq!(
            readdir_inclusive(fixture.root()).await.ok(),
            Some(vec![
                DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
                DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
            ])
        );

        fixture.close().await;
    }
611
    /// Directory watchers fire ADD_FILE only once a blob is fully written, and REMOVE_FILE when a
    /// blob is unlinked. Partially-written blobs produce no watch events.
    #[fasync::run(10, test)]
    async fn test_watchers() {
        let fixture = new_blob_fixture().await;

        let mut watcher = Watcher::new(fixture.root()).await.unwrap();
        // The initial watcher handshake reports "." as EXISTING, followed by IDLE.
        assert_eq!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
        );
        assert_matches!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
        );

        let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
        let mut hashes = vec![];
        let mut filenames = vec![];
        for datum in data {
            let hash = fuchsia_merkle::root_from_slice(&datum);
            let filename = PathBuf::from(format!("{}", hash));
            hashes.push(hash.clone());
            filenames.push(filename.clone());

            let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);

            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    fixture.volume_out_dir(),
                )
                .expect("failed to connect to the Blob service");

            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");

            let writer = blob_writer_client_end.into_proxy();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            // Leave the final byte unwritten so the blob remains incomplete.
            blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();

            // Before the blob is finished writing, we shouldn't see any watch events for it.
            assert_matches!(
                watcher
                    .next()
                    .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
                    .await,
                None
            );

            blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();

            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
            );
        }

        for (hash, filename) in hashes.iter().zip(filenames) {
            fixture
                .root()
                .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
                .await
                .expect("FIDL call failed")
                .expect("unlink failed");
            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
            );
        }

        std::mem::drop(watcher);
        fixture.close().await;
    }
688
    /// Blobs are content-addressed, so renaming one must fail.
    #[fasync::run(10, test)]
    async fn test_rename_fails() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
        Status::ok(status).unwrap();
        fixture
            .root()
            .rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
            .await
            .expect("FIDL failed")
            .expect_err("rename should fail");

        fixture.close().await;
    }
707
    /// Hard-linking blobs is not supported; link must return NOT_SUPPORTED.
    #[fasync::run(10, test)]
    async fn test_link_fails() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
        Status::ok(status).unwrap();
        let status = fixture
            .root()
            .link(&format!("{}", hash), token.unwrap().into(), "foo")
            .await
            .expect("FIDL failed");
        assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);

        fixture.close().await;
    }
726
    /// A cached node must be re-verified against the requested hash: a malicious directory entry
    /// pointing at a different blob's node must not serve that node's contents.
    #[fasync::run(10, test)]
    async fn test_verify_cached_hash_node() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        let evil_hash =
            Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
                .unwrap();

        // Create a malicious link to the existing blob. This shouldn't be possible without special
        // access either via internal apis or modifying the disk image.
        {
            let root = fixture
                .volume()
                .root()
                .clone()
                .as_node()
                .into_any()
                .downcast::<BlobDirectory>()
                .unwrap()
                .directory()
                .clone();
            root.clone()
                .link(evil_hash.to_string(), root, &hash.to_string())
                .await
                .expect("Linking file");
        }
        let device = fixture.close().await;

        // Reopen the volume so the malicious entry is read from disk.
        let fixture = open_blob_fixture(device).await;
        {
            // Hold open a ref to keep it in the node cache.
            let _vmo = fixture.get_blob_vmo(hash).await;

            // Open the malicious link
            let blob_reader =
                connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
                    .expect("failed to connect to the BlobReader service");
            blob_reader
                .get_vmo(&evil_hash.into())
                .await
                .expect("transport error on BlobReader.GetVmo")
                .expect_err("Hashes should mismatch");
        }
        fixture.close().await;
    }
774
    /// NeedsOverwrite returns false for existing blobs — whether found via the dirent cache or via
    /// an on-disk lookup — and an error for blobs that do not exist.
    #[fasync::run(10, test)]
    async fn test_blob_needs_overwrite_verifies_existence() {
        let fixture = new_blob_fixture().await;
        let data = vec![42u8; 32];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let blob_creator =
            connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
                .expect("failed to connect to the BlobCreator service");
        // Do it once to fetch it fresh after clearing the cache.
        fixture.volume().volume().dirent_cache().clear();
        assert!(
            !blob_creator
                .needs_overwrite(&hash.into())
                .await
                .expect("fidl transport")
                .expect("Find new blob")
        );

        // Get it into the cache and then check again.
        let blob_reader =
            connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
                .expect("failed to connect to the BlobReader service");
        blob_reader
            .get_vmo(&hash.into())
            .await
            .expect("transport error on BlobReader.GetVmo")
            .expect("Opening blob");
        assert!(
            !blob_creator
                .needs_overwrite(&hash.into())
                .await
                .expect("fidl transport")
                .expect("Find new blob")
        );

        // Fail to find one that is missing.
        blob_creator
            .needs_overwrite(&[1u8; 32].into())
            .await
            .expect("fidl transport")
            .expect_err("Blob should not exist");
        fixture.close().await;
    }
819}