Skip to main content

fxfs_platform/fuchsia/fxblob/
directory.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`BlobDirectory`] node type used to represent a directory of immutable
6//! content-addressable blobs.
7
8use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::dirent_cache::DirentCacheKey;
11use crate::fuchsia::fxblob::blob::FxBlob;
12use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
13use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
14use crate::fuchsia::volume::{FxVolume, RootDir};
15use anyhow::{Context as _, Error, anyhow, ensure};
16use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
17use fidl_fuchsia_fxfs::{
18    BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
19    BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
20};
21use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
22use fuchsia_hash::Hash;
23use futures::TryStreamExt;
24use fxfs::errors::FxfsError;
25use fxfs::object_store::transaction::{LockKey, lock_keys};
26use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore, StoreObjectHandle};
27use fxfs_macros::ToWeakNode;
28use fxfs_trace::{TraceFutureExt, trace_future_args};
29use std::str::FromStr;
30use std::sync::Arc;
31use vfs::directory::dirents_sink;
32use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
33use vfs::directory::entry_container::{
34    Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
35};
36use vfs::directory::helper::DirectlyMutable;
37use vfs::directory::mutable::connection::MutableConnection;
38use vfs::directory::simple::Simple;
39use vfs::directory::traversal_position::TraversalPosition;
40use vfs::execution_scope::ExecutionScope;
41use vfs::path::Path;
42use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
43use zx::Status;
44
45/// A flat directory containing content-addressable blobs (names are their hashes).
46/// It is not possible to create sub-directories.
47/// It is not possible to write to an existing blob.
48/// It is not possible to open or read a blob until it is written and verified.
49#[derive(ToWeakNode)]
50pub struct BlobDirectory {
51    directory: Arc<FxDirectory>,
52}
53
54/// Instead of constantly switching back and forth between strings and hashes. Do it once and then
55/// just pass around a reference to that.
56pub(crate) struct Identifier {
57    pub string: String,
58    pub hash: Hash,
59}
60
61impl TryFrom<&str> for Identifier {
62    type Error = FxfsError;
63    fn try_from(value: &str) -> Result<Self, Self::Error> {
64        Ok(Self {
65            string: value.to_owned(),
66            hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
67        })
68    }
69}
70
71impl From<Hash> for Identifier {
72    fn from(hash: Hash) -> Self {
73        Self { string: hash.to_string(), hash }
74    }
75}
76
77impl RootDir for BlobDirectory {
78    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
79        self
80    }
81
82    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
83        let scope = self.volume().scope().clone();
84        vfs::directory::serve_on(self, flags, scope, server_end);
85    }
86
87    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
88        self as Arc<dyn FxNode>
89    }
90
91    fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
92        let this = self.clone();
93        svc_dir.add_entry(
94            BlobCreatorMarker::PROTOCOL_NAME,
95            vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
96        )?;
97
98        svc_dir.add_entry(
99            BlobReaderMarker::PROTOCOL_NAME,
100            vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
101        )?;
102
103        Ok(())
104    }
105}
106
107impl BlobDirectory {
108    fn new(directory: FxDirectory) -> Self {
109        Self { directory: Arc::new(directory) }
110    }
111
112    pub fn directory(&self) -> &Arc<FxDirectory> {
113        &self.directory
114    }
115
116    pub fn volume(&self) -> &Arc<FxVolume> {
117        self.directory.volume()
118    }
119
120    fn store(&self) -> &ObjectStore {
121        self.directory.store()
122    }
123
124    /// Open blob and get the child vmo. This allows the creation of the child vmo to be atomic with
125    /// the open.
126    pub(crate) async fn open_blob_get_vmo(
127        self: &Arc<Self>,
128        id: &Identifier,
129    ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
130        let store = self.store();
131        let fs = store.filesystem();
132        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
133        // A lock needs to be held over searching the directory and incrementing the open count.
134        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
135        let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
136        let vmo = blob.create_child_vmo()?;
137        // Downgrade from an OpenedNode<Node> to a Node.
138        Ok((blob.clone(), vmo))
139    }
140
141    /// Wraps ['open_blob_locked'] while taking the locks.
142    pub(crate) async fn open_blob(
143        self: &Arc<Self>,
144        id: &Identifier,
145    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
146        let store = self.store();
147        let fs = store.filesystem();
148        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
149        // A lock needs to be held over searching the directory and incrementing the open count.
150        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
151        self.open_blob_locked(id).await
152    }
153
154    /// Attempt to open and cache the blob with `id` in this directory. Returns `Ok(None)` if no
155    /// blob matching `id` was found. Requires holding locks for at least the object store and
156    /// directory object.
157    async fn open_blob_locked(
158        self: &Arc<Self>,
159        id: &Identifier,
160    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
161        let node = match self.directory.directory().owner().dirent_cache().lookup(&(
162            self.directory.object_id(),
163            &id.string,
164            false,
165        )) {
166            Some(node) => Some(node),
167            None => {
168                if let Some((object_id, _, _)) =
169                    self.directory.directory().lookup(&id.string).await?
170                {
171                    let node = self.get_or_load_node(object_id, &id).await?;
172                    self.directory.directory().owner().dirent_cache().insert(
173                        DirentCacheKey::new(self.directory.object_id(), id.string.clone(), false),
174                        node.clone(),
175                    );
176                    Some(node)
177                } else {
178                    None
179                }
180            }
181        };
182        let Some(node) = node else {
183            return Ok(None);
184        };
185        if node.object_descriptor() != ObjectDescriptor::File {
186            return Err(FxfsError::Inconsistent)
187                .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
188        }
189        node.into_any()
190            .downcast::<FxBlob>()
191            .map(|node| Some(OpenedNode::new(node)))
192            .map_err(|_| FxfsError::Inconsistent)
193            .with_context(|| format!("Blob {} has incorrect node type!", id.string))
194    }
195
196    // Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
197    // the object from the object store, installing the returned node into the cache and returns the
198    // newly created FxNode backed by the loaded object.
199    async fn get_or_load_node(
200        self: &Arc<Self>,
201        object_id: u64,
202        id: &Identifier,
203    ) -> Result<Arc<dyn FxNode>, Error> {
204        let volume = self.volume();
205        match volume.cache().get_or_reserve(object_id).await {
206            GetResult::Node(node) => {
207                // Protecting against the scenario where a directory entry points to another node
208                // which has already been loaded and verified with the correct hash. We need to
209                // verify that the hash for the blob that is cached here matches the requested hash.
210                let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
211                    anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
212                })?;
213                ensure!(
214                    blob.root() == id.hash,
215                    anyhow!(FxfsError::Inconsistent)
216                        .context("Loaded blob by node that did not match the given hash")
217                );
218                Ok(blob as Arc<dyn FxNode>)
219            }
220            GetResult::Placeholder(placeholder) => {
221                let handle = StoreObjectHandle::new(
222                    volume.clone(),
223                    object_id,
224                    /*permanent_keys=*/ false,
225                    HandleOptions::default(),
226                    /*trace=*/ false,
227                );
228                let node = FxBlob::new(handle, id.hash).await? as Arc<dyn FxNode>;
229                placeholder.commit(&node);
230                Ok(node)
231            }
232        }
233    }
234
235    /// Creates a [`ClientEnd<BlobWriterMarker>`] to write the delivery blob identified by `hash`.
236    /// It is safe to create multiple writers for a given `hash`, however only one will succeed.
237    /// Requests are handled asynchronously on this volume's execution scope.
238    async fn create_blob_writer(
239        self: &Arc<Self>,
240        hash: Hash,
241        allow_existing: bool,
242    ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
243        let id = hash.into();
244        let blob_exists = self
245            .open_blob(&id)
246            .await
247            .map_err(|e| {
248                log::error!("Failed to lookup blob: {:?}", e);
249                CreateBlobError::Internal
250            })?
251            .is_some();
252        if blob_exists && !allow_existing {
253            return Err(CreateBlobError::AlreadyExists);
254        }
255        let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
256        let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
257            log::error!("Failed to create blob writer: {:?}", e);
258            CreateBlobError::Internal
259        })?;
260        self.volume().scope().spawn(async move {
261            if let Err(e) = writer.handle_requests(request_stream).await {
262                log::error!("Failed to handle BlobWriter requests: {}", e);
263            }
264        });
265        return Ok(client_end);
266    }
267
268    async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
269        // We don't take a lock here because this will only look up existence for now. If we
270        // actually start fetching the blob or info about it after looking it up this will need to
271        // take a reader lock on the directory and maybe also the object.
272        if self
273            .volume()
274            .dirent_cache()
275            .lookup(&(self.object_id(), &blob_hash.string, false))
276            .is_some()
277        {
278            return Ok(false);
279        }
280        match self.directory.directory().lookup(&blob_hash.string).await? {
281            Some(_) => Ok(false),
282            None => Err(FxfsError::NotFound.into()),
283        }
284    }
285
286    async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
287        while let Ok(Some(request)) = requests.try_next().await {
288            match request {
289                BlobCreatorRequest::Create { responder, hash, allow_existing } => {
290                    async {
291                        responder
292                            .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
293                            .unwrap_or_else(|error| {
294                                log::error!(error:?; "failed to send Create response");
295                            });
296                    }
297                    .trace(trace_future_args!("BlobCreator::Create"))
298                    .await;
299                }
300                BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
301                    async {
302                        let _ = responder.send(
303                            self.needs_overwrite(Hash::from(blob_hash).into())
304                                .await
305                                .map_err(map_to_raw_status),
306                        );
307                    }
308                    .trace(trace_future_args!("BlobCreator::NeedsOverwrite"))
309                    .await;
310                }
311            }
312        }
313    }
314
315    async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
316        while let Ok(Some(request)) = requests.try_next().await {
317            match request {
318                BlobReaderRequest::GetVmo { blob_hash, responder } => {
319                    async {
320                        responder
321                            .send(
322                                self.get_blob_vmo(blob_hash.into())
323                                    .await
324                                    .map_err(map_to_raw_status),
325                            )
326                            .unwrap_or_else(|error| {
327                                log::error!(error:?; "failed to send GetVmo response");
328                            });
329                    }
330                    .trace(trace_future_args!("BlobReader::GetVmo"))
331                    .await;
332                }
333            };
334        }
335    }
336
337    async fn open_impl(
338        self: Arc<Self>,
339        scope: ExecutionScope,
340        path: Path,
341        flags: impl ProtocolsExt,
342        object_request: ObjectRequestRef<'_>,
343    ) -> Result<(), Status> {
344        if path.is_empty() {
345            object_request
346                .create_connection::<MutableConnection<_>, _>(
347                    scope,
348                    OpenedNode::new(self).take(),
349                    flags,
350                )
351                .await
352        } else {
353            Err(Status::NOT_SUPPORTED)
354        }
355    }
356}
357
358impl FxNode for BlobDirectory {
359    fn object_id(&self) -> u64 {
360        self.directory.object_id()
361    }
362
363    fn parent(&self) -> Option<Arc<FxDirectory>> {
364        self.directory.parent()
365    }
366
367    fn set_parent(&self, _parent: Arc<FxDirectory>) {
368        // This directory can't be renamed.
369        unreachable!();
370    }
371
372    fn open_count_add_one(&self) {}
373    fn open_count_sub_one(self: Arc<Self>) {}
374
375    fn object_descriptor(&self) -> ObjectDescriptor {
376        ObjectDescriptor::Directory
377    }
378}
379
380impl MutableDirectory for BlobDirectory {
381    async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
382        if must_be_directory {
383            return Err(Status::INVALID_ARGS);
384        }
385        self.directory.clone().unlink(name, must_be_directory).await
386    }
387
388    async fn update_attributes(
389        &self,
390        attributes: fio::MutableNodeAttributes,
391    ) -> Result<(), Status> {
392        self.directory.update_attributes(attributes).await
393    }
394
395    async fn sync(&self) -> Result<(), Status> {
396        self.directory.sync().await
397    }
398}
399
400/// Implementation of VFS pseudo-directory for blobs. Forks a task per connection.
401impl DirectoryEntry for BlobDirectory {
402    fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
403        request.open_dir(self)
404    }
405
406    fn scope(&self) -> Option<ExecutionScope> {
407        Some(self.volume().scope().clone())
408    }
409}
410
411impl GetEntryInfo for BlobDirectory {
412    fn entry_info(&self) -> EntryInfo {
413        self.directory.entry_info()
414    }
415}
416
417impl vfs::node::Node for BlobDirectory {
418    async fn get_attributes(
419        &self,
420        requested_attributes: fio::NodeAttributesQuery,
421    ) -> Result<fio::NodeAttributes2, Status> {
422        self.directory.get_attributes(requested_attributes).await
423    }
424
425    fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
426        self.directory.query_filesystem()
427    }
428}
429
430/// Implements VFS entry container trait for directories, allowing manipulation of their contents.
431impl VfsDirectory for BlobDirectory {
432    fn deprecated_open(
433        self: Arc<Self>,
434        scope: ExecutionScope,
435        flags: fio::OpenFlags,
436        path: Path,
437        server_end: ServerEnd<NodeMarker>,
438    ) {
439        scope.clone().spawn(flags.to_object_request(server_end).handle_async(
440            async move |object_request| self.open_impl(scope, path, flags, object_request).await,
441        ));
442    }
443
444    fn open(
445        self: Arc<Self>,
446        scope: ExecutionScope,
447        path: Path,
448        flags: fio::Flags,
449        object_request: ObjectRequestRef<'_>,
450    ) -> Result<(), Status> {
451        scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
452            self.open_impl(scope, path, flags, object_request).await
453        }));
454        Ok(())
455    }
456
457    async fn open_async(
458        self: Arc<Self>,
459        scope: ExecutionScope,
460        path: Path,
461        flags: fio::Flags,
462        object_request: ObjectRequestRef<'_>,
463    ) -> Result<(), Status> {
464        self.open_impl(scope, path, flags, object_request).await
465    }
466
467    async fn read_dirents(
468        &self,
469        pos: &TraversalPosition,
470        sink: Box<dyn dirents_sink::Sink>,
471    ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
472        self.directory.read_dirents(pos, sink).await
473    }
474
475    fn register_watcher(
476        self: Arc<Self>,
477        scope: ExecutionScope,
478        mask: WatchMask,
479        watcher: DirectoryWatcher,
480    ) -> Result<(), Status> {
481        self.directory.clone().register_watcher(scope, mask, watcher)
482    }
483
484    fn unregister_watcher(self: Arc<Self>, key: usize) {
485        self.directory.clone().unregister_watcher(key)
486    }
487}
488
489impl From<object_store::Directory<FxVolume>> for BlobDirectory {
490    fn from(dir: object_store::Directory<FxVolume>) -> Self {
491        Self::new(dir.into())
492    }
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
499    use assert_matches::assert_matches;
500    use blob_writer::BlobWriter;
501    use delivery_blob::{CompressionMode, Type1Blob};
502    use fidl_fuchsia_fxfs::BlobReaderMarker;
503    use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
504    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
505    use fuchsia_fs::directory::{
506        DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
507    };
508
509    use futures::StreamExt as _;
510    use std::path::PathBuf;
511
512    #[fasync::run(10, test)]
513    async fn test_unlink() {
514        let fixture = new_blob_fixture().await;
515
516        let data = [1; 1000];
517
518        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
519
520        assert_eq!(fixture.read_blob(hash).await, data);
521
522        fixture
523            .root()
524            .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
525            .await
526            .expect("FIDL failed")
527            .expect("unlink failed");
528
529        fixture.close().await;
530    }
531
532    #[fasync::run(10, test)]
533    async fn test_readdir() {
534        let fixture = new_blob_fixture().await;
535
536        let data = [0xab; 2];
537        let hash;
538        {
539            hash = fuchsia_merkle::root_from_slice(&data);
540            let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);
541
542            let blob_proxy =
543                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
544                    fixture.volume_out_dir(),
545                )
546                .expect("failed to connect to the Blob service");
547
548            let blob_writer_client_end = blob_proxy
549                .create(&hash.into(), false)
550                .await
551                .expect("transport error on create")
552                .expect("failed to create blob");
553
554            let writer = blob_writer_client_end.into_proxy();
555            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
556                .await
557                .expect("failed to create BlobWriter");
558            blob_writer.write(&compressed_data[..1]).await.unwrap();
559
560            // Before the blob is finished writing, it shouldn't appear in the directory.
561            assert_eq!(
562                readdir_inclusive(fixture.root()).await.ok(),
563                Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
564            );
565
566            blob_writer.write(&compressed_data[1..]).await.unwrap();
567        }
568
569        assert_eq!(
570            readdir_inclusive(fixture.root()).await.ok(),
571            Some(vec![
572                DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
573                DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
574            ])
575        );
576
577        fixture.close().await;
578    }
579
580    #[fasync::run(10, test)]
581    async fn test_watchers() {
582        let fixture = new_blob_fixture().await;
583
584        let mut watcher = Watcher::new(fixture.root()).await.unwrap();
585        assert_eq!(
586            watcher.next().await,
587            Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
588        );
589        assert_matches!(
590            watcher.next().await,
591            Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
592        );
593
594        let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
595        let mut hashes = vec![];
596        let mut filenames = vec![];
597        for datum in data {
598            let hash = fuchsia_merkle::root_from_slice(&datum);
599            let filename = PathBuf::from(format!("{}", hash));
600            hashes.push(hash.clone());
601            filenames.push(filename.clone());
602
603            let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);
604
605            let blob_proxy =
606                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
607                    fixture.volume_out_dir(),
608                )
609                .expect("failed to connect to the Blob service");
610
611            let blob_writer_client_end = blob_proxy
612                .create(&hash.into(), false)
613                .await
614                .expect("transport error on create")
615                .expect("failed to create blob");
616
617            let writer = blob_writer_client_end.into_proxy();
618            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
619                .await
620                .expect("failed to create BlobWriter");
621            blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();
622
623            // Before the blob is finished writing, we shouldn't see any watch events for it.
624            assert_matches!(
625                watcher
626                    .next()
627                    .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
628                    .await,
629                None
630            );
631
632            blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();
633
634            assert_eq!(
635                watcher.next().await,
636                Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
637            );
638        }
639
640        for (hash, filename) in hashes.iter().zip(filenames) {
641            fixture
642                .root()
643                .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
644                .await
645                .expect("FIDL call failed")
646                .expect("unlink failed");
647            assert_eq!(
648                watcher.next().await,
649                Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
650            );
651        }
652
653        std::mem::drop(watcher);
654        fixture.close().await;
655    }
656
657    #[fasync::run(10, test)]
658    async fn test_rename_fails() {
659        let fixture = new_blob_fixture().await;
660
661        let data = vec![];
662        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
663
664        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
665        Status::ok(status).unwrap();
666        fixture
667            .root()
668            .rename(&format!("{}", hash), token.unwrap().into(), "foo")
669            .await
670            .expect("FIDL failed")
671            .expect_err("rename should fail");
672
673        fixture.close().await;
674    }
675
676    #[fasync::run(10, test)]
677    async fn test_link_fails() {
678        let fixture = new_blob_fixture().await;
679
680        let data = vec![];
681        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
682
683        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
684        Status::ok(status).unwrap();
685        let status = fixture
686            .root()
687            .link(&format!("{}", hash), token.unwrap().into(), "foo")
688            .await
689            .expect("FIDL failed");
690        assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);
691
692        fixture.close().await;
693    }
694
695    #[fasync::run(10, test)]
696    async fn test_verify_cached_hash_node() {
697        let fixture = new_blob_fixture().await;
698
699        let data = vec![];
700        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
701        let evil_hash =
702            Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
703                .unwrap();
704
705        // Create a malicious link to the existing blob. This shouldn't be possible without special
706        // access either via internal apis or modifying the disk image.
707        {
708            let root = fixture
709                .volume()
710                .root()
711                .clone()
712                .as_node()
713                .into_any()
714                .downcast::<BlobDirectory>()
715                .unwrap()
716                .directory()
717                .clone();
718            root.clone()
719                .link(evil_hash.to_string(), root, &hash.to_string())
720                .await
721                .expect("Linking file");
722        }
723        let device = fixture.close().await;
724
725        let fixture = open_blob_fixture(device).await;
726        {
727            // Hold open a ref to keep it in the node cache.
728            let _vmo = fixture.get_blob_vmo(hash).await;
729
730            // Open the malicious link
731            let blob_reader =
732                connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
733                    .expect("failed to connect to the BlobReader service");
734            blob_reader
735                .get_vmo(&evil_hash.into())
736                .await
737                .expect("transport error on BlobReader.GetVmo")
738                .expect_err("Hashes should mismatch");
739        }
740        fixture.close().await;
741    }
742
743    #[fasync::run(10, test)]
744    async fn test_blob_needs_overwrite_verifies_existence() {
745        let fixture = new_blob_fixture().await;
746        let data = vec![42u8; 32];
747        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
748
749        let blob_creator =
750            connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
751                .expect("failed to connect to the BlobCreator service");
752        // Do it once to fetch it fresh after clearing the cache.
753        fixture.volume().volume().dirent_cache().clear();
754        assert!(
755            !blob_creator
756                .needs_overwrite(&hash.into())
757                .await
758                .expect("fidl transport")
759                .expect("Find new blob")
760        );
761
762        // Get it into the cache and then check again.
763        let blob_reader =
764            connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
765                .expect("failed to connect to the BlobReader service");
766        blob_reader
767            .get_vmo(&hash.into())
768            .await
769            .expect("transport error on BlobReader.GetVmo")
770            .expect("Opening blob");
771        assert!(
772            !blob_creator
773                .needs_overwrite(&hash.into())
774                .await
775                .expect("fidl transport")
776                .expect("Find new blob")
777        );
778
779        // Fail to find one that is missing.
780        blob_creator
781            .needs_overwrite(&[1u8; 32].into())
782            .await
783            .expect("fidl transport")
784            .expect_err("Blob should not exist");
785        fixture.close().await;
786    }
787}