Skip to main content

fxfs_platform_testing/fuchsia/fxblob/
directory.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`BlobDirectory`] node type used to represent a directory of immutable
6//! content-addressable blobs.
7
8use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::fxblob::blob::FxBlob;
11use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
12use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
13use crate::fuchsia::volume::{FxVolume, RootDir};
14use anyhow::{Context as _, Error, anyhow, ensure};
15use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
16use fidl_fuchsia_fxfs::{
17    BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
18    BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
19};
20use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
21use fuchsia_hash::Hash;
22use futures::TryStreamExt;
23use fxfs::errors::FxfsError;
24use fxfs::object_store::transaction::{LockKey, lock_keys};
25use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore, StoreObjectHandle};
26use fxfs_macros::ToWeakNode;
27use fxfs_trace::{TraceFutureExt, trace_future_args};
28use std::str::FromStr;
29use std::sync::Arc;
30use vfs::directory::dirents_sink;
31use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
32use vfs::directory::entry_container::{
33    Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
34};
35use vfs::directory::helper::DirectlyMutable;
36use vfs::directory::mutable::connection::MutableConnection;
37use vfs::directory::simple::Simple;
38use vfs::directory::traversal_position::TraversalPosition;
39use vfs::execution_scope::ExecutionScope;
40use vfs::path::Path;
41use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
42use zx::Status;
43
44/// A flat directory containing content-addressable blobs (names are their hashes).
45/// It is not possible to create sub-directories.
46/// It is not possible to write to an existing blob.
47/// It is not possible to open or read a blob until it is written and verified.
48#[derive(ToWeakNode)]
49pub struct BlobDirectory {
50    directory: Arc<FxDirectory>,
51}
52
53/// Instead of constantly switching back and forth between strings and hashes. Do it once and then
54/// just pass around a reference to that.
55pub(crate) struct Identifier {
56    pub string: String,
57    pub hash: Hash,
58}
59
60impl TryFrom<&str> for Identifier {
61    type Error = FxfsError;
62    fn try_from(value: &str) -> Result<Self, Self::Error> {
63        Ok(Self {
64            string: value.to_owned(),
65            hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
66        })
67    }
68}
69
70impl From<Hash> for Identifier {
71    fn from(hash: Hash) -> Self {
72        Self { string: hash.to_string(), hash }
73    }
74}
75
76impl RootDir for BlobDirectory {
77    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
78        self
79    }
80
81    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
82        let scope = self.volume().scope().clone();
83        vfs::directory::serve_on(self, flags, scope, server_end);
84    }
85
86    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
87        self as Arc<dyn FxNode>
88    }
89
90    fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
91        let this = self.clone();
92        svc_dir.add_entry(
93            BlobCreatorMarker::PROTOCOL_NAME,
94            vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
95        )?;
96
97        svc_dir.add_entry(
98            BlobReaderMarker::PROTOCOL_NAME,
99            vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
100        )?;
101
102        Ok(())
103    }
104}
105
106impl BlobDirectory {
107    fn new(directory: FxDirectory) -> Self {
108        Self { directory: Arc::new(directory) }
109    }
110
111    pub fn directory(&self) -> &Arc<FxDirectory> {
112        &self.directory
113    }
114
115    pub fn volume(&self) -> &Arc<FxVolume> {
116        self.directory.volume()
117    }
118
119    fn store(&self) -> &ObjectStore {
120        self.directory.store()
121    }
122
123    /// Open blob and get the child vmo. This allows the creation of the child vmo to be atomic with
124    /// the open.
125    pub(crate) async fn open_blob_get_vmo(
126        self: &Arc<Self>,
127        id: &Identifier,
128    ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
129        let store = self.store();
130        let fs = store.filesystem();
131        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
132        // A lock needs to be held over searching the directory and incrementing the open count.
133        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
134        let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
135        let vmo = blob.create_child_vmo()?;
136        // Downgrade from an OpenedNode<Node> to a Node.
137        Ok((blob.clone(), vmo))
138    }
139
140    /// Wraps ['open_blob_locked'] while taking the locks.
141    pub(crate) async fn open_blob(
142        self: &Arc<Self>,
143        id: &Identifier,
144    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
145        let store = self.store();
146        let fs = store.filesystem();
147        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
148        // A lock needs to be held over searching the directory and incrementing the open count.
149        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
150        self.open_blob_locked(id).await
151    }
152
153    /// Attempt to open and cache the blob with `id` in this directory. Returns `Ok(None)` if no
154    /// blob matching `id` was found. Requires holding locks for at least the object store and
155    /// directory object.
156    async fn open_blob_locked(
157        self: &Arc<Self>,
158        id: &Identifier,
159    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
160        let node = match self
161            .directory
162            .directory()
163            .owner()
164            .dirent_cache()
165            .lookup(&(self.directory.object_id(), &id.string))
166        {
167            Some(node) => Some(node),
168            None => {
169                if let Some((object_id, _, _)) =
170                    self.directory.directory().lookup(&id.string).await?
171                {
172                    let node = self.get_or_load_node(object_id, &id).await?;
173                    self.directory.directory().owner().dirent_cache().insert(
174                        self.directory.object_id(),
175                        id.string.clone(),
176                        node.clone(),
177                    );
178                    Some(node)
179                } else {
180                    None
181                }
182            }
183        };
184        let Some(node) = node else {
185            return Ok(None);
186        };
187        if node.object_descriptor() != ObjectDescriptor::File {
188            return Err(FxfsError::Inconsistent)
189                .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
190        }
191        node.into_any()
192            .downcast::<FxBlob>()
193            .map(|node| Some(OpenedNode::new(node)))
194            .map_err(|_| FxfsError::Inconsistent)
195            .with_context(|| format!("Blob {} has incorrect node type!", id.string))
196    }
197
198    // Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
199    // the object from the object store, installing the returned node into the cache and returns the
200    // newly created FxNode backed by the loaded object.
201    async fn get_or_load_node(
202        self: &Arc<Self>,
203        object_id: u64,
204        id: &Identifier,
205    ) -> Result<Arc<dyn FxNode>, Error> {
206        let volume = self.volume();
207        match volume.cache().get_or_reserve(object_id).await {
208            GetResult::Node(node) => {
209                // Protecting against the scenario where a directory entry points to another node
210                // which has already been loaded and verified with the correct hash. We need to
211                // verify that the hash for the blob that is cached here matches the requested hash.
212                let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
213                    anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
214                })?;
215                ensure!(
216                    blob.root() == id.hash,
217                    anyhow!(FxfsError::Inconsistent)
218                        .context("Loaded blob by node that did not match the given hash")
219                );
220                Ok(blob as Arc<dyn FxNode>)
221            }
222            GetResult::Placeholder(placeholder) => {
223                let handle = StoreObjectHandle::new(
224                    volume.clone(),
225                    object_id,
226                    /*permanent_keys=*/ false,
227                    HandleOptions::default(),
228                    /*trace=*/ false,
229                );
230                let node = FxBlob::new(handle, id.hash).await? as Arc<dyn FxNode>;
231                placeholder.commit(&node);
232                Ok(node)
233            }
234        }
235    }
236
237    /// Creates a [`ClientEnd<BlobWriterMarker>`] to write the delivery blob identified by `hash`.
238    /// It is safe to create multiple writers for a given `hash`, however only one will succeed.
239    /// Requests are handled asynchronously on this volume's execution scope.
240    async fn create_blob_writer(
241        self: &Arc<Self>,
242        hash: Hash,
243        allow_existing: bool,
244    ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
245        let id = hash.into();
246        let blob_exists = self
247            .open_blob(&id)
248            .await
249            .map_err(|e| {
250                log::error!("Failed to lookup blob: {:?}", e);
251                CreateBlobError::Internal
252            })?
253            .is_some();
254        if blob_exists && !allow_existing {
255            return Err(CreateBlobError::AlreadyExists);
256        }
257        let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
258        let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
259            log::error!("Failed to create blob writer: {:?}", e);
260            CreateBlobError::Internal
261        })?;
262        self.volume().scope().spawn(async move {
263            if let Err(e) = writer.handle_requests(request_stream).await {
264                log::error!("Failed to handle BlobWriter requests: {}", e);
265            }
266        });
267        return Ok(client_end);
268    }
269
270    async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
271        // We don't take a lock here because this will only look up existence for now. If we
272        // actually start fetching the blob or info about it after looking it up this will need to
273        // take a reader lock on the directory and maybe also the object.
274        if self.volume().dirent_cache().lookup(&(self.object_id(), &blob_hash.string)).is_some() {
275            return Ok(false);
276        }
277        match self.directory.directory().lookup(&blob_hash.string).await? {
278            Some(_) => Ok(false),
279            None => Err(FxfsError::NotFound.into()),
280        }
281    }
282
283    async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
284        while let Ok(Some(request)) = requests.try_next().await {
285            match request {
286                BlobCreatorRequest::Create { responder, hash, allow_existing } => {
287                    async {
288                        responder
289                            .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
290                            .unwrap_or_else(|error| {
291                                log::error!(error:?; "failed to send Create response");
292                            });
293                    }
294                    .trace(trace_future_args!("BlobCreator::Create"))
295                    .await;
296                }
297                BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
298                    async {
299                        let _ = responder.send(
300                            self.needs_overwrite(Hash::from(blob_hash).into())
301                                .await
302                                .map_err(map_to_raw_status),
303                        );
304                    }
305                    .trace(trace_future_args!("BlobCreator::NeedsOverwrite"))
306                    .await;
307                }
308            }
309        }
310    }
311
312    async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
313        while let Ok(Some(request)) = requests.try_next().await {
314            match request {
315                BlobReaderRequest::GetVmo { blob_hash, responder } => {
316                    async {
317                        responder
318                            .send(
319                                self.get_blob_vmo(blob_hash.into())
320                                    .await
321                                    .map_err(map_to_raw_status),
322                            )
323                            .unwrap_or_else(|error| {
324                                log::error!(error:?; "failed to send GetVmo response");
325                            });
326                    }
327                    .trace(trace_future_args!("BlobReader::GetVmo"))
328                    .await;
329                }
330            };
331        }
332    }
333
334    async fn open_impl(
335        self: Arc<Self>,
336        scope: ExecutionScope,
337        path: Path,
338        flags: impl ProtocolsExt,
339        object_request: ObjectRequestRef<'_>,
340    ) -> Result<(), Status> {
341        if path.is_empty() {
342            object_request
343                .create_connection::<MutableConnection<_>, _>(
344                    scope,
345                    OpenedNode::new(self).take(),
346                    flags,
347                )
348                .await
349        } else {
350            Err(Status::NOT_SUPPORTED)
351        }
352    }
353}
354
355impl FxNode for BlobDirectory {
356    fn object_id(&self) -> u64 {
357        self.directory.object_id()
358    }
359
360    fn parent(&self) -> Option<Arc<FxDirectory>> {
361        self.directory.parent()
362    }
363
364    fn set_parent(&self, _parent: Arc<FxDirectory>) {
365        // This directory can't be renamed.
366        unreachable!();
367    }
368
369    fn open_count_add_one(&self) {}
370    fn open_count_sub_one(self: Arc<Self>) {}
371
372    fn object_descriptor(&self) -> ObjectDescriptor {
373        ObjectDescriptor::Directory
374    }
375}
376
377impl MutableDirectory for BlobDirectory {
378    async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
379        if must_be_directory {
380            return Err(Status::INVALID_ARGS);
381        }
382        self.directory.clone().unlink(name, must_be_directory).await
383    }
384
385    async fn update_attributes(
386        &self,
387        attributes: fio::MutableNodeAttributes,
388    ) -> Result<(), Status> {
389        self.directory.update_attributes(attributes).await
390    }
391
392    async fn sync(&self) -> Result<(), Status> {
393        self.directory.sync().await
394    }
395}
396
397/// Implementation of VFS pseudo-directory for blobs. Forks a task per connection.
398impl DirectoryEntry for BlobDirectory {
399    fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
400        request.open_dir(self)
401    }
402
403    fn scope(&self) -> Option<ExecutionScope> {
404        Some(self.volume().scope().clone())
405    }
406}
407
408impl GetEntryInfo for BlobDirectory {
409    fn entry_info(&self) -> EntryInfo {
410        self.directory.entry_info()
411    }
412}
413
414impl vfs::node::Node for BlobDirectory {
415    async fn get_attributes(
416        &self,
417        requested_attributes: fio::NodeAttributesQuery,
418    ) -> Result<fio::NodeAttributes2, Status> {
419        self.directory.get_attributes(requested_attributes).await
420    }
421
422    fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
423        self.directory.query_filesystem()
424    }
425}
426
427/// Implements VFS entry container trait for directories, allowing manipulation of their contents.
428impl VfsDirectory for BlobDirectory {
429    fn deprecated_open(
430        self: Arc<Self>,
431        scope: ExecutionScope,
432        flags: fio::OpenFlags,
433        path: Path,
434        server_end: ServerEnd<NodeMarker>,
435    ) {
436        scope.clone().spawn(flags.to_object_request(server_end).handle_async(
437            async move |object_request| self.open_impl(scope, path, flags, object_request).await,
438        ));
439    }
440
441    fn open(
442        self: Arc<Self>,
443        scope: ExecutionScope,
444        path: Path,
445        flags: fio::Flags,
446        object_request: ObjectRequestRef<'_>,
447    ) -> Result<(), Status> {
448        scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
449            self.open_impl(scope, path, flags, object_request).await
450        }));
451        Ok(())
452    }
453
454    async fn open_async(
455        self: Arc<Self>,
456        scope: ExecutionScope,
457        path: Path,
458        flags: fio::Flags,
459        object_request: ObjectRequestRef<'_>,
460    ) -> Result<(), Status> {
461        self.open_impl(scope, path, flags, object_request).await
462    }
463
464    async fn read_dirents(
465        &self,
466        pos: &TraversalPosition,
467        sink: Box<dyn dirents_sink::Sink>,
468    ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
469        self.directory.read_dirents(pos, sink).await
470    }
471
472    fn register_watcher(
473        self: Arc<Self>,
474        scope: ExecutionScope,
475        mask: WatchMask,
476        watcher: DirectoryWatcher,
477    ) -> Result<(), Status> {
478        self.directory.clone().register_watcher(scope, mask, watcher)
479    }
480
481    fn unregister_watcher(self: Arc<Self>, key: usize) {
482        self.directory.clone().unregister_watcher(key)
483    }
484}
485
486impl From<object_store::Directory<FxVolume>> for BlobDirectory {
487    fn from(dir: object_store::Directory<FxVolume>) -> Self {
488        Self::new(dir.into())
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
496    use assert_matches::assert_matches;
497    use blob_writer::BlobWriter;
498    use delivery_blob::{CompressionMode, Type1Blob, delivery_blob_path};
499    use fidl_fuchsia_fxfs::BlobReaderMarker;
500    use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
501    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
502    use fuchsia_fs::directory::{
503        DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
504    };
505
506    use futures::StreamExt as _;
507    use std::path::PathBuf;
508
509    #[fasync::run(10, test)]
510    async fn test_unlink() {
511        let fixture = new_blob_fixture().await;
512
513        let data = [1; 1000];
514
515        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
516
517        assert_eq!(fixture.read_blob(hash).await, data);
518
519        fixture
520            .root()
521            .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
522            .await
523            .expect("FIDL failed")
524            .expect("unlink failed");
525
526        fixture.close().await;
527    }
528
529    #[fasync::run(10, test)]
530    async fn test_readdir() {
531        let fixture = new_blob_fixture().await;
532
533        let data = [0xab; 2];
534        let hash;
535        {
536            hash = fuchsia_merkle::root_from_slice(&data);
537            let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);
538
539            let blob_proxy =
540                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
541                    fixture.volume_out_dir(),
542                )
543                .expect("failed to connect to the Blob service");
544
545            let blob_writer_client_end = blob_proxy
546                .create(&hash.into(), false)
547                .await
548                .expect("transport error on create")
549                .expect("failed to create blob");
550
551            let writer = blob_writer_client_end.into_proxy();
552            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
553                .await
554                .expect("failed to create BlobWriter");
555            blob_writer.write(&compressed_data[..1]).await.unwrap();
556
557            // Before the blob is finished writing, it shouldn't appear in the directory.
558            assert_eq!(
559                readdir_inclusive(fixture.root()).await.ok(),
560                Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
561            );
562
563            blob_writer.write(&compressed_data[1..]).await.unwrap();
564        }
565
566        assert_eq!(
567            readdir_inclusive(fixture.root()).await.ok(),
568            Some(vec![
569                DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
570                DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
571            ])
572        );
573
574        fixture.close().await;
575    }
576
577    #[fasync::run(10, test)]
578    async fn test_watchers() {
579        let fixture = new_blob_fixture().await;
580
581        let mut watcher = Watcher::new(fixture.root()).await.unwrap();
582        assert_eq!(
583            watcher.next().await,
584            Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
585        );
586        assert_matches!(
587            watcher.next().await,
588            Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
589        );
590
591        let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
592        let mut hashes = vec![];
593        let mut filenames = vec![];
594        for datum in data {
595            let hash = fuchsia_merkle::root_from_slice(&datum);
596            let filename = PathBuf::from(format!("{}", hash));
597            hashes.push(hash.clone());
598            filenames.push(filename.clone());
599
600            let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);
601
602            let blob_proxy =
603                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
604                    fixture.volume_out_dir(),
605                )
606                .expect("failed to connect to the Blob service");
607
608            let blob_writer_client_end = blob_proxy
609                .create(&hash.into(), false)
610                .await
611                .expect("transport error on create")
612                .expect("failed to create blob");
613
614            let writer = blob_writer_client_end.into_proxy();
615            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
616                .await
617                .expect("failed to create BlobWriter");
618            blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();
619
620            // Before the blob is finished writing, we shouldn't see any watch events for it.
621            assert_matches!(
622                watcher
623                    .next()
624                    .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
625                    .await,
626                None
627            );
628
629            blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();
630
631            assert_eq!(
632                watcher.next().await,
633                Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
634            );
635        }
636
637        for (hash, filename) in hashes.iter().zip(filenames) {
638            fixture
639                .root()
640                .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
641                .await
642                .expect("FIDL call failed")
643                .expect("unlink failed");
644            assert_eq!(
645                watcher.next().await,
646                Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
647            );
648        }
649
650        std::mem::drop(watcher);
651        fixture.close().await;
652    }
653
654    #[fasync::run(10, test)]
655    async fn test_rename_fails() {
656        let fixture = new_blob_fixture().await;
657
658        let data = vec![];
659        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
660
661        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
662        Status::ok(status).unwrap();
663        fixture
664            .root()
665            .rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
666            .await
667            .expect("FIDL failed")
668            .expect_err("rename should fail");
669
670        fixture.close().await;
671    }
672
673    #[fasync::run(10, test)]
674    async fn test_link_fails() {
675        let fixture = new_blob_fixture().await;
676
677        let data = vec![];
678        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
679
680        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
681        Status::ok(status).unwrap();
682        let status = fixture
683            .root()
684            .link(&format!("{}", hash), token.unwrap().into(), "foo")
685            .await
686            .expect("FIDL failed");
687        assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);
688
689        fixture.close().await;
690    }
691
692    #[fasync::run(10, test)]
693    async fn test_verify_cached_hash_node() {
694        let fixture = new_blob_fixture().await;
695
696        let data = vec![];
697        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
698        let evil_hash =
699            Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
700                .unwrap();
701
702        // Create a malicious link to the existing blob. This shouldn't be possible without special
703        // access either via internal apis or modifying the disk image.
704        {
705            let root = fixture
706                .volume()
707                .root()
708                .clone()
709                .as_node()
710                .into_any()
711                .downcast::<BlobDirectory>()
712                .unwrap()
713                .directory()
714                .clone();
715            root.clone()
716                .link(evil_hash.to_string(), root, &hash.to_string())
717                .await
718                .expect("Linking file");
719        }
720        let device = fixture.close().await;
721
722        let fixture = open_blob_fixture(device).await;
723        {
724            // Hold open a ref to keep it in the node cache.
725            let _vmo = fixture.get_blob_vmo(hash).await;
726
727            // Open the malicious link
728            let blob_reader =
729                connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
730                    .expect("failed to connect to the BlobReader service");
731            blob_reader
732                .get_vmo(&evil_hash.into())
733                .await
734                .expect("transport error on BlobReader.GetVmo")
735                .expect_err("Hashes should mismatch");
736        }
737        fixture.close().await;
738    }
739
740    #[fasync::run(10, test)]
741    async fn test_blob_needs_overwrite_verifies_existence() {
742        let fixture = new_blob_fixture().await;
743        let data = vec![42u8; 32];
744        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
745
746        let blob_creator =
747            connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
748                .expect("failed to connect to the BlobCreator service");
749        // Do it once to fetch it fresh after clearing the cache.
750        fixture.volume().volume().dirent_cache().clear();
751        assert!(
752            !blob_creator
753                .needs_overwrite(&hash.into())
754                .await
755                .expect("fidl transport")
756                .expect("Find new blob")
757        );
758
759        // Get it into the cache and then check again.
760        let blob_reader =
761            connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
762                .expect("failed to connect to the BlobReader service");
763        blob_reader
764            .get_vmo(&hash.into())
765            .await
766            .expect("transport error on BlobReader.GetVmo")
767            .expect("Opening blob");
768        assert!(
769            !blob_creator
770                .needs_overwrite(&hash.into())
771                .await
772                .expect("fidl transport")
773                .expect("Find new blob")
774        );
775
776        // Fail to find one that is missing.
777        blob_creator
778            .needs_overwrite(&[1u8; 32].into())
779            .await
780            .expect("fidl transport")
781            .expect_err("Blob should not exist");
782        fixture.close().await;
783    }
784}