//! Typesafe wrappers around the /blob filesystem.

#![deny(missing_docs)]

use fidl::endpoints::{Proxy as _, ServerEnd};
use fuchsia_hash::{Hash, ParseHashError};
use futures::{stream, StreamExt as _};
use log::{error, info, warn};
use std::collections::HashSet;
use thiserror::Error;
use vfs::common::send_on_open_with_error;
use vfs::execution_scope::ExecutionScope;
use vfs::file::StreamIoConnection;
use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
use zx::{self as zx, AsHandleRef as _, Status};
use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};

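/// Mock blobfs implementation for use in tests.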
pub mod mock;
pub use mock::Mock;

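/// Errors that can occur when interacting with blobfs.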
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum BlobfsError {
    #[error("while opening blobfs dir")]
    OpenDir(#[from] fuchsia_fs::node::OpenError),

    #[error("while cloning the blobfs dir")]
    CloneDir(#[from] fuchsia_fs::node::CloneError),

    #[error("while listing blobfs dir")]
    ReadDir(#[source] fuchsia_fs::directory::EnumerateError),

    #[error("while deleting blob")]
    Unlink(#[source] Status),

    #[error("while sync'ing")]
    Sync(#[source] Status),

    #[error("while parsing blob merkle hash")]
    ParseHash(#[from] ParseHashError),

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while connecting to fuchsia.fxfs/BlobCreator")]
    ConnectToBlobCreator(#[source] anyhow::Error),

    #[error("while connecting to fuchsia.fxfs/BlobReader")]
    ConnectToBlobReader(#[source] anyhow::Error),

    #[error("while setting the VmexResource")]
    InitVmexResource(#[source] anyhow::Error),
}

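/// Errors encountered while creating a blob.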
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum CreateError {
    #[error("the blob already exists or is being concurrently written")]
    AlreadyExists,

    #[error("while creating the blob")]
    Io(#[source] fuchsia_fs::node::OpenError),

    #[error("while converting the proxy into a client end")]
    ConvertToClientEnd,

    #[error("FIDL error")]
    Fidl(#[from] fidl::Error),

    #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
    BlobCreator(ffxfs::CreateBlobError),
}

impl From<ffxfs::CreateBlobError> for CreateError {
    fn from(e: ffxfs::CreateBlobError) -> Self {
        match e {
            ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
            e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
        }
    }
}

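/// Builder for [`Client`]. By default no rights are requested on the /blob directory and neither
/// fuchsia.fxfs/BlobReader nor fuchsia.fxfs/BlobCreator is used.
///
/// A minimal usage sketch (assuming this crate is referenced as `blobfs`):
/// ```ignore
/// let client = blobfs::Client::builder().readable().use_reader().build().await?;
/// ```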
#[derive(Default)]
pub struct ClientBuilder {
    use_reader: Reader,
    use_creator: bool,
    readable: bool,
    writable: bool,
    executable: bool,
}

#[derive(Default)]
enum Reader {
    #[default]
    DontUse,
    Use {
        use_vmex: bool,
    },
}

impl ClientBuilder {
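    /// Opens the /blob directory in the component's namespace with the rights configured on this
    /// builder, connects to the optional fuchsia.fxfs protocols, and returns a [`Client`].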
    pub async fn build(self) -> Result<Client, BlobfsError> {
        let mut flags = fio::Flags::empty();
        if self.readable {
            flags |= fio::PERM_READABLE
        }
        if self.writable {
            flags |= fio::PERM_WRITABLE
        }
        if self.executable {
            flags |= fio::PERM_EXECUTABLE
        }
        let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
        let reader = match self.use_reader {
            Reader::DontUse => None,
            Reader::Use { use_vmex } => {
                if use_vmex {
                    if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
                        fidl_fuchsia_kernel::VmexResourceMarker,
                    >() {
                        if let Ok(vmex) = client.get().await {
                            info!("Got vmex resource");
                            vmo_blob::init_vmex_resource(vmex)
                                .map_err(BlobfsError::InitVmexResource)?;
                        }
                    }
                }
                Some(
                    fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
                        .map_err(BlobfsError::ConnectToBlobReader)?,
                )
            }
        };

        let creator = if self.use_creator {
            Some(
                fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
                    .map_err(BlobfsError::ConnectToBlobCreator)?,
            )
        } else {
            None
        };

        Ok(Client { dir, creator, reader })
    }

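    /// Configure the [`Client`] to use fuchsia.fxfs/BlobReader for reads. Also attempts to obtain
    /// the VmexResource so blobs can be served as executable VMOs; failure to obtain it is not
    /// fatal.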
    pub fn use_reader(self) -> Self {
        Self { use_reader: Reader::Use { use_vmex: true }, ..self }
    }

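    /// Configure the [`Client`] to use fuchsia.fxfs/BlobReader for reads without obtaining the
    /// VmexResource.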
    pub fn use_reader_no_vmex(self) -> Self {
        Self { use_reader: Reader::Use { use_vmex: false }, ..self }
    }

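    /// Configure the [`Client`] to use fuchsia.fxfs/BlobCreator for writes.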
    pub fn use_creator(self) -> Self {
        Self { use_creator: true, ..self }
    }

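    /// Open the /blob directory with `PERM_READABLE` rights.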
    pub fn readable(self) -> Self {
        Self { readable: true, ..self }
    }

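    /// Open the /blob directory with `PERM_WRITABLE` rights.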
    pub fn writable(self) -> Self {
        Self { writable: true, ..self }
    }

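    /// Open the /blob directory with `PERM_EXECUTABLE` rights.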
    pub fn executable(self) -> Self {
        Self { executable: true, ..self }
    }
}

impl Client {
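    /// Returns a new [`ClientBuilder`] with default options.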
    pub fn builder() -> ClientBuilder {
        Default::default()
    }
}
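
/// A client of the /blob filesystem, optionally backed by the fuchsia.fxfs BlobCreator and
/// BlobReader protocols.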
#[derive(Debug, Clone)]
pub struct Client {
    dir: fio::DirectoryProxy,
    creator: Option<ffxfs::BlobCreatorProxy>,
    reader: Option<ffxfs::BlobReaderProxy>,
}

impl Client {
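    /// Creates a [`Client`] from an existing /blob directory connection and optional
    /// fuchsia.fxfs/BlobCreator and fuchsia.fxfs/BlobReader proxies. If `vmex` is provided it is
    /// used to initialize the VmexResource so blobs can be served as executable VMOs.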
    pub fn new(
        dir: fio::DirectoryProxy,
        creator: Option<ffxfs::BlobCreatorProxy>,
        reader: Option<ffxfs::BlobReaderProxy>,
        vmex: Option<zx::Resource>,
    ) -> Result<Self, anyhow::Error> {
        if let Some(vmex) = vmex {
            vmo_blob::init_vmex_resource(vmex)?;
        }
        Ok(Self { dir, creator, reader })
    }

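    /// Creates a [`Client`] along with the `fuchsia.io/Directory` request stream backing it, for
    /// tests that want to serve the directory requests themselves.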
    pub fn new_test() -> (Self, fio::DirectoryRequestStream) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, stream)
    }

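    /// Creates a [`Client`] along with a [`mock::Mock`] that handles the requests made through
    /// it, for use in tests.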
    pub fn new_mock() -> (Self, mock::Mock) {
        let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();

        (Self { dir, creator: None, reader: None }, mock::Mock { stream })
    }

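    /// Returns a VMO with the contents of the blob identified by `hash`, obtained from
    /// fuchsia.fxfs/BlobReader if configured and from fuchsia.io/File.GetBackingMemory otherwise.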
    pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
        if let Some(reader) = &self.reader {
            reader
                .get_vmo(hash)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        } else {
            let blob =
                fuchsia_fs::directory::open_file(&self.dir, &hash.to_string(), fio::PERM_READABLE)
                    .await
                    .map_err(GetBlobVmoError::OpenBlob)?;
            blob.get_backing_memory(fio::VmoFlags::READ)
                .await
                .map_err(GetBlobVmoError::Fidl)?
                .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
        }
    }

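    /// Opens the blob `blob` for reading via fuchsia.io/Directory.DeprecatedOpen. Requests with
    /// write or create flags are rejected. If a fuchsia.fxfs/BlobReader is configured, the
    /// connection is served from a VMO obtained from it instead of being forwarded to blobfs.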
    pub fn deprecated_open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::OpenFlags,
        scope: ExecutionScope,
        server_end: ServerEnd<fio::NodeMarker>,
    ) -> Result<(), fidl::Error> {
        let describe = flags.contains(fio::OpenFlags::DESCRIBE);
        if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
            send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
            return Ok(());
        }
        if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
            send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
            return Ok(());
        }
        if let Some(reader) = &self.reader {
            let object_request = flags.to_object_request(server_end);
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
            Ok(())
        } else {
            self.dir.deprecated_open(flags, fio::ModeType::empty(), &blob.to_string(), server_end)
        }
    }

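    /// Opens the blob `blob` for reading via fuchsia.io/Directory.Open. Requests with write
    /// rights or a creation mode are rejected. If a fuchsia.fxfs/BlobReader is configured, the
    /// connection is served from a VMO obtained from it instead of being forwarded to blobfs.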
    pub fn open_blob_for_read(
        &self,
        blob: &Hash,
        flags: fio::Flags,
        scope: ExecutionScope,
        object_request: ObjectRequestRef<'_>,
    ) -> Result<(), zx::Status> {
        if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
            return Err(zx::Status::ACCESS_DENIED);
        }
        if flags.creation_mode() != vfs::CreationMode::Never {
            return Err(zx::Status::NOT_SUPPORTED);
        }
        let object_request = object_request.take();
        if let Some(reader) = &self.reader {
            let () = open_blob_with_reader(reader.clone(), *blob, scope, flags, object_request);
        } else {
            let _: Result<(), ()> = self
                .dir
                .open(
                    &blob.to_string(),
                    flags,
                    &object_request.options(),
                    object_request.into_channel(),
                )
                .map_err(|fidl_error| warn!("Failed to open blob {:?}: {:?}", blob, fidl_error));
        }
        Ok(())
    }

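    /// Returns the set of all blobs currently in blobfs. The directory is enumerated over a
    /// private clone of the /blob connection so that concurrent calls do not interfere with each
    /// other's enumeration state.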
    pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
        let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
        fuchsia_fs::directory::readdir(&private_connection)
            .await
            .map_err(BlobfsError::ReadDir)?
            .into_iter()
            .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
            .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
            .collect()
    }

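    /// Deletes the blob with merkle root `blob` from blobfs.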
    pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
        self.dir
            .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
            .await?
            .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
    }

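    /// Opens the blob `blob` for writing and returns an `fpkg::BlobWriter`, backed by
    /// fuchsia.fxfs/BlobCreator if configured and by a delivery blob file in /blob otherwise.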
    pub async fn open_blob_for_write(&self, blob: &Hash) -> Result<fpkg::BlobWriter, CreateError> {
        Ok(if let Some(blob_creator) = &self.creator {
            fpkg::BlobWriter::Writer(blob_creator.create(blob, false).await??)
        } else {
            fpkg::BlobWriter::File(
                self.open_blob_proxy_from_dir_for_write(blob)
                    .await?
                    .into_channel()
                    .map_err(|_: fio::FileProxy| CreateError::ConvertToClientEnd)?
                    .into_zx_channel()
                    .into(),
            )
        })
    }

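    /// Opens the delivery blob path for `blob` under /blob for writing. An `ACCESS_DENIED` open
    /// error is mapped to [`CreateError::AlreadyExists`], which blobfs returns when the blob
    /// already exists or is being concurrently written.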
    async fn open_blob_proxy_from_dir_for_write(
        &self,
        blob: &Hash,
    ) -> Result<fio::FileProxy, CreateError> {
        let flags = fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::PERM_READABLE;

        let path = delivery_blob::delivery_blob_path(blob);
        fuchsia_fs::directory::open_file(&self.dir, &path, flags).await.map_err(|e| match e {
            fuchsia_fs::node::OpenError::OpenError(Status::ACCESS_DENIED) => {
                CreateError::AlreadyExists
            }
            other => CreateError::Io(other),
        })
    }

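    /// Returns whether blobfs has a readable (fully written) blob with hash `blob`. Partially
    /// written blobs are not yet readable and are reported as absent.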
    pub async fn has_blob(&self, blob: &Hash) -> bool {
        if let Some(reader) = &self.reader {
            matches!(reader.get_vmo(blob).await, Ok(Ok(_)))
        } else {
            let file = match fuchsia_fs::directory::open_file_async(
                &self.dir,
                &blob.to_string(),
                fio::Flags::FLAG_SEND_REPRESENTATION | fio::PERM_READABLE,
            ) {
                Ok(file) => file,
                Err(_) => return false,
            };

            let mut events = file.take_event_stream();

            let event = match events.next().await {
                None => return false,
                Some(event) => match event {
                    Err(_) => return false,
                    Ok(event) => match event {
                        fio::FileEvent::OnOpen_ { s, info } => {
                            if Status::from_raw(s) != Status::OK {
                                return false;
                            }

                            match info {
                                Some(info) => match *info {
                                    fio::NodeInfoDeprecated::File(fio::FileObject {
                                        event: Some(event),
                                        stream: _,
                                    }) => event,
                                    _ => return false,
                                },
                                _ => return false,
                            }
                        }
                        fio::FileEvent::OnRepresentation { payload } => match payload {
                            fio::Representation::File(fio::FileInfo {
                                observer: Some(event),
                                stream: _,
                                ..
                            }) => event,
                            _ => return false,
                        },
                        fio::FileEvent::_UnknownEvent { .. } => return false,
                    },
                },
            };

            match event
                .wait_handle(zx::Signals::USER_0, zx::MonotonicInstant::INFINITE_PAST)
                .to_result()
            {
                Ok(_) => true,
                Err(status) => {
                    if status != Status::TIMED_OUT {
                        warn!("blobfs: unknown error asserting blob existence: {}", status);
                    }
                    false
                }
            }
        }
    }

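    /// Filters `candidates` to the blobs that are not fully written in blobfs, checking up to 10
    /// blobs concurrently.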
    pub async fn filter_to_missing_blobs(&self, candidates: &HashSet<Hash>) -> HashSet<Hash> {
        stream::iter(candidates.clone())
            .map(move |blob| async move {
                if self.has_blob(&blob).await {
                    None
                } else {
                    Some(blob)
                }
            })
            .buffer_unordered(10)
            .filter_map(|blob| async move { blob })
            .collect()
            .await
    }

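    /// Calls fuchsia.io/Node.Sync on the /blob directory connection.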
    pub async fn sync(&self) -> Result<(), BlobfsError> {
        self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
    }
}

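/// Spawns a task on `scope` that serves a connection to the blob `blob_hash` using `protocols`,
/// backed by a VMO obtained from the fuchsia.fxfs/BlobReader `reader`.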
fn open_blob_with_reader<P: ProtocolsExt + Send>(
    reader: ffxfs::BlobReaderProxy,
    blob_hash: Hash,
    scope: ExecutionScope,
    protocols: P,
    object_request: ObjectRequest,
) {
    scope.clone().spawn(object_request.handle_async(async move |object_request| {
        let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
            if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
                error!("Blob reader channel closed: {:?}", status);
                status
            } else {
                error!("Transport error on get_vmo: {:?}", fidl_error);
                zx::Status::INTERNAL
            }
        })?;
        let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
        let vmo_blob = vmo_blob::VmoBlob::new(vmo);
        object_request
            .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
            .await
    }));
}

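/// Errors encountered while getting a blob's VMO.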
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetBlobVmoError {
    #[error("getting the vmo")]
    GetVmo(#[source] Status),

    #[error("opening the blob")]
    OpenBlob(#[source] fuchsia_fs::node::OpenError),

    #[error("making a fidl request")]
    Fidl(#[source] fidl::Error),
}

#[cfg(test)]
impl Client {
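    /// Constructs a [`Client`] connected to the given `blobfs_ramdisk::BlobfsRamdisk`, for use in
    /// tests.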
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        Self::new(
            blobfs.root_dir_proxy().unwrap(),
            blobfs.blob_creator_proxy().unwrap(),
            blobfs.blob_reader_proxy().unwrap(),
            None,
        )
        .unwrap()
    }
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use blobfs_ramdisk::BlobfsRamdisk;
    use fuchsia_async as fasync;
    use futures::stream::TryStreamExt;
    use maplit::hashset;
    use std::io::Write as _;
    use std::sync::Arc;

    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs_empty() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn list_known_blobs() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let expected = blobfs.list_blobs().unwrap().into_iter().collect();
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_and_then_list() {
        let blobfs = BlobfsRamdisk::builder()
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
        assert_matches!(client.delete_blob(&merkle).await, Ok(()));

        let expected = hashset! {fuchsia_merkle::from_slice(&b"blob 2"[..]).root()};
        assert_eq!(client.list_known_blobs().await.unwrap(), expected);
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_non_existing_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        assert_matches!(
            client.delete_blob(&blob_merkle).await,
            Err(BlobfsError::Unlink(Status::NOT_FOUND))
        );
        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn delete_blob_mock() {
        let (client, mut stream) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob() {
        let blobfs = BlobfsRamdisk::builder().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert_eq!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await, true);
        assert_eq!(client.has_blob(&Hash::from([1; 32])).await, false);

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob_fxblob() {
        let blobfs =
            BlobfsRamdisk::builder().fxblob().with_blob(&b"blob 1"[..]).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
        assert!(!client.has_blob(&Hash::from([1; 32])).await);

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn has_blob_return_false_if_blob_is_partially_written() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let blob = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&blob).root();

        let mut file = blobfs.root_dir().unwrap().write_file(hash.to_string(), 0o777).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.set_len(blob.len() as u64).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[..512]).unwrap();
        assert_eq!(client.has_blob(&hash).await, false);
        file.write_all(&blob[512..]).unwrap();
        assert_eq!(client.has_blob(&hash).await, true);

        blobfs.stop().await.unwrap();
    }

    async fn resize(blob: &fio::FileProxy, size: usize) {
        let () = blob.resize(size as u64).await.unwrap().map_err(Status::from_raw).unwrap();
    }

    async fn write(blob: &fio::FileProxy, bytes: &[u8]) {
        assert_eq!(
            blob.write(bytes).await.unwrap().map_err(Status::from_raw).unwrap(),
            bytes.len() as u64
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn write_delivery_blob() {
        let blobfs = BlobfsRamdisk::start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = [3; 1024];
        let hash = fuchsia_merkle::from_slice(&content).root();
        let delivery_content =
            delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);

        let proxy = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();

        let () = resize(&proxy, delivery_content.len()).await;
        let () = write(&proxy, &delivery_content).await;

        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }

    struct TestBlob {
        _blob: fio::FileProxy,
        hash: Hash,
    }

    async fn open_blob_only(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        TestBlob { _blob, hash }
    }

    async fn open_and_truncate_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let () = resize(&_blob, content.len()).await;
        TestBlob { _blob, hash }
    }

    async fn partially_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content[..content.len() / 2]).await;
        TestBlob { _blob, hash }
    }

    async fn fully_write_blob(client: &Client, content: &[u8]) -> TestBlob {
        let hash = fuchsia_merkle::from_slice(content).root();
        let _blob = client.open_blob_proxy_from_dir_for_write(&hash).await.unwrap();
        let content = delivery_blob::generate(delivery_blob::DeliveryBlobType::Type1, content);
        let () = resize(&_blob, content.len()).await;
        let () = write(&_blob, &content).await;
        TestBlob { _blob, hash }
    }

    #[fasync::run_singlethreaded(test)]
    async fn filter_to_missing_blobs() {
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_hash0 = Hash::from([0; 32]);
        let missing_hash1 = Hash::from([1; 32]);

        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs(&hashset! {
                    missing_hash0,
                    missing_hash1,
                    present_blob0.hash,
                    present_blob1.hash
                })
                .await,
            hashset! { missing_hash0, missing_hash1 }
        );

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn filter_to_missing_blobs_with_partially_written_blobs() {
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_blob0 = open_blob_only(&client, &[0; 1024]).await;

        let missing_blob1 = open_and_truncate_blob(&client, &[1; 1024]).await;

        let missing_blob2 = partially_write_blob(&client, &[2; 1024]).await;

        let present_blob = fully_write_blob(&client, &[3; 1024]).await;

        assert_eq!(
            client
                .filter_to_missing_blobs(&hashset! {
                    missing_blob0.hash,
                    missing_blob1.hash,
                    missing_blob2.hash,
                    present_blob.hash
                })
                .await,
            hashset! { missing_blob0.hash, missing_blob1.hash, missing_blob2.hash }
        );

        blobfs.stop().await.unwrap();
    }

    #[fasync::run_singlethreaded(test)]
    async fn sync() {
        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = Arc::clone(&counter);
        let (client, mut stream) = Client::new_test();
        fasync::Task::spawn(async move {
            match stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Sync { responder } => {
                    counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        })
        .detach();

        assert_matches!(client.sync().await, Ok(()));
        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_blob_for_write_uses_fxblob_if_configured() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
        let client = Client::new(
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
            Some(blob_creator),
            Some(blob_reader),
            None,
        )
        .unwrap();

        fuchsia_async::Task::spawn(async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Ok(fidl::endpoints::create_endpoints().0)).unwrap();
                }
            }
        })
        .detach();

        assert_matches!(
            client.open_blob_for_write(&[0; 32].into()).await,
            Ok(fpkg::BlobWriter::Writer(_))
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn open_blob_for_write_fxblob_maps_already_exists() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();

        let client = Client::new(
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
            Some(blob_creator),
            Some(blob_reader),
            None,
        )
        .unwrap();

        fuchsia_async::Task::spawn(async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
                }
            }
        })
        .detach();

        assert_matches!(
            client.open_blob_for_write(&[0; 32].into()).await,
            Err(CreateError::AlreadyExists)
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn concurrent_list_known_blobs_all_return_full_contents() {
        use futures::StreamExt;
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        for i in 0..256u16 {
            let _: TestBlob = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
        }

        let () = futures::stream::iter(0..100)
            .for_each_concurrent(None, |_| async {
                assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
            })
            .await;
    }
}