1#![deny(missing_docs)]
6
7use fidl::endpoints::{ClientEnd, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::Status;
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
21
22pub mod mock;
23pub use mock::Mock;
24
25#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29 #[error("while opening blobfs dir")]
30 OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32 #[error("while cloning the blobfs dir")]
33 CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35 #[error("while listing blobfs dir")]
36 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38 #[error("while deleting blob")]
39 Unlink(#[source] Status),
40
41 #[error("while sync'ing")]
42 Sync(#[source] Status),
43
44 #[error("while parsing blob merkle hash")]
45 ParseHash(#[from] ParseHashError),
46
47 #[error("FIDL error")]
48 Fidl(#[from] fidl::Error),
49
50 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51 ConnectToBlobCreator(#[source] anyhow::Error),
52
53 #[error("while connecting to fuchsia.fxfs/BlobReader")]
54 ConnectToBlobReader(#[source] anyhow::Error),
55
56 #[error("while setting the VmexResource")]
57 InitVmexResource(#[source] anyhow::Error),
58}
59
60#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64 #[error("the blob already exists or is being concurrently written")]
65 AlreadyExists,
66
67 #[error("while creating the blob")]
68 Io(#[source] fuchsia_fs::node::OpenError),
69
70 #[error("while converting the proxy into a client end")]
71 ConvertToClientEnd,
72
73 #[error("FIDL error")]
74 Fidl(#[from] fidl::Error),
75
76 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77 BlobCreator(ffxfs::CreateBlobError),
78
79 #[error("this client was not created with a blob creator so it cannot write blobs")]
80 WritingNotConfigured,
81}
82
83impl From<ffxfs::CreateBlobError> for CreateError {
84 fn from(e: ffxfs::CreateBlobError) -> Self {
85 match e {
86 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
87 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
88 }
89 }
90}
91
/// Builder for a [`Client`] connected through the component's namespace.
///
/// All permissions start out disabled; enable them with the chained setter methods.
#[derive(Default)]
pub struct ClientBuilder {
    /// Request `fio::PERM_READABLE` when opening the blob directory.
    readable: bool,
    /// Request `fio::PERM_WRITABLE` when opening the blob directory and connect a blob creator.
    writable: bool,
    /// Request `fio::PERM_EXECUTABLE` when opening the blob directory.
    executable: bool,
}
99
100impl ClientBuilder {
101 pub async fn build(self) -> Result<Client, BlobfsError> {
105 let mut flags = fio::Flags::empty();
106 if self.readable {
107 flags |= fio::PERM_READABLE
108 }
109 if self.writable {
110 flags |= fio::PERM_WRITABLE
111 }
112 if self.executable {
113 flags |= fio::PERM_EXECUTABLE
114 }
115 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
116 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
117 fidl_fuchsia_kernel::VmexResourceMarker,
118 >() && let Ok(vmex) = client.get().await
119 {
120 info!("Got vmex resource");
121 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
122 }
123 let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
124 .map_err(BlobfsError::ConnectToBlobReader)?;
125 let creator = if self.writable {
126 Some(
127 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
128 .map_err(BlobfsError::ConnectToBlobCreator)?,
129 )
130 } else {
131 None
132 };
133
134 Ok(Client { dir, creator, reader })
135 }
136
137 pub fn readable(self) -> Self {
140 Self { readable: true, ..self }
141 }
142
143 pub fn writable(self) -> Self {
147 Self { writable: true, ..self }
148 }
149
150 pub fn executable(self) -> Self {
153 Self { executable: true, ..self }
154 }
155}
156
157impl Client {
158 pub fn builder() -> ClientBuilder {
160 Default::default()
161 }
162}
163#[derive(Debug, Clone)]
165pub struct Client {
166 dir: fio::DirectoryProxy,
167 creator: Option<ffxfs::BlobCreatorProxy>,
168 reader: ffxfs::BlobReaderProxy,
169}
170
171impl Client {
172 pub fn new(
176 dir: fio::DirectoryProxy,
177 creator: Option<ffxfs::BlobCreatorProxy>,
178 reader: ffxfs::BlobReaderProxy,
179 vmex: Option<zx::Resource>,
180 ) -> Result<Self, anyhow::Error> {
181 if let Some(vmex) = vmex {
182 vmo_blob::init_vmex_resource(vmex)?;
183 }
184 Ok(Self { dir, creator, reader })
185 }
186
187 pub fn new_test() -> (
194 Self,
195 fio::DirectoryRequestStream,
196 ffxfs::BlobReaderRequestStream,
197 ffxfs::BlobCreatorRequestStream,
198 ) {
199 let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
200 let (reader, reader_stream) =
201 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
202 let (creator, creator_stream) =
203 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
204
205 (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
206 }
207
208 pub fn new_mock() -> (Self, mock::Mock) {
215 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
216 let (reader, reader_stream) =
217 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
218 let (creator, creator_stream) =
219 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
220
221 (
222 Self { dir, creator: Some(creator), reader },
223 mock::Mock { stream, reader_stream, creator_stream },
224 )
225 }
226
227 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
229 self.reader
230 .get_vmo(hash)
231 .await
232 .map_err(GetBlobVmoError::Fidl)?
233 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
234 }
235
236 pub fn deprecated_open_blob_for_read(
238 &self,
239 blob: &Hash,
240 flags: fio::OpenFlags,
241 scope: ExecutionScope,
242 server_end: ServerEnd<fio::NodeMarker>,
243 ) -> Result<(), fidl::Error> {
244 let describe = flags.contains(fio::OpenFlags::DESCRIBE);
245 if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
247 send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
248 return Ok(());
249 }
250 if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
252 send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
253 return Ok(());
254 }
255 let object_request = flags.to_object_request(server_end);
256 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
257 Ok(())
258 }
259
260 pub fn open_blob_for_read(
262 &self,
263 blob: &Hash,
264 flags: fio::Flags,
265 scope: ExecutionScope,
266 object_request: ObjectRequestRef<'_>,
267 ) -> Result<(), zx::Status> {
268 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
269 return Err(zx::Status::ACCESS_DENIED);
270 }
271 if flags.creation_mode() != vfs::CreationMode::Never {
272 return Err(zx::Status::NOT_SUPPORTED);
273 }
274 let object_request = object_request.take();
276 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
277 Ok(())
278 }
279
280 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
282 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
288 fuchsia_fs::directory::readdir(&private_connection)
289 .await
290 .map_err(BlobfsError::ReadDir)?
291 .into_iter()
292 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
293 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
294 .collect()
295 }
296
297 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
299 self.dir
300 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
301 .await?
302 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
303 }
304
305 pub async fn open_blob_for_write(
307 &self,
308 blob: &Hash,
309 ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
310 let Some(creator) = &self.creator else {
311 return Err(CreateError::WritingNotConfigured);
312 };
313 Ok(creator.create(blob, false).await??)
314 }
315
316 pub async fn has_blob(&self, blob: &Hash) -> bool {
323 matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
325 }
326
327 pub async fn filter_to_missing_blobs(
338 &self,
339 candidates: impl IntoIterator<Item = Hash>,
340 ) -> HashSet<Hash> {
341 stream::iter(candidates)
349 .map(move |blob| async move {
350 if self.has_blob(&blob).await {
351 None
352 } else {
353 Some(blob)
354 }
355 })
356 .buffer_unordered(10)
359 .filter_map(|blob| async move { blob })
360 .collect()
361 .await
362 }
363
364 pub async fn sync(&self) -> Result<(), BlobfsError> {
366 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
367 }
368}
369
370fn open_blob_with_reader<P: ProtocolsExt + Send>(
373 reader: ffxfs::BlobReaderProxy,
374 blob_hash: Hash,
375 scope: ExecutionScope,
376 protocols: P,
377 object_request: ObjectRequest,
378) {
379 scope.clone().spawn(object_request.handle_async(async move |object_request| {
380 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
381 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
382 error!("Blob reader channel closed: {:?}", status);
383 status
384 } else {
385 error!("Transport error on get_vmo: {:?}", fidl_error);
386 zx::Status::INTERNAL
387 }
388 })?;
389 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
390 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
391 object_request
392 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
393 .await
394 }));
395}
396
397#[derive(thiserror::Error, Debug)]
398#[allow(missing_docs)]
399pub enum GetBlobVmoError {
400 #[error("getting the vmo")]
401 GetVmo(#[source] Status),
402
403 #[error("opening the blob")]
404 OpenBlob(#[source] fuchsia_fs::node::OpenError),
405
406 #[error("making a fidl request")]
407 Fidl(#[source] fidl::Error),
408}
409
#[cfg(test)]
impl Client {
    /// Builds a writable client from the root directory, blob creator and blob reader proxies of
    /// `blobfs`, without a vmex resource.
    ///
    /// # Panics
    ///
    /// Panics if any of the proxies cannot be obtained or the client cannot be constructed.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        let dir = blobfs.root_dir_proxy().unwrap();
        let creator = blobfs.blob_creator_proxy().unwrap();
        let reader = blobfs.blob_reader_proxy().unwrap();
        Self::new(dir, Some(creator), reader, None).unwrap()
    }
}
432
433#[cfg(test)]
434#[allow(clippy::bool_assert_comparison)]
435mod tests {
436 use super::*;
437 use assert_matches::assert_matches;
438 use blobfs_ramdisk::BlobfsRamdisk;
439 use fuchsia_async as fasync;
440 use futures::stream::TryStreamExt as _;
441 use std::sync::Arc;
442 use test_case::test_case;
443
444 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
445 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
446 #[fuchsia::test]
447 async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
448 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
449 let client = Client::for_ramdisk(&blobfs);
450
451 assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
452 blobfs.stop().await.unwrap();
453 }
454
455 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
456 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
457 #[fuchsia::test]
458 async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
459 let blobfs = BlobfsRamdisk::builder()
460 .implementation(blob_impl)
461 .with_blob(&b"blob 1"[..])
462 .with_blob(&b"blob 2"[..])
463 .start()
464 .await
465 .unwrap();
466 let client = Client::for_ramdisk(&blobfs);
467
468 let expected = blobfs.list_blobs().unwrap().into_iter().collect();
469 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
470 blobfs.stop().await.unwrap();
471 }
472
473 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
474 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
475 #[fuchsia::test]
476 async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
477 let blobfs = BlobfsRamdisk::builder()
478 .implementation(blob_impl)
479 .with_blob(&b"blob 1"[..])
480 .with_blob(&b"blob 2"[..])
481 .start()
482 .await
483 .unwrap();
484 let client = Client::for_ramdisk(&blobfs);
485
486 let merkle = fuchsia_merkle::root_from_slice(b"blob 1");
487 assert_matches!(client.delete_blob(&merkle).await, Ok(()));
488
489 let expected = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
490 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
491 blobfs.stop().await.unwrap();
492 }
493
494 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
495 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
496 #[fuchsia::test]
497 async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
498 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
499 let client = Client::for_ramdisk(&blobfs);
500 let blob_merkle = Hash::from([1; 32]);
501
502 assert_matches!(
503 client.delete_blob(&blob_merkle).await,
504 Err(BlobfsError::Unlink(Status::NOT_FOUND))
505 );
506 blobfs.stop().await.unwrap();
507 }
508
509 #[fuchsia::test]
510 async fn delete_blob_mock() {
511 let (client, mut stream, _, _) = Client::new_test();
512 let blob_merkle = Hash::from([1; 32]);
513 fasync::Task::spawn(async move {
514 match stream.try_next().await.unwrap().unwrap() {
515 fio::DirectoryRequest::Unlink { name, responder, .. } => {
516 assert_eq!(name, blob_merkle.to_string());
517 responder.send(Ok(())).unwrap();
518 }
519 other => panic!("unexpected request: {other:?}"),
520 }
521 })
522 .detach();
523
524 assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
525 }
526
527 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
528 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
529 #[fuchsia::test]
530 async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
531 let blobfs = BlobfsRamdisk::builder()
532 .implementation(blob_impl)
533 .with_blob(&b"blob 1"[..])
534 .start()
535 .await
536 .unwrap();
537 let client = Client::for_ramdisk(&blobfs);
538
539 assert!(client.has_blob(&fuchsia_merkle::root_from_slice(b"blob 1")).await);
540 assert!(!client.has_blob(&Hash::from([1; 32])).await);
541
542 blobfs.stop().await.unwrap();
543 }
544
545 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
546 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
547 #[fuchsia::test]
548 async fn has_blob_return_false_if_blob_is_partially_written(
549 blob_impl: blobfs_ramdisk::Implementation,
550 ) {
551 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
552 let client = Client::for_ramdisk(&blobfs);
553
554 let content = &[3; 1024];
555 let hash = fuchsia_merkle::root_from_slice(content);
556 let delivery_content =
557 delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
558
559 let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
560 assert!(!client.has_blob(&hash).await);
561
562 let n = delivery_content.len();
563 let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
564 assert!(!client.has_blob(&hash).await);
565
566 let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
567 let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
568 assert!(!client.has_blob(&hash).await);
569
570 let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
571 let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
572 assert!(client.has_blob(&hash).await);
573
574 blobfs.stop().await.unwrap();
575 }
576
577 async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
578 let hash = fuchsia_merkle::root_from_slice(content);
579 let delivery_content =
580 delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
581 let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
582 let vmo = writer
583 .get_vmo(delivery_content.len().try_into().unwrap())
584 .await
585 .expect("a")
586 .map_err(zx::Status::from_raw)
587 .expect("b");
588 let () = vmo.write(&delivery_content, 0).unwrap();
589 let () =
590 writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
591 hash
592 }
593
594 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
595 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
596 #[fuchsia::test]
597 async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
598 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
599 let client = Client::for_ramdisk(&blobfs);
600
601 let missing_hash0 = Hash::from([0; 32]);
602 let missing_hash1 = Hash::from([1; 32]);
603
604 let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
605 let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
606
607 assert_eq!(
608 client
609 .filter_to_missing_blobs([
610 missing_hash0,
611 missing_hash1,
612 present_blob0,
613 present_blob1
614 ])
615 .await,
616 HashSet::from([missing_hash0, missing_hash1])
617 );
618
619 blobfs.stop().await.unwrap();
620 }
621
622 #[fuchsia::test]
623 async fn sync() {
624 let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
625 let counter_clone = Arc::clone(&counter);
626 let (client, mut stream, _, _) = Client::new_test();
627 fasync::Task::spawn(async move {
628 match stream.try_next().await.unwrap().unwrap() {
629 fio::DirectoryRequest::Sync { responder } => {
630 counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
631 responder.send(Ok(())).unwrap();
632 }
633 other => panic!("unexpected request: {other:?}"),
634 }
635 })
636 .detach();
637
638 assert_matches!(client.sync().await, Ok(()));
639 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
640 }
641
642 #[fuchsia::test]
643 async fn open_blob_for_write_maps_already_exists() {
644 let (blob_creator, mut blob_creator_stream) =
645 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
646 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
647
648 let client = Client::new(
649 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
650 Some(blob_creator),
651 blob_reader,
652 None,
653 )
654 .unwrap();
655
656 fuchsia_async::Task::spawn(async move {
657 match blob_creator_stream.next().await.unwrap().unwrap() {
658 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
659 assert_eq!(hash, [0; 32]);
660 assert!(!allow_existing);
661 let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
662 }
663 ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
664 unreachable!("This code path is not yet exercised.");
665 }
666 }
667 })
668 .detach();
669
670 assert_matches!(
671 client.open_blob_for_write(&[0; 32].into()).await,
672 Err(CreateError::AlreadyExists)
673 );
674 }
675
676 #[fuchsia::test]
677 async fn concurrent_list_known_blobs_all_return_full_contents() {
678 use futures::StreamExt;
679 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
680 let client = Client::for_ramdisk(&blobfs);
681
682 for i in 0..256u16 {
689 let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
690 }
691
692 let () = futures::stream::iter(0..100)
693 .for_each_concurrent(None, |_| async {
694 assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
695 })
696 .await;
697 }
698}