1use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::fxblob::blob::{CompressionInfo, FxBlob};
11use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
12use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
13use crate::fuchsia::volume::{FxVolume, RootDir};
14use anyhow::{Context as _, Error, anyhow, ensure};
15use delivery_blob::compression::CompressionAlgorithm;
16use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
17use fidl_fuchsia_fxfs::{
18 BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
19 BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
20};
21use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
22use fuchsia_hash::Hash;
23use futures::TryStreamExt;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::object_handle::ReadObjectHandle;
27use fxfs::object_store::transaction::{LockKey, lock_keys};
28use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore};
29use fxfs_macros::ToWeakNode;
30use std::str::FromStr;
31use std::sync::Arc;
32use vfs::directory::dirents_sink;
33use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
34use vfs::directory::entry_container::{
35 Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
36};
37use vfs::directory::helper::DirectlyMutable;
38use vfs::directory::mutable::connection::MutableConnection;
39use vfs::directory::simple::Simple;
40use vfs::directory::traversal_position::TraversalPosition;
41use vfs::execution_scope::ExecutionScope;
42use vfs::path::Path;
43use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
44use zx::Status;
45
46#[derive(ToWeakNode)]
51pub struct BlobDirectory {
52 directory: Arc<FxDirectory>,
53}
54
55pub(crate) struct Identifier {
58 pub string: String,
59 pub hash: Hash,
60}
61
62impl TryFrom<&str> for Identifier {
63 type Error = FxfsError;
64 fn try_from(value: &str) -> Result<Self, Self::Error> {
65 Ok(Self {
66 string: value.to_owned(),
67 hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
68 })
69 }
70}
71
72impl From<Hash> for Identifier {
73 fn from(hash: Hash) -> Self {
74 Self { string: hash.to_string(), hash }
75 }
76}
77
78impl RootDir for BlobDirectory {
79 fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
80 self
81 }
82
83 fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
84 let scope = self.volume().scope().clone();
85 vfs::directory::serve_on(self, flags, scope, server_end);
86 }
87
88 fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
89 self as Arc<dyn FxNode>
90 }
91
92 fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
93 let this = self.clone();
94 svc_dir.add_entry(
95 BlobCreatorMarker::PROTOCOL_NAME,
96 vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
97 )?;
98
99 svc_dir.add_entry(
100 BlobReaderMarker::PROTOCOL_NAME,
101 vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
102 )?;
103
104 Ok(())
105 }
106}
107
108impl BlobDirectory {
109 fn new(directory: FxDirectory) -> Self {
110 Self { directory: Arc::new(directory) }
111 }
112
113 pub fn directory(&self) -> &Arc<FxDirectory> {
114 &self.directory
115 }
116
117 pub fn volume(&self) -> &Arc<FxVolume> {
118 self.directory.volume()
119 }
120
121 fn store(&self) -> &ObjectStore {
122 self.directory.store()
123 }
124
125 pub(crate) async fn open_blob_get_vmo(
128 self: &Arc<Self>,
129 id: &Identifier,
130 ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
131 let store = self.store();
132 let fs = store.filesystem();
133 let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
134 let _guard = fs.lock_manager().read_lock(keys.clone()).await;
136 let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
137 let vmo = blob.create_child_vmo()?;
138 Ok((blob.clone(), vmo))
140 }
141
142 pub(crate) async fn open_blob(
144 self: &Arc<Self>,
145 id: &Identifier,
146 ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
147 let store = self.store();
148 let fs = store.filesystem();
149 let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
150 let _guard = fs.lock_manager().read_lock(keys.clone()).await;
152 self.open_blob_locked(id).await
153 }
154
155 async fn open_blob_locked(
159 self: &Arc<Self>,
160 id: &Identifier,
161 ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
162 let node = match self
163 .directory
164 .directory()
165 .owner()
166 .dirent_cache()
167 .lookup(&(self.directory.object_id(), &id.string))
168 {
169 Some(node) => Some(node),
170 None => {
171 if let Some((object_id, _, _)) =
172 self.directory.directory().lookup(&id.string).await?
173 {
174 let node = self.get_or_load_node(object_id, &id).await?;
175 self.directory.directory().owner().dirent_cache().insert(
176 self.directory.object_id(),
177 id.string.clone(),
178 node.clone(),
179 );
180 Some(node)
181 } else {
182 None
183 }
184 }
185 };
186 let Some(node) = node else {
187 return Ok(None);
188 };
189 if node.object_descriptor() != ObjectDescriptor::File {
190 return Err(FxfsError::Inconsistent)
191 .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
192 }
193 node.into_any()
194 .downcast::<FxBlob>()
195 .map(|node| Some(OpenedNode::new(node)))
196 .map_err(|_| FxfsError::Inconsistent)
197 .with_context(|| format!("Blob {} has incorrect node type!", id.string))
198 }
199
200 async fn get_or_load_node(
204 self: &Arc<Self>,
205 object_id: u64,
206 id: &Identifier,
207 ) -> Result<Arc<dyn FxNode>, Error> {
208 let volume = self.volume();
209 match volume.cache().get_or_reserve(object_id).await {
210 GetResult::Node(node) => {
211 let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
215 anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
216 })?;
217 ensure!(
218 blob.root() == id.hash,
219 anyhow!(FxfsError::Inconsistent)
220 .context("Loaded blob by node that did not match the given hash")
221 );
222 Ok(blob as Arc<dyn FxNode>)
223 }
224 GetResult::Placeholder(placeholder) => {
225 let object =
226 ObjectStore::open_object(volume, object_id, HandleOptions::default(), None)
227 .await?;
228 let metadata = BlobMetadata::read_from(&object).await?;
229 let (uncompressed_size, compression_info) = match &metadata.format {
230 BlobFormat::Uncompressed => (object.get_size(), None),
231 BlobFormat::ChunkedZstd {
232 uncompressed_size,
233 chunk_size,
234 compressed_offsets,
235 } => (
236 *uncompressed_size,
237 Some(CompressionInfo::new(
238 *chunk_size,
239 compressed_offsets,
240 CompressionAlgorithm::Zstd,
241 )?),
242 ),
243 BlobFormat::ChunkedLz4 {
244 uncompressed_size,
245 chunk_size,
246 compressed_offsets,
247 } => (
248 *uncompressed_size,
249 Some(CompressionInfo::new(
250 *chunk_size,
251 compressed_offsets,
252 CompressionAlgorithm::Lz4,
253 )?),
254 ),
255 };
256 let merkle_verifier = metadata.into_merkle_verifier(id.hash)?;
257
258 let node = FxBlob::new(
259 object,
260 id.hash,
261 merkle_verifier,
262 compression_info,
263 uncompressed_size,
264 )? as Arc<dyn FxNode>;
265 placeholder.commit(&node);
266 Ok(node)
267 }
268 }
269 }
270
271 async fn create_blob_writer(
275 self: &Arc<Self>,
276 hash: Hash,
277 allow_existing: bool,
278 ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
279 let id = hash.into();
280 let blob_exists = self
281 .open_blob(&id)
282 .await
283 .map_err(|e| {
284 log::error!("Failed to lookup blob: {:?}", e);
285 CreateBlobError::Internal
286 })?
287 .is_some();
288 if blob_exists && !allow_existing {
289 return Err(CreateBlobError::AlreadyExists);
290 }
291 let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
292 let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
293 log::error!("Failed to create blob writer: {:?}", e);
294 CreateBlobError::Internal
295 })?;
296 self.volume().scope().spawn(async move {
297 if let Err(e) = writer.handle_requests(request_stream).await {
298 log::error!("Failed to handle BlobWriter requests: {}", e);
299 }
300 });
301 return Ok(client_end);
302 }
303
304 async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
305 if self.volume().dirent_cache().lookup(&(self.object_id(), &blob_hash.string)).is_some() {
309 return Ok(false);
310 }
311 match self.directory.directory().lookup(&blob_hash.string).await? {
312 Some(_) => Ok(false),
313 None => Err(FxfsError::NotFound.into()),
314 }
315 }
316
317 async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
318 while let Ok(Some(request)) = requests.try_next().await {
319 match request {
320 BlobCreatorRequest::Create { responder, hash, allow_existing } => {
321 responder
322 .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
323 .unwrap_or_else(|error| {
324 log::error!(error:?; "failed to send Create response");
325 });
326 }
327 BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
328 let _ = responder.send(
329 self.needs_overwrite(Hash::from(blob_hash).into())
330 .await
331 .map_err(map_to_raw_status),
332 );
333 }
334 }
335 }
336 }
337
338 async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
339 while let Ok(Some(request)) = requests.try_next().await {
340 match request {
341 BlobReaderRequest::GetVmo { blob_hash, responder } => {
342 responder
343 .send(self.get_blob_vmo(blob_hash.into()).await.map_err(map_to_raw_status))
344 .unwrap_or_else(|error| {
345 log::error!(error:?; "failed to send GetVmo response");
346 });
347 }
348 };
349 }
350 }
351
352 async fn open_impl(
353 self: Arc<Self>,
354 scope: ExecutionScope,
355 path: Path,
356 flags: impl ProtocolsExt,
357 object_request: ObjectRequestRef<'_>,
358 ) -> Result<(), Status> {
359 if path.is_empty() {
360 object_request
361 .create_connection::<MutableConnection<_>, _>(
362 scope,
363 OpenedNode::new(self).take(),
364 flags,
365 )
366 .await
367 } else {
368 Err(Status::NOT_SUPPORTED)
369 }
370 }
371}
372
373impl FxNode for BlobDirectory {
374 fn object_id(&self) -> u64 {
375 self.directory.object_id()
376 }
377
378 fn parent(&self) -> Option<Arc<FxDirectory>> {
379 self.directory.parent()
380 }
381
382 fn set_parent(&self, _parent: Arc<FxDirectory>) {
383 unreachable!();
385 }
386
387 fn open_count_add_one(&self) {}
388 fn open_count_sub_one(self: Arc<Self>) {}
389
390 fn object_descriptor(&self) -> ObjectDescriptor {
391 ObjectDescriptor::Directory
392 }
393}
394
395impl MutableDirectory for BlobDirectory {
396 async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
397 if must_be_directory {
398 return Err(Status::INVALID_ARGS);
399 }
400 self.directory.clone().unlink(name, must_be_directory).await
401 }
402
403 async fn update_attributes(
404 &self,
405 attributes: fio::MutableNodeAttributes,
406 ) -> Result<(), Status> {
407 self.directory.update_attributes(attributes).await
408 }
409
410 async fn sync(&self) -> Result<(), Status> {
411 self.directory.sync().await
412 }
413}
414
415impl DirectoryEntry for BlobDirectory {
417 fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
418 request.open_dir(self)
419 }
420
421 fn scope(&self) -> Option<ExecutionScope> {
422 Some(self.volume().scope().clone())
423 }
424}
425
426impl GetEntryInfo for BlobDirectory {
427 fn entry_info(&self) -> EntryInfo {
428 self.directory.entry_info()
429 }
430}
431
432impl vfs::node::Node for BlobDirectory {
433 async fn get_attributes(
434 &self,
435 requested_attributes: fio::NodeAttributesQuery,
436 ) -> Result<fio::NodeAttributes2, Status> {
437 self.directory.get_attributes(requested_attributes).await
438 }
439
440 fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
441 self.directory.query_filesystem()
442 }
443}
444
445impl VfsDirectory for BlobDirectory {
447 fn deprecated_open(
448 self: Arc<Self>,
449 scope: ExecutionScope,
450 flags: fio::OpenFlags,
451 path: Path,
452 server_end: ServerEnd<NodeMarker>,
453 ) {
454 scope.clone().spawn(flags.to_object_request(server_end).handle_async(
455 async move |object_request| self.open_impl(scope, path, flags, object_request).await,
456 ));
457 }
458
459 fn open(
460 self: Arc<Self>,
461 scope: ExecutionScope,
462 path: Path,
463 flags: fio::Flags,
464 object_request: ObjectRequestRef<'_>,
465 ) -> Result<(), Status> {
466 scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
467 self.open_impl(scope, path, flags, object_request).await
468 }));
469 Ok(())
470 }
471
472 async fn open_async(
473 self: Arc<Self>,
474 scope: ExecutionScope,
475 path: Path,
476 flags: fio::Flags,
477 object_request: ObjectRequestRef<'_>,
478 ) -> Result<(), Status> {
479 self.open_impl(scope, path, flags, object_request).await
480 }
481
482 async fn read_dirents(
483 &self,
484 pos: &TraversalPosition,
485 sink: Box<dyn dirents_sink::Sink>,
486 ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
487 self.directory.read_dirents(pos, sink).await
488 }
489
490 fn register_watcher(
491 self: Arc<Self>,
492 scope: ExecutionScope,
493 mask: WatchMask,
494 watcher: DirectoryWatcher,
495 ) -> Result<(), Status> {
496 self.directory.clone().register_watcher(scope, mask, watcher)
497 }
498
499 fn unregister_watcher(self: Arc<Self>, key: usize) {
500 self.directory.clone().unregister_watcher(key)
501 }
502}
503
504impl From<object_store::Directory<FxVolume>> for BlobDirectory {
505 fn from(dir: object_store::Directory<FxVolume>) -> Self {
506 Self::new(dir.into())
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
514 use assert_matches::assert_matches;
515 use blob_writer::BlobWriter;
516 use delivery_blob::{CompressionMode, Type1Blob, delivery_blob_path};
517 use fidl_fuchsia_fxfs::BlobReaderMarker;
518 use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
519 use fuchsia_component_client::connect_to_protocol_at_dir_svc;
520 use fuchsia_fs::directory::{
521 DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
522 };
523
524 use futures::StreamExt as _;
525 use std::path::PathBuf;
526
527 #[fasync::run(10, test)]
528 async fn test_unlink() {
529 let fixture = new_blob_fixture().await;
530
531 let data = [1; 1000];
532
533 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
534
535 assert_eq!(fixture.read_blob(hash).await, data);
536
537 fixture
538 .root()
539 .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
540 .await
541 .expect("FIDL failed")
542 .expect("unlink failed");
543
544 fixture.close().await;
545 }
546
547 #[fasync::run(10, test)]
548 async fn test_readdir() {
549 let fixture = new_blob_fixture().await;
550
551 let data = [0xab; 2];
552 let hash;
553 {
554 hash = fuchsia_merkle::root_from_slice(&data);
555 let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);
556
557 let blob_proxy =
558 connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
559 fixture.volume_out_dir(),
560 )
561 .expect("failed to connect to the Blob service");
562
563 let blob_writer_client_end = blob_proxy
564 .create(&hash.into(), false)
565 .await
566 .expect("transport error on create")
567 .expect("failed to create blob");
568
569 let writer = blob_writer_client_end.into_proxy();
570 let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
571 .await
572 .expect("failed to create BlobWriter");
573 blob_writer.write(&compressed_data[..1]).await.unwrap();
574
575 assert_eq!(
577 readdir_inclusive(fixture.root()).await.ok(),
578 Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
579 );
580
581 blob_writer.write(&compressed_data[1..]).await.unwrap();
582 }
583
584 assert_eq!(
585 readdir_inclusive(fixture.root()).await.ok(),
586 Some(vec![
587 DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
588 DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
589 ])
590 );
591
592 fixture.close().await;
593 }
594
595 #[fasync::run(10, test)]
596 async fn test_watchers() {
597 let fixture = new_blob_fixture().await;
598
599 let mut watcher = Watcher::new(fixture.root()).await.unwrap();
600 assert_eq!(
601 watcher.next().await,
602 Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
603 );
604 assert_matches!(
605 watcher.next().await,
606 Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
607 );
608
609 let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
610 let mut hashes = vec![];
611 let mut filenames = vec![];
612 for datum in data {
613 let hash = fuchsia_merkle::root_from_slice(&datum);
614 let filename = PathBuf::from(format!("{}", hash));
615 hashes.push(hash.clone());
616 filenames.push(filename.clone());
617
618 let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);
619
620 let blob_proxy =
621 connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
622 fixture.volume_out_dir(),
623 )
624 .expect("failed to connect to the Blob service");
625
626 let blob_writer_client_end = blob_proxy
627 .create(&hash.into(), false)
628 .await
629 .expect("transport error on create")
630 .expect("failed to create blob");
631
632 let writer = blob_writer_client_end.into_proxy();
633 let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
634 .await
635 .expect("failed to create BlobWriter");
636 blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();
637
638 assert_matches!(
640 watcher
641 .next()
642 .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
643 .await,
644 None
645 );
646
647 blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();
648
649 assert_eq!(
650 watcher.next().await,
651 Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
652 );
653 }
654
655 for (hash, filename) in hashes.iter().zip(filenames) {
656 fixture
657 .root()
658 .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
659 .await
660 .expect("FIDL call failed")
661 .expect("unlink failed");
662 assert_eq!(
663 watcher.next().await,
664 Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
665 );
666 }
667
668 std::mem::drop(watcher);
669 fixture.close().await;
670 }
671
672 #[fasync::run(10, test)]
673 async fn test_rename_fails() {
674 let fixture = new_blob_fixture().await;
675
676 let data = vec![];
677 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
678
679 let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
680 Status::ok(status).unwrap();
681 fixture
682 .root()
683 .rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
684 .await
685 .expect("FIDL failed")
686 .expect_err("rename should fail");
687
688 fixture.close().await;
689 }
690
691 #[fasync::run(10, test)]
692 async fn test_link_fails() {
693 let fixture = new_blob_fixture().await;
694
695 let data = vec![];
696 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
697
698 let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
699 Status::ok(status).unwrap();
700 let status = fixture
701 .root()
702 .link(&format!("{}", hash), token.unwrap().into(), "foo")
703 .await
704 .expect("FIDL failed");
705 assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);
706
707 fixture.close().await;
708 }
709
710 #[fasync::run(10, test)]
711 async fn test_verify_cached_hash_node() {
712 let fixture = new_blob_fixture().await;
713
714 let data = vec![];
715 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
716 let evil_hash =
717 Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
718 .unwrap();
719
720 {
723 let root = fixture
724 .volume()
725 .root()
726 .clone()
727 .as_node()
728 .into_any()
729 .downcast::<BlobDirectory>()
730 .unwrap()
731 .directory()
732 .clone();
733 root.clone()
734 .link(evil_hash.to_string(), root, &hash.to_string())
735 .await
736 .expect("Linking file");
737 }
738 let device = fixture.close().await;
739
740 let fixture = open_blob_fixture(device).await;
741 {
742 let _vmo = fixture.get_blob_vmo(hash).await;
744
745 let blob_reader =
747 connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
748 .expect("failed to connect to the BlobReader service");
749 blob_reader
750 .get_vmo(&evil_hash.into())
751 .await
752 .expect("transport error on BlobReader.GetVmo")
753 .expect_err("Hashes should mismatch");
754 }
755 fixture.close().await;
756 }
757
758 #[fasync::run(10, test)]
759 async fn test_blob_needs_overwrite_verifies_existence() {
760 let fixture = new_blob_fixture().await;
761 let data = vec![42u8; 32];
762 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
763
764 let blob_creator =
765 connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
766 .expect("failed to connect to the BlobCreator service");
767 fixture.volume().volume().dirent_cache().clear();
769 assert!(
770 !blob_creator
771 .needs_overwrite(&hash.into())
772 .await
773 .expect("fidl transport")
774 .expect("Find new blob")
775 );
776
777 let blob_reader =
779 connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
780 .expect("failed to connect to the BlobReader service");
781 blob_reader
782 .get_vmo(&hash.into())
783 .await
784 .expect("transport error on BlobReader.GetVmo")
785 .expect("Opening blob");
786 assert!(
787 !blob_creator
788 .needs_overwrite(&hash.into())
789 .await
790 .expect("fidl transport")
791 .expect("Find new blob")
792 );
793
794 blob_creator
796 .needs_overwrite(&[1u8; 32].into())
797 .await
798 .expect("fidl transport")
799 .expect_err("Blob should not exist");
800 fixture.close().await;
801 }
802}