1use crate::fuchsia::component::map_to_raw_status;
9use crate::fuchsia::directory::FxDirectory;
10use crate::fuchsia::fxblob::blob::{CompressionInfo, FxBlob};
11use crate::fuchsia::fxblob::writer::DeliveryBlobWriter;
12use crate::fuchsia::node::{FxNode, GetResult, OpenedNode};
13use crate::fuchsia::volume::{FxVolume, RootDir};
14use anyhow::{Context as _, Error, anyhow, ensure};
15use delivery_blob::compression::CompressionAlgorithm;
16use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, ServerEnd, create_request_stream};
17use fidl_fuchsia_fxfs::{
18 BlobCreatorMarker, BlobCreatorRequest, BlobCreatorRequestStream, BlobReaderMarker,
19 BlobReaderRequest, BlobReaderRequestStream, BlobWriterMarker, CreateBlobError,
20};
21use fidl_fuchsia_io::{self as fio, FilesystemInfo, NodeMarker, WatchMask};
22use fuchsia_hash::Hash;
23use futures::TryStreamExt;
24use fxfs::blob_metadata::{BlobFormat, BlobMetadata};
25use fxfs::errors::FxfsError;
26use fxfs::object_handle::ReadObjectHandle;
27use fxfs::object_store::transaction::{LockKey, lock_keys};
28use fxfs::object_store::{self, HandleOptions, ObjectDescriptor, ObjectStore};
29use fxfs_macros::ToWeakNode;
30use fxfs_trace::{TraceFutureExt, trace_future_args};
31use std::str::FromStr;
32use std::sync::Arc;
33use vfs::directory::dirents_sink;
34use vfs::directory::entry::{DirectoryEntry, EntryInfo, GetEntryInfo, OpenRequest};
35use vfs::directory::entry_container::{
36 Directory as VfsDirectory, DirectoryWatcher, MutableDirectory,
37};
38use vfs::directory::helper::DirectlyMutable;
39use vfs::directory::mutable::connection::MutableConnection;
40use vfs::directory::simple::Simple;
41use vfs::directory::traversal_position::TraversalPosition;
42use vfs::execution_scope::ExecutionScope;
43use vfs::path::Path;
44use vfs::{ObjectRequestRef, ProtocolsExt, ToObjectRequest};
45use zx::Status;
46
47#[derive(ToWeakNode)]
52pub struct BlobDirectory {
53 directory: Arc<FxDirectory>,
54}
55
56pub(crate) struct Identifier {
59 pub string: String,
60 pub hash: Hash,
61}
62
63impl TryFrom<&str> for Identifier {
64 type Error = FxfsError;
65 fn try_from(value: &str) -> Result<Self, Self::Error> {
66 Ok(Self {
67 string: value.to_owned(),
68 hash: Hash::from_str(value).map_err(|_| FxfsError::InvalidArgs)?,
69 })
70 }
71}
72
73impl From<Hash> for Identifier {
74 fn from(hash: Hash) -> Self {
75 Self { string: hash.to_string(), hash }
76 }
77}
78
79impl RootDir for BlobDirectory {
80 fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry> {
81 self
82 }
83
84 fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>) {
85 let scope = self.volume().scope().clone();
86 vfs::directory::serve_on(self, flags, scope, server_end);
87 }
88
89 fn as_node(self: Arc<Self>) -> Arc<dyn FxNode> {
90 self as Arc<dyn FxNode>
91 }
92
93 fn register_additional_volume_services(self: Arc<Self>, svc_dir: &Simple) -> Result<(), Error> {
94 let this = self.clone();
95 svc_dir.add_entry(
96 BlobCreatorMarker::PROTOCOL_NAME,
97 vfs::service::host(move |r| this.clone().handle_blob_creator_requests(r)),
98 )?;
99
100 svc_dir.add_entry(
101 BlobReaderMarker::PROTOCOL_NAME,
102 vfs::service::host(move |r| self.clone().handle_blob_reader_requests(r)),
103 )?;
104
105 Ok(())
106 }
107}
108
109impl BlobDirectory {
110 fn new(directory: FxDirectory) -> Self {
111 Self { directory: Arc::new(directory) }
112 }
113
114 pub fn directory(&self) -> &Arc<FxDirectory> {
115 &self.directory
116 }
117
118 pub fn volume(&self) -> &Arc<FxVolume> {
119 self.directory.volume()
120 }
121
122 fn store(&self) -> &ObjectStore {
123 self.directory.store()
124 }
125
126 pub(crate) async fn open_blob_get_vmo(
129 self: &Arc<Self>,
130 id: &Identifier,
131 ) -> Result<(Arc<FxBlob>, zx::Vmo), Error> {
132 let store = self.store();
133 let fs = store.filesystem();
134 let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
135 let _guard = fs.lock_manager().read_lock(keys.clone()).await;
137 let blob = self.open_blob_locked(id).await?.ok_or(FxfsError::NotFound)?;
138 let vmo = blob.create_child_vmo()?;
139 Ok((blob.clone(), vmo))
141 }
142
143 pub(crate) async fn open_blob(
145 self: &Arc<Self>,
146 id: &Identifier,
147 ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
148 let store = self.store();
149 let fs = store.filesystem();
150 let keys = lock_keys![LockKey::object(store.store_object_id(), self.directory.object_id())];
151 let _guard = fs.lock_manager().read_lock(keys.clone()).await;
153 self.open_blob_locked(id).await
154 }
155
156 async fn open_blob_locked(
160 self: &Arc<Self>,
161 id: &Identifier,
162 ) -> Result<Option<OpenedNode<FxBlob>>, Error> {
163 let node = match self
164 .directory
165 .directory()
166 .owner()
167 .dirent_cache()
168 .lookup(&(self.directory.object_id(), &id.string))
169 {
170 Some(node) => Some(node),
171 None => {
172 if let Some((object_id, _, _)) =
173 self.directory.directory().lookup(&id.string).await?
174 {
175 let node = self.get_or_load_node(object_id, &id).await?;
176 self.directory.directory().owner().dirent_cache().insert(
177 self.directory.object_id(),
178 id.string.clone(),
179 node.clone(),
180 );
181 Some(node)
182 } else {
183 None
184 }
185 }
186 };
187 let Some(node) = node else {
188 return Ok(None);
189 };
190 if node.object_descriptor() != ObjectDescriptor::File {
191 return Err(FxfsError::Inconsistent)
192 .with_context(|| format!("Blob {} has invalid object descriptor!", id.string));
193 }
194 node.into_any()
195 .downcast::<FxBlob>()
196 .map(|node| Some(OpenedNode::new(node)))
197 .map_err(|_| FxfsError::Inconsistent)
198 .with_context(|| format!("Blob {} has incorrect node type!", id.string))
199 }
200
201 async fn get_or_load_node(
205 self: &Arc<Self>,
206 object_id: u64,
207 id: &Identifier,
208 ) -> Result<Arc<dyn FxNode>, Error> {
209 let volume = self.volume();
210 match volume.cache().get_or_reserve(object_id).await {
211 GetResult::Node(node) => {
212 let blob = node.into_any().downcast::<FxBlob>().map_err(|_| {
216 anyhow!(FxfsError::Inconsistent).context("Loaded non-blob from cache")
217 })?;
218 ensure!(
219 blob.root() == id.hash,
220 anyhow!(FxfsError::Inconsistent)
221 .context("Loaded blob by node that did not match the given hash")
222 );
223 Ok(blob as Arc<dyn FxNode>)
224 }
225 GetResult::Placeholder(placeholder) => {
226 let object =
227 ObjectStore::open_object(volume, object_id, HandleOptions::default(), None)
228 .await?;
229 let metadata = BlobMetadata::read_from(&object).await?;
230 let (uncompressed_size, compression_info) = match &metadata.format {
231 BlobFormat::Uncompressed => (object.get_size(), None),
232 BlobFormat::ChunkedZstd {
233 uncompressed_size,
234 chunk_size,
235 compressed_offsets,
236 } => (
237 *uncompressed_size,
238 Some(CompressionInfo::new(
239 *chunk_size,
240 compressed_offsets,
241 CompressionAlgorithm::Zstd,
242 )?),
243 ),
244 BlobFormat::ChunkedLz4 {
245 uncompressed_size,
246 chunk_size,
247 compressed_offsets,
248 } => (
249 *uncompressed_size,
250 Some(CompressionInfo::new(
251 *chunk_size,
252 compressed_offsets,
253 CompressionAlgorithm::Lz4,
254 )?),
255 ),
256 };
257 let merkle_verifier = metadata.into_merkle_verifier(id.hash)?;
258
259 let node = FxBlob::new(
260 object,
261 id.hash,
262 merkle_verifier,
263 compression_info,
264 uncompressed_size,
265 )? as Arc<dyn FxNode>;
266 placeholder.commit(&node);
267 Ok(node)
268 }
269 }
270 }
271
272 async fn create_blob_writer(
276 self: &Arc<Self>,
277 hash: Hash,
278 allow_existing: bool,
279 ) -> Result<ClientEnd<BlobWriterMarker>, CreateBlobError> {
280 let id = hash.into();
281 let blob_exists = self
282 .open_blob(&id)
283 .await
284 .map_err(|e| {
285 log::error!("Failed to lookup blob: {:?}", e);
286 CreateBlobError::Internal
287 })?
288 .is_some();
289 if blob_exists && !allow_existing {
290 return Err(CreateBlobError::AlreadyExists);
291 }
292 let (client_end, request_stream) = create_request_stream::<BlobWriterMarker>();
293 let writer = DeliveryBlobWriter::new(self, hash).await.map_err(|e| {
294 log::error!("Failed to create blob writer: {:?}", e);
295 CreateBlobError::Internal
296 })?;
297 self.volume().scope().spawn(async move {
298 if let Err(e) = writer.handle_requests(request_stream).await {
299 log::error!("Failed to handle BlobWriter requests: {}", e);
300 }
301 });
302 return Ok(client_end);
303 }
304
305 async fn needs_overwrite(&self, blob_hash: Identifier) -> Result<bool, Error> {
306 if self.volume().dirent_cache().lookup(&(self.object_id(), &blob_hash.string)).is_some() {
310 return Ok(false);
311 }
312 match self.directory.directory().lookup(&blob_hash.string).await? {
313 Some(_) => Ok(false),
314 None => Err(FxfsError::NotFound.into()),
315 }
316 }
317
318 async fn handle_blob_creator_requests(self: Arc<Self>, mut requests: BlobCreatorRequestStream) {
319 while let Ok(Some(request)) = requests.try_next().await {
320 match request {
321 BlobCreatorRequest::Create { responder, hash, allow_existing } => {
322 async {
323 responder
324 .send(self.create_blob_writer(Hash::from(hash), allow_existing).await)
325 .unwrap_or_else(|error| {
326 log::error!(error:?; "failed to send Create response");
327 });
328 }
329 .trace(trace_future_args!("BlobCreator::Create"))
330 .await;
331 }
332 BlobCreatorRequest::NeedsOverwrite { blob_hash, responder } => {
333 async {
334 let _ = responder.send(
335 self.needs_overwrite(Hash::from(blob_hash).into())
336 .await
337 .map_err(map_to_raw_status),
338 );
339 }
340 .trace(trace_future_args!("BlobCreator::NeedsOverwrite"))
341 .await;
342 }
343 }
344 }
345 }
346
347 async fn handle_blob_reader_requests(self: Arc<Self>, mut requests: BlobReaderRequestStream) {
348 while let Ok(Some(request)) = requests.try_next().await {
349 match request {
350 BlobReaderRequest::GetVmo { blob_hash, responder } => {
351 async {
352 responder
353 .send(
354 self.get_blob_vmo(blob_hash.into())
355 .await
356 .map_err(map_to_raw_status),
357 )
358 .unwrap_or_else(|error| {
359 log::error!(error:?; "failed to send GetVmo response");
360 });
361 }
362 .trace(trace_future_args!("BlobReader::GetVmo"))
363 .await;
364 }
365 };
366 }
367 }
368
369 async fn open_impl(
370 self: Arc<Self>,
371 scope: ExecutionScope,
372 path: Path,
373 flags: impl ProtocolsExt,
374 object_request: ObjectRequestRef<'_>,
375 ) -> Result<(), Status> {
376 if path.is_empty() {
377 object_request
378 .create_connection::<MutableConnection<_>, _>(
379 scope,
380 OpenedNode::new(self).take(),
381 flags,
382 )
383 .await
384 } else {
385 Err(Status::NOT_SUPPORTED)
386 }
387 }
388}
389
390impl FxNode for BlobDirectory {
391 fn object_id(&self) -> u64 {
392 self.directory.object_id()
393 }
394
395 fn parent(&self) -> Option<Arc<FxDirectory>> {
396 self.directory.parent()
397 }
398
399 fn set_parent(&self, _parent: Arc<FxDirectory>) {
400 unreachable!();
402 }
403
404 fn open_count_add_one(&self) {}
405 fn open_count_sub_one(self: Arc<Self>) {}
406
407 fn object_descriptor(&self) -> ObjectDescriptor {
408 ObjectDescriptor::Directory
409 }
410}
411
412impl MutableDirectory for BlobDirectory {
413 async fn unlink(self: Arc<Self>, name: &str, must_be_directory: bool) -> Result<(), Status> {
414 if must_be_directory {
415 return Err(Status::INVALID_ARGS);
416 }
417 self.directory.clone().unlink(name, must_be_directory).await
418 }
419
420 async fn update_attributes(
421 &self,
422 attributes: fio::MutableNodeAttributes,
423 ) -> Result<(), Status> {
424 self.directory.update_attributes(attributes).await
425 }
426
427 async fn sync(&self) -> Result<(), Status> {
428 self.directory.sync().await
429 }
430}
431
432impl DirectoryEntry for BlobDirectory {
434 fn open_entry(self: Arc<Self>, request: OpenRequest<'_>) -> Result<(), Status> {
435 request.open_dir(self)
436 }
437
438 fn scope(&self) -> Option<ExecutionScope> {
439 Some(self.volume().scope().clone())
440 }
441}
442
443impl GetEntryInfo for BlobDirectory {
444 fn entry_info(&self) -> EntryInfo {
445 self.directory.entry_info()
446 }
447}
448
449impl vfs::node::Node for BlobDirectory {
450 async fn get_attributes(
451 &self,
452 requested_attributes: fio::NodeAttributesQuery,
453 ) -> Result<fio::NodeAttributes2, Status> {
454 self.directory.get_attributes(requested_attributes).await
455 }
456
457 fn query_filesystem(&self) -> Result<FilesystemInfo, Status> {
458 self.directory.query_filesystem()
459 }
460}
461
462impl VfsDirectory for BlobDirectory {
464 fn deprecated_open(
465 self: Arc<Self>,
466 scope: ExecutionScope,
467 flags: fio::OpenFlags,
468 path: Path,
469 server_end: ServerEnd<NodeMarker>,
470 ) {
471 scope.clone().spawn(flags.to_object_request(server_end).handle_async(
472 async move |object_request| self.open_impl(scope, path, flags, object_request).await,
473 ));
474 }
475
476 fn open(
477 self: Arc<Self>,
478 scope: ExecutionScope,
479 path: Path,
480 flags: fio::Flags,
481 object_request: ObjectRequestRef<'_>,
482 ) -> Result<(), Status> {
483 scope.clone().spawn(object_request.take().handle_async(async move |object_request| {
484 self.open_impl(scope, path, flags, object_request).await
485 }));
486 Ok(())
487 }
488
489 async fn open_async(
490 self: Arc<Self>,
491 scope: ExecutionScope,
492 path: Path,
493 flags: fio::Flags,
494 object_request: ObjectRequestRef<'_>,
495 ) -> Result<(), Status> {
496 self.open_impl(scope, path, flags, object_request).await
497 }
498
499 async fn read_dirents(
500 &self,
501 pos: &TraversalPosition,
502 sink: Box<dyn dirents_sink::Sink>,
503 ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), Status> {
504 self.directory.read_dirents(pos, sink).await
505 }
506
507 fn register_watcher(
508 self: Arc<Self>,
509 scope: ExecutionScope,
510 mask: WatchMask,
511 watcher: DirectoryWatcher,
512 ) -> Result<(), Status> {
513 self.directory.clone().register_watcher(scope, mask, watcher)
514 }
515
516 fn unregister_watcher(self: Arc<Self>, key: usize) {
517 self.directory.clone().unregister_watcher(key)
518 }
519}
520
521impl From<object_store::Directory<FxVolume>> for BlobDirectory {
522 fn from(dir: object_store::Directory<FxVolume>) -> Self {
523 Self::new(dir.into())
524 }
525}
526
527#[cfg(test)]
528mod tests {
529 use super::*;
530 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
531 use assert_matches::assert_matches;
532 use blob_writer::BlobWriter;
533 use delivery_blob::{CompressionMode, Type1Blob, delivery_blob_path};
534 use fidl_fuchsia_fxfs::BlobReaderMarker;
535 use fuchsia_async::{self as fasync, DurationExt as _, TimeoutExt as _};
536 use fuchsia_component_client::connect_to_protocol_at_dir_svc;
537 use fuchsia_fs::directory::{
538 DirEntry, DirentKind, WatchEvent, WatchMessage, Watcher, readdir_inclusive,
539 };
540
541 use futures::StreamExt as _;
542 use std::path::PathBuf;
543
544 #[fasync::run(10, test)]
545 async fn test_unlink() {
546 let fixture = new_blob_fixture().await;
547
548 let data = [1; 1000];
549
550 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
551
552 assert_eq!(fixture.read_blob(hash).await, data);
553
554 fixture
555 .root()
556 .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
557 .await
558 .expect("FIDL failed")
559 .expect("unlink failed");
560
561 fixture.close().await;
562 }
563
564 #[fasync::run(10, test)]
565 async fn test_readdir() {
566 let fixture = new_blob_fixture().await;
567
568 let data = [0xab; 2];
569 let hash;
570 {
571 hash = fuchsia_merkle::root_from_slice(&data);
572 let compressed_data: Vec<u8> = Type1Blob::generate(&data, CompressionMode::Always);
573
574 let blob_proxy =
575 connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
576 fixture.volume_out_dir(),
577 )
578 .expect("failed to connect to the Blob service");
579
580 let blob_writer_client_end = blob_proxy
581 .create(&hash.into(), false)
582 .await
583 .expect("transport error on create")
584 .expect("failed to create blob");
585
586 let writer = blob_writer_client_end.into_proxy();
587 let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
588 .await
589 .expect("failed to create BlobWriter");
590 blob_writer.write(&compressed_data[..1]).await.unwrap();
591
592 assert_eq!(
594 readdir_inclusive(fixture.root()).await.ok(),
595 Some(vec![DirEntry { name: ".".to_string(), kind: DirentKind::Directory }])
596 );
597
598 blob_writer.write(&compressed_data[1..]).await.unwrap();
599 }
600
601 assert_eq!(
602 readdir_inclusive(fixture.root()).await.ok(),
603 Some(vec![
604 DirEntry { name: ".".to_string(), kind: DirentKind::Directory },
605 DirEntry { name: format! {"{}", hash}, kind: DirentKind::File },
606 ])
607 );
608
609 fixture.close().await;
610 }
611
612 #[fasync::run(10, test)]
613 async fn test_watchers() {
614 let fixture = new_blob_fixture().await;
615
616 let mut watcher = Watcher::new(fixture.root()).await.unwrap();
617 assert_eq!(
618 watcher.next().await,
619 Some(Ok(WatchMessage { event: WatchEvent::EXISTING, filename: PathBuf::from(".") }))
620 );
621 assert_matches!(
622 watcher.next().await,
623 Some(Ok(WatchMessage { event: WatchEvent::IDLE, .. }))
624 );
625
626 let data = vec![vec![0xab; 2], vec![0xcd; 65_536]];
627 let mut hashes = vec![];
628 let mut filenames = vec![];
629 for datum in data {
630 let hash = fuchsia_merkle::root_from_slice(&datum);
631 let filename = PathBuf::from(format!("{}", hash));
632 hashes.push(hash.clone());
633 filenames.push(filename.clone());
634
635 let compressed_data: Vec<u8> = Type1Blob::generate(&datum, CompressionMode::Always);
636
637 let blob_proxy =
638 connect_to_protocol_at_dir_svc::<fidl_fuchsia_fxfs::BlobCreatorMarker>(
639 fixture.volume_out_dir(),
640 )
641 .expect("failed to connect to the Blob service");
642
643 let blob_writer_client_end = blob_proxy
644 .create(&hash.into(), false)
645 .await
646 .expect("transport error on create")
647 .expect("failed to create blob");
648
649 let writer = blob_writer_client_end.into_proxy();
650 let mut blob_writer = BlobWriter::create(writer, compressed_data.len() as u64)
651 .await
652 .expect("failed to create BlobWriter");
653 blob_writer.write(&compressed_data[..compressed_data.len() - 1]).await.unwrap();
654
655 assert_matches!(
657 watcher
658 .next()
659 .on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || None)
660 .await,
661 None
662 );
663
664 blob_writer.write(&compressed_data[compressed_data.len() - 1..]).await.unwrap();
665
666 assert_eq!(
667 watcher.next().await,
668 Some(Ok(WatchMessage { event: WatchEvent::ADD_FILE, filename }))
669 );
670 }
671
672 for (hash, filename) in hashes.iter().zip(filenames) {
673 fixture
674 .root()
675 .unlink(&format!("{}", hash), &fio::UnlinkOptions::default())
676 .await
677 .expect("FIDL call failed")
678 .expect("unlink failed");
679 assert_eq!(
680 watcher.next().await,
681 Some(Ok(WatchMessage { event: WatchEvent::REMOVE_FILE, filename }))
682 );
683 }
684
685 std::mem::drop(watcher);
686 fixture.close().await;
687 }
688
689 #[fasync::run(10, test)]
690 async fn test_rename_fails() {
691 let fixture = new_blob_fixture().await;
692
693 let data = vec![];
694 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
695
696 let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
697 Status::ok(status).unwrap();
698 fixture
699 .root()
700 .rename(&format!("{}", delivery_blob_path(hash)), token.unwrap().into(), "foo")
701 .await
702 .expect("FIDL failed")
703 .expect_err("rename should fail");
704
705 fixture.close().await;
706 }
707
708 #[fasync::run(10, test)]
709 async fn test_link_fails() {
710 let fixture = new_blob_fixture().await;
711
712 let data = vec![];
713 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
714
715 let (status, token) = fixture.root().get_token().await.expect("FIDL failed");
716 Status::ok(status).unwrap();
717 let status = fixture
718 .root()
719 .link(&format!("{}", hash), token.unwrap().into(), "foo")
720 .await
721 .expect("FIDL failed");
722 assert_eq!(Status::from_raw(status), Status::NOT_SUPPORTED);
723
724 fixture.close().await;
725 }
726
727 #[fasync::run(10, test)]
728 async fn test_verify_cached_hash_node() {
729 let fixture = new_blob_fixture().await;
730
731 let data = vec![];
732 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
733 let evil_hash =
734 Hash::from_str("2222222222222222222222222222222222222222222222222222222222222222")
735 .unwrap();
736
737 {
740 let root = fixture
741 .volume()
742 .root()
743 .clone()
744 .as_node()
745 .into_any()
746 .downcast::<BlobDirectory>()
747 .unwrap()
748 .directory()
749 .clone();
750 root.clone()
751 .link(evil_hash.to_string(), root, &hash.to_string())
752 .await
753 .expect("Linking file");
754 }
755 let device = fixture.close().await;
756
757 let fixture = open_blob_fixture(device).await;
758 {
759 let _vmo = fixture.get_blob_vmo(hash).await;
761
762 let blob_reader =
764 connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
765 .expect("failed to connect to the BlobReader service");
766 blob_reader
767 .get_vmo(&evil_hash.into())
768 .await
769 .expect("transport error on BlobReader.GetVmo")
770 .expect_err("Hashes should mismatch");
771 }
772 fixture.close().await;
773 }
774
775 #[fasync::run(10, test)]
776 async fn test_blob_needs_overwrite_verifies_existence() {
777 let fixture = new_blob_fixture().await;
778 let data = vec![42u8; 32];
779 let hash = fixture.write_blob(&data, CompressionMode::Never).await;
780
781 let blob_creator =
782 connect_to_protocol_at_dir_svc::<BlobCreatorMarker>(fixture.volume_out_dir())
783 .expect("failed to connect to the BlobCreator service");
784 fixture.volume().volume().dirent_cache().clear();
786 assert!(
787 !blob_creator
788 .needs_overwrite(&hash.into())
789 .await
790 .expect("fidl transport")
791 .expect("Find new blob")
792 );
793
794 let blob_reader =
796 connect_to_protocol_at_dir_svc::<BlobReaderMarker>(fixture.volume_out_dir())
797 .expect("failed to connect to the BlobReader service");
798 blob_reader
799 .get_vmo(&hash.into())
800 .await
801 .expect("transport error on BlobReader.GetVmo")
802 .expect("Opening blob");
803 assert!(
804 !blob_creator
805 .needs_overwrite(&hash.into())
806 .await
807 .expect("fidl transport")
808 .expect("Find new blob")
809 );
810
811 blob_creator
813 .needs_overwrite(&[1u8; 32].into())
814 .await
815 .expect("fidl transport")
816 .expect_err("Blob should not exist");
817 fixture.close().await;
818 }
819}