// File: fxfs_platform/fuchsia/fxblob/directory.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This module contains the [`BlobDirectory`] node type used to represent a directory of immutable
6//! content-addressable blobs.
7
8use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::fxblob::blob::{CompressionInfo, FxBlob};
11use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
12use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
13use crate::fuchsia::volume::{FxVolume, RootDir};
14use anyhow::{Context as _, Error, anyhow, ensure};
15use delivery_blob::compression::CompressionAlgorithm;
16use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
17use fidl_fuchsia_fxfs::{
18    BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
19    BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
20};
21use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
22use fuchsia_hash::Hash;
23use futures::TryStreamExt;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::object_handle::ReadObjectHandle;
27use fxfs::object_store::transaction::{LockKey, lock_keys};
28use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore};
29use fxfs_macros::ToWeakNode;
30use std::str::FromStr;
31use std::sync::Arc;
32use vfs::directory::dirents_sink;
33use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
34use vfs::directory::entry_container::{
35    Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
36};
37use vfs::directory::helper::DirectlyMutable;
38use vfs::directory::mutable::connection::MutableConnection;
39use vfs::directory::simple::Simple;
40use vfs::directory::traversal_position::TraversalPosition;
41use vfs::execution_scope::ExecutionScope;
42use vfs::path::Path;
43use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
44use zx::Status;
45
46/// A flat directory containing content-addressable blobs (names are their hashes).
47/// It is not possible to create sub-directories.
48/// It is not possible to write to an existing blob.
49/// It is not possible to open or read a blob until it is written and verified.
#[derive(ToWeakNode)]
pub struct BlobDirectory {
    /// The underlying generic Fxfs directory; lookups, dirents, watchers, and attribute
    /// operations all delegate to it.
    directory: Arc<FxDirectory>,
}
54
/// A blob identifier carrying both the string and parsed [`Hash`] forms of a merkle root.
/// Converting between the two forms is done once at construction so callers can pass a
/// reference around instead of repeatedly re-parsing or re-formatting.
pub(crate) struct Identifier {
    /// The hash rendered as a string; used as the blob's directory entry name.
    pub string: String,
    /// The parsed merkle root hash.
    pub hash: Hash,
}
61
62impl TryFrom<&str> for Identifier {
63    type Error = FxfsError;
64    fn try_from(value: &str) -> Result<Self, Self::Error> {
65        Ok(Self {
66            string: value.to_owned(),
67            hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
68        })
69    }
70}
71
72impl From<Hash> for Identifier {
73    fn from(hash: Hash) -> Self {
74        Self { string: hash.to_string(), hash }
75    }
76}
77
impl RootDir for BlobDirectory {
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
        self
    }

    /// Serves a `fuchsia.io/Directory` connection for this blob directory. Connections run on
    /// the volume's execution scope so they are torn down together with the volume.
    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
        let scope = self.volume().scope().clone();
        vfs::directory::serve_on(self, flags, scope, server_end);
    }

    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
        self as Arc<dyn FxNode>
    }

    /// Registers the blob-specific `fuchsia.fxfs.BlobCreator` and `fuchsia.fxfs.BlobReader`
    /// services in the volume's outgoing service directory.
    fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
        // Each `move` closure below needs its own Arc; `this` feeds the first, `self` the second.
        let this = self.clone();
        svc_dir.add_entry(
            BlobCreatorMarker::PROTOCOL_NAME,
            vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
        )?;

        svc_dir.add_entry(
            BlobReaderMarker::PROTOCOL_NAME,
            vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
        )?;

        Ok(())
    }
}
107
impl BlobDirectory {
    /// Wraps an [`FxDirectory`] as a blob directory.
    fn new(directory: FxDirectory) -> Self {
        Self { directory: Arc::new(directory) }
    }

    /// Returns the underlying generic directory.
    pub fn directory(&self) -> &Arc<FxDirectory> {
        &self.directory
    }

    /// Returns the volume this directory belongs to.
    pub fn volume(&self) -> &Arc<FxVolume> {
        self.directory.volume()
    }

    /// Returns the object store backing this directory.
    fn store(&self) -> &ObjectStore {
        self.directory.store()
    }

    /// Open blob and get the child vmo. This allows the creation of the child vmo to be atomic with
    /// the open.
    pub(crate) async fn open_blob_get_vmo(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
        let store = self.store();
        let fs = store.filesystem();
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
        // A lock needs to be held over searching the directory and incrementing the open count.
        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
        let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
        let vmo = blob.create_child_vmo()?;
        // Downgrade from an OpenedNode<Node> to a Node.
        Ok((blob.clone(), vmo))
    }

    /// Wraps [`open_blob_locked`] while taking the locks.
    pub(crate) async fn open_blob(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
        let store = self.store();
        let fs = store.filesystem();
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
        // A lock needs to be held over searching the directory and incrementing the open count.
        let _guard = fs.lock_manager().read_lock(keys.clone()).await;
        self.open_blob_locked(id).await
    }

    /// Attempt to open and cache the blob with `id` in this directory. Returns `Ok(None)` if no
    /// blob matching `id` was found. Requires holding locks for at least the object store and
    /// directory object.
    async fn open_blob_locked(
        self: &Arc<Self>,
        id: &Identifier,
    ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
        // Fast path: consult the dirent cache first; on a miss, fall back to a directory lookup
        // and populate the cache with the loaded node.
        let node = match self
            .directory
            .directory()
            .owner()
            .dirent_cache()
            .lookup(&(self.directory.object_id(), &id.string))
        {
            Some(node) => Some(node),
            None => {
                if let Some((object_id, _, _)) =
                    self.directory.directory().lookup(&id.string).await?
                {
                    let node = self.get_or_load_node(object_id, &id).await?;
                    self.directory.directory().owner().dirent_cache().insert(
                        self.directory.object_id(),
                        id.string.clone(),
                        node.clone(),
                    );
                    Some(node)
                } else {
                    None
                }
            }
        };
        let Some(node) = node else {
            return Ok(None);
        };
        // Blobs must be plain files; anything else in this directory is filesystem corruption.
        if node.object_descriptor() != ObjectDescriptor::File {
            return Err(FxfsError::Inconsistent)
                .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
        }
        node.into_any()
            .downcast::<FxBlob>()
            .map(|node| Some(OpenedNode::new(node)))
            .map_err(|_| FxfsError::Inconsistent)
            .with_context(|| format!("Blob {} has incorrect node type!", id.string))
    }

    // Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
    // the object from the object store, installing the returned node into the cache and returns the
    // newly created FxNode backed by the loaded object.
    async fn get_or_load_node(
        self: &Arc<Self>,
        object_id: u64,
        id: &Identifier,
    ) -> Result<Arc<dyn FxNode>, Error> {
        let volume = self.volume();
        match volume.cache().get_or_reserve(object_id).await {
            GetResult::Node(node) => {
                // Protecting against the scenario where a directory entry points to another node
                // which has already been loaded and verified with the correct hash. We need to
                // verify that the hash for the blob that is cached here matches the requested hash.
                let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
                    anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
                })?;
                ensure!(
                    blob.root() == id.hash,
                    anyhow!(FxfsError::Inconsistent)
                        .context("Loaded blob by node that did not match the given hash")
                );
                Ok(blob as Arc<dyn FxNode>)
            }
            GetResult::Placeholder(placeholder) => {
                // Cache miss: read the object and its metadata, decode the compression format,
                // then commit the constructed node into the placeholder slot.
                let object =
                    ObjectStore::open_object(volume, object_id, HandleOptions::default(), None)
                        .await?;
                let metadata = BlobMetadata::read_from(&object).await?;
                let (uncompressed_size, compression_info) = match &metadata.format {
                    BlobFormat::Uncompressed => (object.get_size(), None),
                    BlobFormat::ChunkedZstd {
                        uncompressed_size,
                        chunk_size,
                        compressed_offsets,
                    } => (
                        *uncompressed_size,
                        Some(CompressionInfo::new(
                            *chunk_size,
                            compressed_offsets,
                            CompressionAlgorithm::Zstd,
                        )?),
                    ),
                    BlobFormat::ChunkedLz4 {
                        uncompressed_size,
                        chunk_size,
                        compressed_offsets,
                    } => (
                        *uncompressed_size,
                        Some(CompressionInfo::new(
                            *chunk_size,
                            compressed_offsets,
                            CompressionAlgorithm::Lz4,
                        )?),
                    ),
                };
                let merkle_verifier = metadata.into_merkle_verifier(id.hash)?;

                let node = FxBlob::new(
                    object,
                    id.hash,
                    merkle_verifier,
                    compression_info,
                    uncompressed_size,
                )? as Arc<dyn FxNode>;
                placeholder.commit(&node);
                Ok(node)
            }
        }
    }

    /// Creates a [`ClientEnd<BlobWriterMarker>`] to write the delivery blob identified by `hash`.
    /// It is safe to create multiple writers for a given `hash`, however only one will succeed.
    /// Requests are handled asynchronously on this volume's execution scope.
    async fn create_blob_writer(
        self: &Arc<Self>,
        hash: Hash,
        allow_existing: bool,
    ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
        let id = hash.into();
        // Internal errors are logged locally and collapsed into CreateBlobError::Internal so the
        // FIDL client only sees the coarse error category.
        let blob_exists = self
            .open_blob(&id)
            .await
            .map_err(|e| {
                log::error!("Failed to lookup blob: {:?}", e);
                CreateBlobError::Internal
            })?
            .is_some();
        if blob_exists && !allow_existing {
            return Err(CreateBlobError::AlreadyExists);
        }
        let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
        let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
            log::error!("Failed to create blob writer: {:?}", e);
            CreateBlobError::Internal
        })?;
        // The writer's request loop runs detached on the volume scope; the client end is handed
        // back immediately.
        self.volume().scope().spawn(async move {
            if let Err(e) = writer.handle_requests(request_stream).await {
                log::error!("Failed to handle BlobWriter requests: {}", e);
            }
        });
        return Ok(client_end);
    }

    /// Checks whether the blob named by `blob_hash` exists. Returns `Ok(false)` when the blob is
    /// present (no overwrite needed) and `FxfsError::NotFound` when it is absent.
    async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
        // We don't take a lock here because this will only look up existence for now. If we
        // actually start fetching the blob or info about it after looking it up this will need to
        // take a reader lock on the directory and maybe also the object.
        if self.volume().dirent_cache().lookup(&(self.object_id(), &blob_hash.string)).is_some() {
            return Ok(false);
        }
        match self.directory.directory().lookup(&blob_hash.string).await? {
            Some(_) => Ok(false),
            None => Err(FxfsError::NotFound.into()),
        }
    }

    /// Serves a `fuchsia.fxfs.BlobCreator` connection until the stream ends or errors.
    async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
        while let Ok(Some(request)) = requests.try_next().await {
            match request {
                BlobCreatorRequest::Create { responder, hash, allow_existing } => {
                    responder
                        .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
                        .unwrap_or_else(|error| {
                            log::error!(error:?; "failed to send Create response");
                        });
                }
                BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
                    // Send errors are ignored: the client has gone away.
                    let _ = responder.send(
                        self.needs_overwrite(Hash::from(blob_hash).into())
                            .await
                            .map_err(map_to_raw_status),
                    );
                }
            }
        }
    }

    /// Serves a `fuchsia.fxfs.BlobReader` connection until the stream ends or errors.
    async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
        while let Ok(Some(request)) = requests.try_next().await {
            match request {
                BlobReaderRequest::GetVmo { blob_hash, responder } => {
                    responder
                        .send(self.get_blob_vmo(blob_hash.into()).await.map_err(map_to_raw_status))
                        .unwrap_or_else(|error| {
                            log::error!(error:?; "failed to send GetVmo response");
                        });
                }
            };
        }
    }

    /// Shared open implementation: only the directory itself (empty path) may be opened; opening
    /// any child path through here is not supported.
    async fn open_impl(
        self: Arc<Self>,
        scope: ExecutionScope,
        path: Path,
        flags: impl ProtocolsExt,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), Status> {
        if path.is_empty() {
            object_request
                .create_connection::<MutableConnection<_>, _>(
                    scope,
                    OpenedNode::new(self).take(),
                    flags,
                )
                .await
        } else {
            Err(Status::NOT_SUPPORTED)
        }
    }
}
372
impl FxNode for BlobDirectory {
    fn object_id(&self) -> u64 {
        self.directory.object_id()
    }

    fn parent(&self) -> Option<Arc<FxDirectory>> {
        self.directory.parent()
    }

    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        // This directory can't be renamed.
        unreachable!();
    }

    // Open counts are deliberately not tracked for this node — presumably because the blob
    // directory is the volume root and lives for the volume's lifetime (confirm against volume
    // teardown logic).
    fn open_count_add_one(&self) {}
    fn open_count_sub_one(self: Arc<Self>) {}

    fn object_descriptor(&self) -> ObjectDescriptor {
        ObjectDescriptor::Directory
    }
}
394
impl MutableDirectory for BlobDirectory {
    /// Unlinks the blob named `name`. Blobs are always files, so requests that require the
    /// target to be a directory are rejected up front with `INVALID_ARGS`.
    async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
        if must_be_directory {
            return Err(Status::INVALID_ARGS);
        }
        self.directory.clone().unlink(name, must_be_directory).await
    }

    async fn update_attributes(
        &self,
        attributes: fio::MutableNodeAttributes,
    ) -> Result<(), Status> {
        self.directory.update_attributes(attributes).await
    }

    async fn sync(&self) -> Result<(), Status> {
        self.directory.sync().await
    }
}
414
/// Implementation of VFS pseudo-directory for blobs. Forks a task per connection.
impl DirectoryEntry for BlobDirectory {
    fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
        request.open_dir(self)
    }

    /// Connections for this entry run on the owning volume's execution scope.
    fn scope(&self) -> Option<ExecutionScope> {
        Some(self.volume().scope().clone())
    }
}
425
impl GetEntryInfo for BlobDirectory {
    // Delegates to the wrapped FxDirectory's entry info.
    fn entry_info(&self) -> EntryInfo {
        self.directory.entry_info()
    }
}
431
impl vfs::node::Node for BlobDirectory {
    /// Returns the requested node attributes, delegating to the wrapped directory.
    async fn get_attributes(
        &self,
        requested_attributes: fio::NodeAttributesQuery,
    ) -> Result<fio::NodeAttributes2, Status> {
        self.directory.get_attributes(requested_attributes).await
    }

    fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
        self.directory.query_filesystem()
    }
}
444
/// Implements VFS entry container trait for directories, allowing manipulation of their contents.
impl VfsDirectory for BlobDirectory {
    fn deprecated_open(
        self: Arc<Self>,
        scope: ExecutionScope,
        flags: fio::OpenFlags,
        path: Path,
        server_end: ServerEnd<NodeMarker>,
    ) {
        // The open itself runs as a spawned task on the scope; errors are reported through the
        // object request, not a return value.
        scope.clone().spawn(flags.to_object_request(server_end).handle_async(
            async move |object_request| self.open_impl(scope, path, flags, object_request).await,
        ));
    }

    fn open(
        self: Arc<Self>,
        scope: ExecutionScope,
        path: Path,
        flags: fio::Flags,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), Status> {
        // Takes ownership of the object request and completes the open asynchronously.
        scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
            self.open_impl(scope, path, flags, object_request).await
        }));
        Ok(())
    }

    // Same as `open` but awaited inline by the caller instead of spawned.
    async fn open_async(
        self: Arc<Self>,
        scope: ExecutionScope,
        path: Path,
        flags: fio::Flags,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), Status> {
        self.open_impl(scope, path, flags, object_request).await
    }

    async fn read_dirents(
        &self,
        pos: &TraversalPosition,
        sink: Box<dyn dirents_sink::Sink>,
    ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
        self.directory.read_dirents(pos, sink).await
    }

    fn register_watcher(
        self: Arc<Self>,
        scope: ExecutionScope,
        mask: WatchMask,
        watcher: DirectoryWatcher,
    ) -> Result<(), Status> {
        self.directory.clone().register_watcher(scope, mask, watcher)
    }

    fn unregister_watcher(self: Arc<Self>, key: usize) {
        self.directory.clone().unregister_watcher(key)
    }
}
503
504impl From<object_store::Directory<FxVolume>> for BlobDirectory {
505    fn from(dir: object_store::Directory<FxVolume>) -> Self {
506        Self::new(dir.into())
507    }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
    use assert_matches::assert_matches;
    use blob_writer::BlobWriter;
    use delivery_blob::{CompressionMode, Type1Blob, delivery_blob_path};
    use fidl_fuchsia_fxfs::BlobReaderMarker;
    use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
    use fuchsia_fs::directory::{
        DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
    };

    use futures::StreamExt as _;
    use std::path::PathBuf;

    /// A written blob can be read back and then unlinked through fuchsia.io.
    #[fasync::run(10, test)]
    async fn test_unlink() {
        let fixture = new_blob_fixture().await;

        let data = [1; 1000];

        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        assert_eq!(fixture.read_blob(hash).await, data);

        fixture
            .root()
            .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
            .await
            .expect("FIDL failed")
            .expect("unlink failed");

        fixture.close().await;
    }

    /// A blob only appears in directory listings once it has been fully written.
    #[fasync::run(10, test)]
    async fn test_readdir() {
        let fixture = new_blob_fixture().await;

        let data = [0xab; 2];
        let hash;
        {
            hash = fuchsia_merkle::root_from_slice(&data);
            let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);

            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    fixture.volume_out_dir(),
                )
                .expect("failed to connect to the Blob service");

            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");

            let writer = blob_writer_client_end.into_proxy();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            // Write all but the final bytes so the blob stays incomplete.
            blob_writer.write(&compressed_data[..1]).await.unwrap();

            // Before the blob is finished writing, it shouldn't appear in the directory.
            assert_eq!(
                readdir_inclusive(fixture.root()).await.ok(),
                Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
            );

            blob_writer.write(&compressed_data[1..]).await.unwrap();
        }

        // After the final write the blob shows up as a file named by its hash.
        assert_eq!(
            readdir_inclusive(fixture.root()).await.ok(),
            Some(vec![
                DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
                DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
            ])
        );

        fixture.close().await;
    }

    /// Directory watchers see ADD_FILE only when a blob completes, and REMOVE_FILE on unlink;
    /// partially-written blobs generate no events.
    #[fasync::run(10, test)]
    async fn test_watchers() {
        let fixture = new_blob_fixture().await;

        let mut watcher = Watcher::new(fixture.root()).await.unwrap();
        assert_eq!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
        );
        assert_matches!(
            watcher.next().await,
            Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
        );

        let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
        let mut hashes = vec![];
        let mut filenames = vec![];
        for datum in data {
            let hash = fuchsia_merkle::root_from_slice(&datum);
            let filename = PathBuf::from(format!("{}", hash));
            hashes.push(hash.clone());
            filenames.push(filename.clone());

            let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);

            let blob_proxy =
                connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
                    fixture.volume_out_dir(),
                )
                .expect("failed to connect to the Blob service");

            let blob_writer_client_end = blob_proxy
                .create(&hash.into(), false)
                .await
                .expect("transport error on create")
                .expect("failed to create blob");

            let writer = blob_writer_client_end.into_proxy();
            let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();

            // Before the blob is finished writing, we shouldn't see any watch events for it.
            assert_matches!(
                watcher
                    .next()
                    .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
                    .await,
                None
            );

            blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();

            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
            );
        }

        for (hash, filename) in hashes.iter().zip(filenames) {
            fixture
                .root()
                .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
                .await
                .expect("FIDL call failed")
                .expect("unlink failed");
            assert_eq!(
                watcher.next().await,
                Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
            );
        }

        std::mem::drop(watcher);
        fixture.close().await;
    }

    /// Renaming blobs is rejected.
    #[fasync::run(10, test)]
    async fn test_rename_fails() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
        Status::ok(status).unwrap();
        fixture
            .root()
            .rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
            .await
            .expect("FIDL failed")
            .expect_err("rename should fail");

        fixture.close().await;
    }

    /// Hard-linking blobs is rejected with NOT_SUPPORTED.
    #[fasync::run(10, test)]
    async fn test_link_fails() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
        Status::ok(status).unwrap();
        let status = fixture
            .root()
            .link(&format!("{}", hash), token.unwrap().into(), "foo")
            .await
            .expect("FIDL failed");
        assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);

        fixture.close().await;
    }

    /// A cached node whose hash doesn't match the requested entry name is rejected (guards the
    /// hash check in `get_or_load_node`).
    #[fasync::run(10, test)]
    async fn test_verify_cached_hash_node() {
        let fixture = new_blob_fixture().await;

        let data = vec![];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;
        let evil_hash =
            Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
                .unwrap();

        // Create a malicious link to the existing blob. This shouldn't be possible without special
        // access either via internal apis or modifying the disk image.
        {
            let root = fixture
                .volume()
                .root()
                .clone()
                .as_node()
                .into_any()
                .downcast::<BlobDirectory>()
                .unwrap()
                .directory()
                .clone();
            root.clone()
                .link(evil_hash.to_string(), root, &hash.to_string())
                .await
                .expect("Linking file");
        }
        let device = fixture.close().await;

        let fixture = open_blob_fixture(device).await;
        {
            // Hold open a ref to keep it in the node cache.
            let _vmo = fixture.get_blob_vmo(hash).await;

            // Open the malicious link
            let blob_reader =
                connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
                    .expect("failed to connect to the BlobReader service");
            blob_reader
                .get_vmo(&evil_hash.into())
                .await
                .expect("transport error on BlobReader.GetVmo")
                .expect_err("Hashes should mismatch");
        }
        fixture.close().await;
    }

    /// `NeedsOverwrite` reports false for blobs found via lookup or via the dirent cache, and
    /// errors for blobs that don't exist.
    #[fasync::run(10, test)]
    async fn test_blob_needs_overwrite_verifies_existence() {
        let fixture = new_blob_fixture().await;
        let data = vec![42u8; 32];
        let hash = fixture.write_blob(&data, CompressionMode::Never).await;

        let blob_creator =
            connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
                .expect("failed to connect to the BlobCreator service");
        // Do it once to fetch it fresh after clearing the cache.
        fixture.volume().volume().dirent_cache().clear();
        assert!(
            !blob_creator
                .needs_overwrite(&hash.into())
                .await
                .expect("fidl transport")
                .expect("Find new blob")
        );

        // Get it into the cache and then check again.
        let blob_reader =
            connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
                .expect("failed to connect to the BlobReader service");
        blob_reader
            .get_vmo(&hash.into())
            .await
            .expect("transport error on BlobReader.GetVmo")
            .expect("Opening blob");
        assert!(
            !blob_creator
                .needs_overwrite(&hash.into())
                .await
                .expect("fidl transport")
                .expect("Find new blob")
        );

        // Fail to find one that is missing.
        blob_creator
            .needs_overwrite(&[1u8; 32].into())
            .await
            .expect("fidl transport")
            .expect_err("Blob should not exist");
        fixture.close().await;
    }
}