1#![deny(missing_docs)]
6
7use fidl::endpoints::{ClientEnd, ServerEnd};
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::common::send_on_open_with_error;
16use vfs::execution_scope::ExecutionScope;
17use vfs::file::StreamIoConnection;
18use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt, ToObjectRequest as _};
19use zx::Status;
20use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
21
22pub mod mock;
23pub use mock::Mock;
24
25#[derive(Debug, Error)]
27#[allow(missing_docs)]
28pub enum BlobfsError {
29 #[error("while opening blobfs dir")]
30 OpenDir(#[from] fuchsia_fs::node::OpenError),
31
32 #[error("while cloning the blobfs dir")]
33 CloneDir(#[from] fuchsia_fs::node::CloneError),
34
35 #[error("while listing blobfs dir")]
36 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
37
38 #[error("while deleting blob")]
39 Unlink(#[source] Status),
40
41 #[error("while sync'ing")]
42 Sync(#[source] Status),
43
44 #[error("while parsing blob merkle hash")]
45 ParseHash(#[from] ParseHashError),
46
47 #[error("FIDL error")]
48 Fidl(#[from] fidl::Error),
49
50 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
51 ConnectToBlobCreator(#[source] anyhow::Error),
52
53 #[error("while connecting to fuchsia.fxfs/BlobReader")]
54 ConnectToBlobReader(#[source] anyhow::Error),
55
56 #[error("while setting the VmexResource")]
57 InitVmexResource(#[source] anyhow::Error),
58}
59
60#[derive(Debug, Error)]
62#[allow(missing_docs)]
63pub enum CreateError {
64 #[error("the blob already exists or is being concurrently written")]
65 AlreadyExists,
66
67 #[error("while creating the blob")]
68 Io(#[source] fuchsia_fs::node::OpenError),
69
70 #[error("while converting the proxy into a client end")]
71 ConvertToClientEnd,
72
73 #[error("FIDL error")]
74 Fidl(#[from] fidl::Error),
75
76 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
77 BlobCreator(ffxfs::CreateBlobError),
78
79 #[error("this client was not created with a blob creator so it cannot write blobs")]
80 WritingNotConfigured,
81}
82
83impl From<ffxfs::CreateBlobError> for CreateError {
84 fn from(e: ffxfs::CreateBlobError) -> Self {
85 match e {
86 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
87 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
88 }
89 }
90}
91
92#[derive(Default)]
94pub struct ClientBuilder {
95 readable: bool,
96 writable: bool,
97 executable: bool,
98}
99
100impl ClientBuilder {
101 pub async fn build(self) -> Result<Client, BlobfsError> {
105 let mut flags = fio::Flags::empty();
106 if self.readable {
107 flags |= fio::PERM_READABLE
108 }
109 if self.writable {
110 flags |= fio::PERM_WRITABLE
111 }
112 if self.executable {
113 flags |= fio::PERM_EXECUTABLE
114 }
115 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
116 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
117 fidl_fuchsia_kernel::VmexResourceMarker,
118 >() {
119 if let Ok(vmex) = client.get().await {
120 info!("Got vmex resource");
121 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
122 }
123 }
124 let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
125 .map_err(BlobfsError::ConnectToBlobReader)?;
126 let creator = if self.writable {
127 Some(
128 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
129 .map_err(BlobfsError::ConnectToBlobCreator)?,
130 )
131 } else {
132 None
133 };
134
135 Ok(Client { dir, creator, reader })
136 }
137
138 pub fn readable(self) -> Self {
141 Self { readable: true, ..self }
142 }
143
144 pub fn writable(self) -> Self {
148 Self { writable: true, ..self }
149 }
150
151 pub fn executable(self) -> Self {
154 Self { executable: true, ..self }
155 }
156}
157
158impl Client {
159 pub fn builder() -> ClientBuilder {
161 Default::default()
162 }
163}
164#[derive(Debug, Clone)]
166pub struct Client {
167 dir: fio::DirectoryProxy,
168 creator: Option<ffxfs::BlobCreatorProxy>,
169 reader: ffxfs::BlobReaderProxy,
170}
171
172impl Client {
173 pub fn new(
177 dir: fio::DirectoryProxy,
178 creator: Option<ffxfs::BlobCreatorProxy>,
179 reader: ffxfs::BlobReaderProxy,
180 vmex: Option<zx::Resource>,
181 ) -> Result<Self, anyhow::Error> {
182 if let Some(vmex) = vmex {
183 vmo_blob::init_vmex_resource(vmex)?;
184 }
185 Ok(Self { dir, creator, reader })
186 }
187
188 pub fn new_test() -> (
195 Self,
196 fio::DirectoryRequestStream,
197 ffxfs::BlobReaderRequestStream,
198 ffxfs::BlobCreatorRequestStream,
199 ) {
200 let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
201 let (reader, reader_stream) =
202 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
203 let (creator, creator_stream) =
204 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
205
206 (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
207 }
208
209 pub fn new_mock() -> (Self, mock::Mock) {
216 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
217 let (reader, reader_stream) =
218 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
219 let (creator, creator_stream) =
220 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
221
222 (
223 Self { dir, creator: Some(creator), reader },
224 mock::Mock { stream, reader_stream, creator_stream },
225 )
226 }
227
228 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
230 self.reader
231 .get_vmo(hash)
232 .await
233 .map_err(GetBlobVmoError::Fidl)?
234 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
235 }
236
237 pub fn deprecated_open_blob_for_read(
239 &self,
240 blob: &Hash,
241 flags: fio::OpenFlags,
242 scope: ExecutionScope,
243 server_end: ServerEnd<fio::NodeMarker>,
244 ) -> Result<(), fidl::Error> {
245 let describe = flags.contains(fio::OpenFlags::DESCRIBE);
246 if flags.contains(fio::OpenFlags::RIGHT_WRITABLE) {
248 send_on_open_with_error(describe, server_end, zx::Status::ACCESS_DENIED);
249 return Ok(());
250 }
251 if flags.intersects(fio::OpenFlags::CREATE | fio::OpenFlags::CREATE_IF_ABSENT) {
253 send_on_open_with_error(describe, server_end, zx::Status::NOT_SUPPORTED);
254 return Ok(());
255 }
256 let object_request = flags.to_object_request(server_end);
257 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
258 Ok(())
259 }
260
261 pub fn open_blob_for_read(
263 &self,
264 blob: &Hash,
265 flags: fio::Flags,
266 scope: ExecutionScope,
267 object_request: ObjectRequestRef<'_>,
268 ) -> Result<(), zx::Status> {
269 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
270 return Err(zx::Status::ACCESS_DENIED);
271 }
272 if flags.creation_mode() != vfs::CreationMode::Never {
273 return Err(zx::Status::NOT_SUPPORTED);
274 }
275 let object_request = object_request.take();
277 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
278 Ok(())
279 }
280
281 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
283 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
289 fuchsia_fs::directory::readdir(&private_connection)
290 .await
291 .map_err(BlobfsError::ReadDir)?
292 .into_iter()
293 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
294 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
295 .collect()
296 }
297
298 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
300 self.dir
301 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
302 .await?
303 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
304 }
305
306 pub async fn open_blob_for_write(
308 &self,
309 blob: &Hash,
310 ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
311 let Some(creator) = &self.creator else {
312 return Err(CreateError::WritingNotConfigured);
313 };
314 Ok(creator.create(blob, false).await??)
315 }
316
317 pub async fn has_blob(&self, blob: &Hash) -> bool {
324 matches!(self.reader.get_vmo(blob).await, Ok(Ok(_)))
326 }
327
328 pub async fn filter_to_missing_blobs(
339 &self,
340 candidates: impl IntoIterator<Item = Hash>,
341 ) -> HashSet<Hash> {
342 stream::iter(candidates)
350 .map(move |blob| async move {
351 if self.has_blob(&blob).await {
352 None
353 } else {
354 Some(blob)
355 }
356 })
357 .buffer_unordered(10)
360 .filter_map(|blob| async move { blob })
361 .collect()
362 .await
363 }
364
365 pub async fn sync(&self) -> Result<(), BlobfsError> {
367 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
368 }
369}
370
371fn open_blob_with_reader<P: ProtocolsExt + Send>(
374 reader: ffxfs::BlobReaderProxy,
375 blob_hash: Hash,
376 scope: ExecutionScope,
377 protocols: P,
378 object_request: ObjectRequest,
379) {
380 scope.clone().spawn(object_request.handle_async(async move |object_request| {
381 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
382 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
383 error!("Blob reader channel closed: {:?}", status);
384 status
385 } else {
386 error!("Transport error on get_vmo: {:?}", fidl_error);
387 zx::Status::INTERNAL
388 }
389 })?;
390 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
391 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
392 object_request
393 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
394 .await
395 }));
396}
397
398#[derive(thiserror::Error, Debug)]
399#[allow(missing_docs)]
400pub enum GetBlobVmoError {
401 #[error("getting the vmo")]
402 GetVmo(#[source] Status),
403
404 #[error("opening the blob")]
405 OpenBlob(#[source] fuchsia_fs::node::OpenError),
406
407 #[error("making a fidl request")]
408 Fidl(#[source] fidl::Error),
409}
410
411#[cfg(test)]
412impl Client {
413 pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
424 Self::new(
425 blobfs.root_dir_proxy().unwrap(),
426 Some(blobfs.blob_creator_proxy().unwrap()),
427 blobfs.blob_reader_proxy().unwrap(),
428 None,
429 )
430 .unwrap()
431 }
432}
433
434#[cfg(test)]
435#[allow(clippy::bool_assert_comparison)]
436mod tests {
437 use super::*;
438 use assert_matches::assert_matches;
439 use blobfs_ramdisk::BlobfsRamdisk;
440 use fuchsia_async as fasync;
441 use futures::stream::TryStreamExt as _;
442 use std::sync::Arc;
443 use test_case::test_case;
444
445 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
446 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
447 #[fuchsia::test]
448 async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
449 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
450 let client = Client::for_ramdisk(&blobfs);
451
452 assert_eq!(client.list_known_blobs().await.unwrap(), HashSet::new());
453 blobfs.stop().await.unwrap();
454 }
455
456 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
457 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
458 #[fuchsia::test]
459 async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
460 let blobfs = BlobfsRamdisk::builder()
461 .implementation(blob_impl)
462 .with_blob(&b"blob 1"[..])
463 .with_blob(&b"blob 2"[..])
464 .start()
465 .await
466 .unwrap();
467 let client = Client::for_ramdisk(&blobfs);
468
469 let expected = blobfs.list_blobs().unwrap().into_iter().collect();
470 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
471 blobfs.stop().await.unwrap();
472 }
473
474 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
475 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
476 #[fuchsia::test]
477 async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
478 let blobfs = BlobfsRamdisk::builder()
479 .implementation(blob_impl)
480 .with_blob(&b"blob 1"[..])
481 .with_blob(&b"blob 2"[..])
482 .start()
483 .await
484 .unwrap();
485 let client = Client::for_ramdisk(&blobfs);
486
487 let merkle = fuchsia_merkle::from_slice(&b"blob 1"[..]).root();
488 assert_matches!(client.delete_blob(&merkle).await, Ok(()));
489
490 let expected = HashSet::from([fuchsia_merkle::from_slice(&b"blob 2"[..]).root()]);
491 assert_eq!(client.list_known_blobs().await.unwrap(), expected);
492 blobfs.stop().await.unwrap();
493 }
494
495 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
496 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
497 #[fuchsia::test]
498 async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
499 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
500 let client = Client::for_ramdisk(&blobfs);
501 let blob_merkle = Hash::from([1; 32]);
502
503 assert_matches!(
504 client.delete_blob(&blob_merkle).await,
505 Err(BlobfsError::Unlink(Status::NOT_FOUND))
506 );
507 blobfs.stop().await.unwrap();
508 }
509
510 #[fuchsia::test]
511 async fn delete_blob_mock() {
512 let (client, mut stream, _, _) = Client::new_test();
513 let blob_merkle = Hash::from([1; 32]);
514 fasync::Task::spawn(async move {
515 match stream.try_next().await.unwrap().unwrap() {
516 fio::DirectoryRequest::Unlink { name, responder, .. } => {
517 assert_eq!(name, blob_merkle.to_string());
518 responder.send(Ok(())).unwrap();
519 }
520 other => panic!("unexpected request: {other:?}"),
521 }
522 })
523 .detach();
524
525 assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
526 }
527
528 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
529 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
530 #[fuchsia::test]
531 async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
532 let blobfs = BlobfsRamdisk::builder()
533 .implementation(blob_impl)
534 .with_blob(&b"blob 1"[..])
535 .start()
536 .await
537 .unwrap();
538 let client = Client::for_ramdisk(&blobfs);
539
540 assert!(client.has_blob(&fuchsia_merkle::from_slice(&b"blob 1"[..]).root()).await);
541 assert!(!client.has_blob(&Hash::from([1; 32])).await);
542
543 blobfs.stop().await.unwrap();
544 }
545
546 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
547 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
548 #[fuchsia::test]
549 async fn has_blob_return_false_if_blob_is_partially_written(
550 blob_impl: blobfs_ramdisk::Implementation,
551 ) {
552 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
553 let client = Client::for_ramdisk(&blobfs);
554
555 let content = [3; 1024];
556 let hash = fuchsia_merkle::from_slice(&content).root();
557 let delivery_content =
558 delivery_blob::Type1Blob::generate(&content, delivery_blob::CompressionMode::Always);
559
560 let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
561 assert!(!client.has_blob(&hash).await);
562
563 let n = delivery_content.len();
564 let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
565 assert!(!client.has_blob(&hash).await);
566
567 let () = vmo.write(&delivery_content[0..n - 1], 0).unwrap();
568 let () = writer.bytes_ready((n - 1).try_into().unwrap()).await.unwrap().unwrap();
569 assert!(!client.has_blob(&hash).await);
570
571 let () = vmo.write(&delivery_content[n - 1..], (n - 1).try_into().unwrap()).unwrap();
572 let () = writer.bytes_ready(1.try_into().unwrap()).await.unwrap().unwrap();
573 assert!(client.has_blob(&hash).await);
574
575 blobfs.stop().await.unwrap();
576 }
577
578 async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
579 let hash = fuchsia_merkle::from_slice(content).root();
580 let delivery_content =
581 delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
582 let writer = client.open_blob_for_write(&hash).await.unwrap().into_proxy();
583 let vmo = writer
584 .get_vmo(delivery_content.len().try_into().unwrap())
585 .await
586 .expect("a")
587 .map_err(zx::Status::from_raw)
588 .expect("b");
589 let () = vmo.write(&delivery_content, 0).unwrap();
590 let () =
591 writer.bytes_ready(delivery_content.len().try_into().unwrap()).await.unwrap().unwrap();
592 hash
593 }
594
595 #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
596 #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
597 #[fuchsia::test]
598 async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
599 let blobfs = BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap();
600 let client = Client::for_ramdisk(&blobfs);
601
602 let missing_hash0 = Hash::from([0; 32]);
603 let missing_hash1 = Hash::from([1; 32]);
604
605 let present_blob0 = fully_write_blob(&client, &[2; 1024]).await;
606 let present_blob1 = fully_write_blob(&client, &[3; 1024]).await;
607
608 assert_eq!(
609 client
610 .filter_to_missing_blobs([
611 missing_hash0,
612 missing_hash1,
613 present_blob0,
614 present_blob1
615 ])
616 .await,
617 HashSet::from([missing_hash0, missing_hash1])
618 );
619
620 blobfs.stop().await.unwrap();
621 }
622
623 #[fuchsia::test]
624 async fn sync() {
625 let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
626 let counter_clone = Arc::clone(&counter);
627 let (client, mut stream, _, _) = Client::new_test();
628 fasync::Task::spawn(async move {
629 match stream.try_next().await.unwrap().unwrap() {
630 fio::DirectoryRequest::Sync { responder } => {
631 counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
632 responder.send(Ok(())).unwrap();
633 }
634 other => panic!("unexpected request: {other:?}"),
635 }
636 })
637 .detach();
638
639 assert_matches!(client.sync().await, Ok(()));
640 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
641 }
642
643 #[fuchsia::test]
644 async fn open_blob_for_write_maps_already_exists() {
645 let (blob_creator, mut blob_creator_stream) =
646 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
647 let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
648
649 let client = Client::new(
650 fidl::endpoints::create_proxy::<fio::DirectoryMarker>().0,
651 Some(blob_creator),
652 blob_reader,
653 None,
654 )
655 .unwrap();
656
657 fuchsia_async::Task::spawn(async move {
658 match blob_creator_stream.next().await.unwrap().unwrap() {
659 ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
660 assert_eq!(hash, [0; 32]);
661 assert!(!allow_existing);
662 let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
663 }
664 }
665 })
666 .detach();
667
668 assert_matches!(
669 client.open_blob_for_write(&[0; 32].into()).await,
670 Err(CreateError::AlreadyExists)
671 );
672 }
673
674 #[fuchsia::test]
675 async fn concurrent_list_known_blobs_all_return_full_contents() {
676 use futures::StreamExt;
677 let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
678 let client = Client::for_ramdisk(&blobfs);
679
680 for i in 0..256u16 {
687 let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
688 }
689
690 let () = futures::stream::iter(0..100)
691 .for_each_concurrent(None, |_| async {
692 assert_eq!(client.list_known_blobs().await.unwrap().len(), 256);
693 })
694 .await;
695 }
696}