1#![deny(missing_docs)]
6
7use fidl::endpoints::ClientEnd;
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::execution_scope::ExecutionScope;
16use vfs::file::StreamIoConnection;
17use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
18use zx::{self as zx, Status};
19use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
20
21pub mod mock;
22pub use mock::Mock;
23
24#[derive(Debug, Error)]
26#[allow(missing_docs)]
27pub enum BlobfsError {
28 #[error("while opening blobfs dir")]
29 OpenDir(#[from] fuchsia_fs::node::OpenError),
30
31 #[error("while cloning the blobfs dir")]
32 CloneDir(#[from] fuchsia_fs::node::CloneError),
33
34 #[error("while listing blobfs dir")]
35 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
36
37 #[error("while deleting blob")]
38 Unlink(#[source] Status),
39
40 #[error("while sync'ing")]
41 Sync(#[source] Status),
42
43 #[error("while parsing blob merkle hash")]
44 ParseHash(#[from] ParseHashError),
45
46 #[error("FIDL error")]
47 Fidl(#[from] fidl::Error),
48
49 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
50 ConnectToBlobCreator(#[source] anyhow::Error),
51
52 #[error("while connecting to fuchsia.fxfs/BlobReader")]
53 ConnectToBlobReader(#[source] anyhow::Error),
54
55 #[error("while setting the VmexResource")]
56 InitVmexResource(#[source] anyhow::Error),
57}
58
59#[derive(Debug, Error)]
61#[allow(missing_docs)]
62pub enum CreateError {
63 #[error("the blob already exists or is being concurrently written")]
64 AlreadyExists,
65
66 #[error("while creating the blob")]
67 Io(#[source] fuchsia_fs::node::OpenError),
68
69 #[error("while converting the proxy into a client end")]
70 ConvertToClientEnd,
71
72 #[error("FIDL error")]
73 Fidl(#[from] fidl::Error),
74
75 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
76 BlobCreator(ffxfs::CreateBlobError),
77
78 #[error("this client was not created with a blob creator so it cannot write blobs")]
79 WritingNotConfigured,
80}
81
82impl From<ffxfs::CreateBlobError> for CreateError {
83 fn from(e: ffxfs::CreateBlobError) -> Self {
84 match e {
85 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
86 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
87 }
88 }
89}
90
/// Builder for a blobfs [`Client`] that connects through the component's namespace.
///
/// All rights start out disabled; enable them with [`ClientBuilder::readable`],
/// [`ClientBuilder::writable`] and [`ClientBuilder::executable`] before calling
/// [`ClientBuilder::build`].
#[derive(Debug, Default)]
pub struct ClientBuilder {
    // Request `fio::PERM_READABLE` when opening the blob directory.
    readable: bool,
    // Request `fio::PERM_WRITABLE` when opening the blob directory and connect to
    // `fuchsia.fxfs/BlobCreator`.
    writable: bool,
    // Request `fio::PERM_EXECUTABLE` when opening the blob directory.
    executable: bool,
}
98
99impl ClientBuilder {
100 pub async fn build(self) -> Result<Client, BlobfsError> {
104 let mut flags = fio::Flags::empty();
105 if self.readable {
106 flags |= fio::PERM_READABLE
107 }
108 if self.writable {
109 flags |= fio::PERM_WRITABLE
110 }
111 if self.executable {
112 flags |= fio::PERM_EXECUTABLE
113 }
114 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
115 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
116 fidl_fuchsia_kernel::VmexResourceMarker,
117 >() && let Ok(vmex) = client.get().await
118 {
119 info!("Got vmex resource");
120 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
121 }
122 let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
123 .map_err(BlobfsError::ConnectToBlobReader)?;
124 let creator = if self.writable {
125 Some(
126 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
127 .map_err(BlobfsError::ConnectToBlobCreator)?,
128 )
129 } else {
130 None
131 };
132
133 Ok(Client { dir, creator, reader })
134 }
135
136 pub fn readable(self) -> Self {
139 Self { readable: true, ..self }
140 }
141
142 pub fn writable(self) -> Self {
146 Self { writable: true, ..self }
147 }
148
149 pub fn executable(self) -> Self {
152 Self { executable: true, ..self }
153 }
154}
155
156impl Client {
157 pub fn builder() -> ClientBuilder {
159 Default::default()
160 }
161}
162#[derive(Debug, Clone)]
164pub struct Client {
165 dir: fio::DirectoryProxy,
166 creator: Option<ffxfs::BlobCreatorProxy>,
167 reader: ffxfs::BlobReaderProxy,
168}
169
170impl Client {
171 pub fn new(
175 dir: fio::DirectoryProxy,
176 creator: Option<ffxfs::BlobCreatorProxy>,
177 reader: ffxfs::BlobReaderProxy,
178 vmex: Option<zx::Resource>,
179 ) -> Result<Self, anyhow::Error> {
180 if let Some(vmex) = vmex {
181 vmo_blob::init_vmex_resource(vmex)?;
182 }
183 Ok(Self { dir, creator, reader })
184 }
185
186 pub fn new_test() -> (
193 Self,
194 fio::DirectoryRequestStream,
195 ffxfs::BlobReaderRequestStream,
196 ffxfs::BlobCreatorRequestStream,
197 ) {
198 let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
199 let (reader, reader_stream) =
200 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
201 let (creator, creator_stream) =
202 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
203
204 (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
205 }
206
207 pub fn new_mock() -> (Self, mock::Mock) {
214 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
215 let (reader, reader_stream) =
216 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
217 let (creator, creator_stream) =
218 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
219
220 (
221 Self { dir, creator: Some(creator), reader },
222 mock::Mock { stream, reader_stream, creator_stream },
223 )
224 }
225
226 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
228 self.reader
229 .get_vmo(hash)
230 .await
231 .map_err(GetBlobVmoError::Fidl)?
232 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
233 }
234
235 pub fn open_blob_for_read(
238 &self,
239 blob: &Hash,
240 flags: fio::Flags,
241 scope: ExecutionScope,
242 object_request: ObjectRequestRef<'_>,
243 ) -> Result<(), zx::Status> {
244 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
245 return Err(zx::Status::ACCESS_DENIED);
246 }
247 if flags.creation_mode() != vfs::CreationMode::Never {
248 return Err(zx::Status::NOT_SUPPORTED);
249 }
250 let object_request = object_request.take();
252 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
253 Ok(())
254 }
255
256 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
258 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
264 fuchsia_fs::directory::readdir(&private_connection)
265 .await
266 .map_err(BlobfsError::ReadDir)?
267 .into_iter()
268 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
269 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
270 .collect()
271 }
272
273 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
275 self.dir
276 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
277 .await?
278 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
279 }
280
281 pub async fn open_blob_for_write(
283 &self,
284 blob: &Hash,
285 allow_existing: bool,
286 ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
287 let Some(creator) = &self.creator else {
288 return Err(CreateError::WritingNotConfigured);
289 };
290 Ok(creator.create(blob, allow_existing).await??)
291 }
292
293 pub async fn has_blob(&self, blob: &Hash) -> bool {
300 matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
302 }
303
304 pub async fn filter_to_missing_blobs(
315 &self,
316 candidates: impl IntoIterator<Item = Hash>,
317 ) -> HashSet<Hash> {
318 stream::iter(candidates)
326 .map(move |blob| async move {
327 if self.has_blob(&blob).await {
328 None
329 } else {
330 Some(blob)
331 }
332 })
333 .buffer_unordered(10)
336 .filter_map(|blob| async move { blob })
337 .collect()
338 .await
339 }
340
341 pub async fn sync(&self) -> Result<(), BlobfsError> {
343 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
344 }
345}
346
347fn open_blob_with_reader<P: ProtocolsExt + Send>(
350 reader: ffxfs::BlobReaderProxy,
351 blob_hash: Hash,
352 scope: ExecutionScope,
353 protocols: P,
354 object_request: ObjectRequest,
355) {
356 scope.clone().spawn(object_request.handle_async(async move |object_request| {
357 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
358 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
359 error!("Blob reader channel closed: {:?}", status);
360 status
361 } else {
362 error!("Transport error on get_vmo: {:?}", fidl_error);
363 zx::Status::INTERNAL
364 }
365 })?;
366 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
367 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
368 object_request
369 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
370 .await
371 }));
372}
373
374#[derive(thiserror::Error, Debug)]
375#[allow(missing_docs)]
376pub enum GetBlobVmoError {
377 #[error("getting the vmo")]
378 GetVmo(#[source] Status),
379
380 #[error("opening the blob")]
381 OpenBlob(#[source] fuchsia_fs::node::OpenError),
382
383 #[error("making a fidl request")]
384 Fidl(#[source] fidl::Error),
385}
386
#[cfg(test)]
impl Client {
    /// Creates a client backed by the given ramdisk-hosted blobfs instance, wired to its root
    /// directory, blob creator and blob reader. No vmex resource is supplied.
    ///
    /// # Panics
    ///
    /// Panics if any of the three proxies cannot be obtained from `blobfs`, or if
    /// [`Client::new`] fails.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        let dir = blobfs.root_dir_proxy().unwrap();
        let creator = blobfs.blob_creator_proxy().unwrap();
        let reader = blobfs.blob_reader_proxy().unwrap();
        Self::new(dir, Some(creator), reader, None).unwrap()
    }
}
409
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use blobfs_ramdisk::BlobfsRamdisk;
    use fuchsia_async as fasync;
    use futures::stream::TryStreamExt as _;
    use std::sync::Arc;
    use test_case::test_case;

    // A freshly started blobfs lists no blobs.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let listed = client.list_known_blobs().await.unwrap();
        assert_eq!(listed, HashSet::new());

        blobfs.stop().await.unwrap();
    }

    // The client's listing agrees with the ramdisk's own view of its blobs.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let builder = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..]);
        let blobfs = builder.start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let expected: HashSet<Hash> = blobfs.list_blobs().unwrap().into_iter().collect();
        let listed = client.list_known_blobs().await.unwrap();
        assert_eq!(listed, expected);

        blobfs.stop().await.unwrap();
    }

    // A deleted blob no longer shows up in the listing; the other one still does.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
        let builder = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..]);
        let blobfs = builder.start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let deleted = fuchsia_merkle::root_from_slice(b"blob 1");
        assert_matches!(client.delete_blob(&deleted).await, Ok(()));

        let survivor = fuchsia_merkle::root_from_slice(b"blob 2");
        let listed = client.list_known_blobs().await.unwrap();
        assert_eq!(listed, HashSet::from([survivor]));

        blobfs.stop().await.unwrap();
    }

    // Deleting a blob that was never written surfaces NOT_FOUND.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);
        let absent = Hash::from([1; 32]);

        let outcome = client.delete_blob(&absent).await;
        assert_matches!(outcome, Err(BlobfsError::Unlink(Status::NOT_FOUND)));

        blobfs.stop().await.unwrap();
    }

    // `delete_blob` sends an Unlink request named after the blob's hash.
    #[fuchsia::test]
    async fn delete_blob_mock() {
        let (client, mut dir_requests, _, _) = Client::new_test();
        let target = Hash::from([1; 32]);

        let fake_server = async move {
            let request = dir_requests.try_next().await.unwrap().unwrap();
            match request {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, target.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        };
        fasync::Task::spawn(fake_server).detach();

        assert_matches!(client.delete_blob(&target).await, Ok(()));
    }

    // `has_blob` is true for a present blob and false for an absent one.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let builder =
            BlobfsRamdisk::builder().implementation(blob_impl).with_blob(&b"blob 1"[..]);
        let blobfs = builder.start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let present = fuchsia_merkle::root_from_slice(b"blob 1");
        let absent = Hash::from([1; 32]);
        assert!(client.has_blob(&present).await);
        assert!(!client.has_blob(&absent).await);

        blobfs.stop().await.unwrap();
    }

    // `has_blob` stays false through every stage of a write until the final byte is committed.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob_return_false_if_blob_is_partially_written(
        blob_impl: blobfs_ramdisk::Implementation,
    ) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let content = &[3; 1024];
        let hash = fuchsia_merkle::root_from_slice(content);
        let payload =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
        let total_len = payload.len();
        // Everything except the final byte goes in the first chunk.
        let split_at = total_len - 1;

        // Stage 1: writer opened, nothing else done.
        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
        assert!(!client.has_blob(&hash).await);

        // Stage 2: transfer VMO obtained.
        let vmo = writer.get_vmo(total_len.try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.has_blob(&hash).await);

        // Stage 3: all but the last byte committed.
        let () = vmo.write(&payload[..split_at], 0).unwrap();
        let () = writer.bytes_ready(split_at.try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.has_blob(&hash).await);

        // Stage 4: last byte committed; the blob becomes visible.
        let () = vmo.write(&payload[split_at..], split_at.try_into().unwrap()).unwrap();
        let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
        assert!(client.has_blob(&hash).await);

        blobfs.stop().await.unwrap();
    }

    // Writes `content` to blobfs as a complete delivery blob and returns its hash.
    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
        let hash = fuchsia_merkle::root_from_slice(content);
        let payload =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
        let payload_len = payload.len();

        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
        let response = writer.get_vmo(payload_len.try_into().unwrap()).await.expect("a");
        let vmo = response.map_err(zx::Status::from_raw).expect("b");

        let () = vmo.write(&payload, 0).unwrap();
        let () = writer.bytes_ready(payload_len.try_into().unwrap()).await.unwrap().unwrap();
        hash
    }

    // Only the hashes that were never written are reported as missing.
    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let missing_hash0 = Hash::from([0; 32]);
        let missing_hash1 = Hash::from([1; 32]);

        let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
        let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;

        let candidates = [missing_hash0, missing_hash1, present_blob0, present_blob1];
        let missing = client.filter_to_missing_blobs(candidates).await;
        assert_eq!(missing, HashSet::from([missing_hash0, missing_hash1]));

        blobfs.stop().await.unwrap();
    }

    // `sync` sends exactly one Sync request to the directory.
    #[fuchsia::test]
    async fn sync() {
        use std::sync::atomic::{AtomicU32, Ordering};

        let sync_count = Arc::new(AtomicU32::new(0));
        let server_sync_count = Arc::clone(&sync_count);
        let (client, mut dir_requests, _, _) = Client::new_test();

        let fake_server = async move {
            let request = dir_requests.try_next().await.unwrap().unwrap();
            match request {
                fio::DirectoryRequest::Sync { responder } => {
                    server_sync_count.fetch_add(1, Ordering::SeqCst);
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        };
        fasync::Task::spawn(fake_server).detach();

        assert_matches!(client.sync().await, Ok(()));
        assert_eq!(sync_count.load(Ordering::SeqCst), 1);
    }

    // An AlreadyExists response from the creator maps to `CreateError::AlreadyExists`.
    #[fuchsia::test]
    async fn open_blob_for_write_maps_already_exists() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        // The reader and directory server ends are dropped; this test never talks to them.
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
        let (dir, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let client = Client::new(dir, Some(blob_creator), blob_reader, None).unwrap();

        let fake_creator = async move {
            let request = blob_creator_stream.next().await.unwrap().unwrap();
            match request {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
                }
                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
                    unreachable!("This code path is not yet exercised.");
                }
            }
        };
        fasync::Task::spawn(fake_creator).detach();

        let hash = Hash::from([0; 32]);
        let outcome = client.open_blob_for_write(&hash, false).await;
        assert_matches!(outcome, Err(CreateError::AlreadyExists));
    }

    // Many listings running at once over the same client must each see every blob.
    #[fuchsia::test]
    async fn concurrent_list_known_blobs_all_return_full_contents() {
        use futures::StreamExt;
        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        // 256 distinct two-byte contents give 256 distinct blobs.
        for seed in 0..256u16 {
            let content = seed.to_le_bytes();
            let _: Hash = fully_write_blob(&client, content.as_slice()).await;
        }

        let listings = futures::stream::iter(0..100);
        let () = listings
            .for_each_concurrent(None, |_| async {
                let listed = client.list_known_blobs().await.unwrap();
                assert_eq!(listed.len(), 256);
            })
            .await;
    }
}
675}