1#![deny(missing_docs)]
6
7use fidl::endpoints::ClientEnd;
10use fuchsia_hash::{Hash, ParseHashError};
11use futures::{StreamExt as _, stream};
12use log::{error, info};
13use std::collections::HashSet;
14use thiserror::Error;
15use vfs::execution_scope::ExecutionScope;
16use vfs::file::StreamIoConnection;
17use vfs::{ObjectRequest, ObjectRequestRef, ProtocolsExt};
18use zx::{self as zx, Status};
19use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio};
20
21pub mod mock;
22pub use mock::Mock;
23
24#[derive(Debug, Error)]
25#[allow(missing_docs)]
26pub enum BlobStatusError {
27 #[error("this client was not created with a blob creator so it cannot write blobs")]
28 WritingNotConfigured,
29
30 #[error("the fidl call returned an unexpected error")]
31 NeedsOverwrite(#[source] Status),
32}
33
34#[derive(Debug, Error)]
36#[allow(missing_docs)]
37pub enum BlobfsError {
38 #[error("while opening blobfs dir")]
39 OpenDir(#[from] fuchsia_fs::node::OpenError),
40
41 #[error("while cloning the blobfs dir")]
42 CloneDir(#[from] fuchsia_fs::node::CloneError),
43
44 #[error("while listing blobfs dir")]
45 ReadDir(#[source] fuchsia_fs::directory::EnumerateError),
46
47 #[error("while deleting blob")]
48 Unlink(#[source] Status),
49
50 #[error("while sync'ing")]
51 Sync(#[source] Status),
52
53 #[error("while parsing blob merkle hash")]
54 ParseHash(#[from] ParseHashError),
55
56 #[error("FIDL error")]
57 Fidl(#[from] fidl::Error),
58
59 #[error("while connecting to fuchsia.fxfs/BlobCreator")]
60 ConnectToBlobCreator(#[source] anyhow::Error),
61
62 #[error("while connecting to fuchsia.fxfs/BlobReader")]
63 ConnectToBlobReader(#[source] anyhow::Error),
64
65 #[error("while setting the VmexResource")]
66 InitVmexResource(#[source] anyhow::Error),
67
68 #[error("while checking NeedsOverwrite for blob status")]
69 BlobStatus(BlobStatusError),
70}
71
72#[derive(Debug, Error)]
74#[allow(missing_docs)]
75pub enum CreateError {
76 #[error("the blob already exists or is being concurrently written")]
77 AlreadyExists,
78
79 #[error("while creating the blob")]
80 Io(#[source] fuchsia_fs::node::OpenError),
81
82 #[error("while converting the proxy into a client end")]
83 ConvertToClientEnd,
84
85 #[error("FIDL error")]
86 Fidl(#[from] fidl::Error),
87
88 #[error("while calling fuchsia.fxfs/BlobCreator.Create: {0:?}")]
89 BlobCreator(ffxfs::CreateBlobError),
90
91 #[error("this client was not created with a blob creator so it cannot write blobs")]
92 WritingNotConfigured,
93}
94
/// The state of a blob as reported by [`Client::blob_status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobStatus {
    /// The blob is present and does not need to be overwritten.
    UpToDate,

    /// The blob is present but needs to be overwritten.
    NeedsOverwrite,

    /// The blob is not present.
    Absent,
}
106
107impl From<ffxfs::CreateBlobError> for CreateError {
108 fn from(e: ffxfs::CreateBlobError) -> Self {
109 match e {
110 ffxfs::CreateBlobError::AlreadyExists => CreateError::AlreadyExists,
111 e @ ffxfs::CreateBlobError::Internal => CreateError::BlobCreator(e),
112 }
113 }
114}
115
/// Builder for a blobfs [`Client`].
///
/// Every right starts out disabled; enable the ones needed before calling `build`.
#[derive(Debug, Default)]
pub struct ClientBuilder {
    // Whether `/blob` is opened with `fio::PERM_READABLE`.
    readable: bool,
    // Whether `/blob` is opened with `fio::PERM_WRITABLE` and a blob creator is connected.
    writable: bool,
    // Whether `/blob` is opened with `fio::PERM_EXECUTABLE`.
    executable: bool,
}
123
124impl ClientBuilder {
125 pub async fn build(self) -> Result<Client, BlobfsError> {
129 let mut flags = fio::Flags::empty();
130 if self.readable {
131 flags |= fio::PERM_READABLE
132 }
133 if self.writable {
134 flags |= fio::PERM_WRITABLE
135 }
136 if self.executable {
137 flags |= fio::PERM_EXECUTABLE
138 }
139 let dir = fuchsia_fs::directory::open_in_namespace("/blob", flags)?;
140 if let Ok(client) = fuchsia_component::client::connect_to_protocol::<
141 fidl_fuchsia_kernel::VmexResourceMarker,
142 >() && let Ok(vmex) = client.get().await
143 {
144 info!("Got vmex resource");
145 vmo_blob::init_vmex_resource(vmex).map_err(BlobfsError::InitVmexResource)?;
146 }
147 let reader = fuchsia_component::client::connect_to_protocol::<ffxfs::BlobReaderMarker>()
148 .map_err(BlobfsError::ConnectToBlobReader)?;
149 let creator = if self.writable {
150 Some(
151 fuchsia_component::client::connect_to_protocol::<ffxfs::BlobCreatorMarker>()
152 .map_err(BlobfsError::ConnectToBlobCreator)?,
153 )
154 } else {
155 None
156 };
157
158 Ok(Client { dir, creator, reader })
159 }
160
161 pub fn readable(self) -> Self {
164 Self { readable: true, ..self }
165 }
166
167 pub fn writable(self) -> Self {
171 Self { writable: true, ..self }
172 }
173
174 pub fn executable(self) -> Self {
177 Self { executable: true, ..self }
178 }
179}
180
181impl Client {
182 pub fn builder() -> ClientBuilder {
184 Default::default()
185 }
186}
187#[derive(Debug, Clone)]
189pub struct Client {
190 dir: fio::DirectoryProxy,
191 creator: Option<ffxfs::BlobCreatorProxy>,
192 reader: ffxfs::BlobReaderProxy,
193}
194
195impl Client {
196 pub fn new(
200 dir: fio::DirectoryProxy,
201 creator: Option<ffxfs::BlobCreatorProxy>,
202 reader: ffxfs::BlobReaderProxy,
203 vmex: Option<zx::Resource>,
204 ) -> Result<Self, anyhow::Error> {
205 if let Some(vmex) = vmex {
206 vmo_blob::init_vmex_resource(vmex)?;
207 }
208 Ok(Self { dir, creator, reader })
209 }
210
211 pub fn new_test() -> (
218 Self,
219 fio::DirectoryRequestStream,
220 ffxfs::BlobReaderRequestStream,
221 ffxfs::BlobCreatorRequestStream,
222 ) {
223 let (dir, dir_stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
224 let (reader, reader_stream) =
225 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
226 let (creator, creator_stream) =
227 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
228
229 (Self { dir, creator: Some(creator), reader }, dir_stream, reader_stream, creator_stream)
230 }
231
232 pub fn new_mock() -> (Self, mock::Mock) {
239 let (dir, stream) = fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
240 let (reader, reader_stream) =
241 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobReaderMarker>();
242 let (creator, creator_stream) =
243 fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
244
245 (
246 Self { dir, creator: Some(creator), reader },
247 mock::Mock { stream, reader_stream, creator_stream },
248 )
249 }
250
251 pub async fn get_blob_vmo(&self, hash: &Hash) -> Result<zx::Vmo, GetBlobVmoError> {
253 self.reader
254 .get_vmo(hash)
255 .await
256 .map_err(GetBlobVmoError::Fidl)?
257 .map_err(|s| GetBlobVmoError::GetVmo(Status::from_raw(s)))
258 }
259
260 pub fn open_blob_for_read(
263 &self,
264 blob: &Hash,
265 flags: fio::Flags,
266 scope: ExecutionScope,
267 object_request: ObjectRequestRef<'_>,
268 ) -> Result<(), zx::Status> {
269 if flags.rights().is_some_and(|rights| rights.contains(fio::Operations::WRITE_BYTES)) {
270 return Err(zx::Status::ACCESS_DENIED);
271 }
272 if flags.creation_mode() != vfs::CreationMode::Never {
273 return Err(zx::Status::NOT_SUPPORTED);
274 }
275 let object_request = object_request.take();
277 let () = open_blob_with_reader(self.reader.clone(), *blob, scope, flags, object_request);
278 Ok(())
279 }
280
281 pub async fn list_known_blobs(&self) -> Result<HashSet<Hash>, BlobfsError> {
283 let private_connection = fuchsia_fs::directory::clone(&self.dir)?;
289 fuchsia_fs::directory::readdir(&private_connection)
290 .await
291 .map_err(BlobfsError::ReadDir)?
292 .into_iter()
293 .filter(|entry| entry.kind == fuchsia_fs::directory::DirentKind::File)
294 .map(|entry| entry.name.parse().map_err(BlobfsError::ParseHash))
295 .collect()
296 }
297
298 pub async fn delete_blob(&self, blob: &Hash) -> Result<(), BlobfsError> {
300 self.dir
301 .unlink(&blob.to_string(), &fio::UnlinkOptions::default())
302 .await?
303 .map_err(|s| BlobfsError::Unlink(Status::from_raw(s)))
304 }
305
306 pub async fn open_blob_for_write(
308 &self,
309 blob: &Hash,
310 allow_existing: bool,
311 ) -> Result<ClientEnd<ffxfs::BlobWriterMarker>, CreateError> {
312 let Some(creator) = &self.creator else {
313 return Err(CreateError::WritingNotConfigured);
314 };
315 Ok(creator.create(blob, allow_existing).await??)
316 }
317
318 pub async fn blob_present_and_up_to_date(&self, blob: &Hash) -> bool {
320 matches!(
323 self.creator.as_ref().expect("Missing BlobCreator access").needs_overwrite(blob).await,
324 Ok(Ok(false))
325 )
326 }
327
328 pub async fn blob_status(&self, blob: &Hash) -> Result<BlobStatus, BlobfsError> {
330 let Some(creator) = &self.creator else {
331 return Err(BlobfsError::BlobStatus(BlobStatusError::WritingNotConfigured));
332 };
333 match creator.needs_overwrite(blob).await? {
334 Ok(true) => Ok(BlobStatus::NeedsOverwrite),
335 Ok(false) => Ok(BlobStatus::UpToDate),
336 Err(status) if status == Status::NOT_FOUND.into_raw() => Ok(BlobStatus::Absent),
337 Err(s) => {
338 Err(BlobfsError::BlobStatus(BlobStatusError::NeedsOverwrite(Status::from_raw(s))))
339 }
340 }
341 }
342
343 pub async fn filter_to_missing_blobs(
354 &self,
355 candidates: impl IntoIterator<Item = Hash>,
356 ) -> HashSet<Hash> {
357 stream::iter(candidates)
365 .map(move |blob| async move {
366 if self.blob_present_and_up_to_date(&blob).await { None } else { Some(blob) }
367 })
368 .buffer_unordered(10)
371 .filter_map(|blob| async move { blob })
372 .collect()
373 .await
374 }
375
376 pub async fn sync(&self) -> Result<(), BlobfsError> {
378 self.dir.sync().await?.map_err(zx::Status::from_raw).map_err(BlobfsError::Sync)
379 }
380}
381
382fn open_blob_with_reader<P: ProtocolsExt + Send>(
385 reader: ffxfs::BlobReaderProxy,
386 blob_hash: Hash,
387 scope: ExecutionScope,
388 protocols: P,
389 object_request: ObjectRequest,
390) {
391 scope.clone().spawn(object_request.handle_async(async move |object_request| {
392 let get_vmo_result = reader.get_vmo(&blob_hash.into()).await.map_err(|fidl_error| {
393 if let fidl::Error::ClientChannelClosed { status, .. } = fidl_error {
394 error!("Blob reader channel closed: {:?}", status);
395 status
396 } else {
397 error!("Transport error on get_vmo: {:?}", fidl_error);
398 zx::Status::INTERNAL
399 }
400 })?;
401 let vmo = get_vmo_result.map_err(zx::Status::from_raw)?;
402 let vmo_blob = vmo_blob::VmoBlob::new(vmo);
403 object_request
404 .create_connection::<StreamIoConnection<_>, _>(scope, vmo_blob, protocols)
405 .await
406 }));
407}
408
409#[derive(thiserror::Error, Debug)]
410#[allow(missing_docs)]
411pub enum GetBlobVmoError {
412 #[error("getting the vmo")]
413 GetVmo(#[source] Status),
414
415 #[error("opening the blob")]
416 OpenBlob(#[source] fuchsia_fs::node::OpenError),
417
418 #[error("making a fidl request")]
419 Fidl(#[source] fidl::Error),
420}
421
#[cfg(test)]
impl Client {
    /// Builds a [`Client`] from the root directory, blob creator and blob reader proxies of
    /// `blobfs`, without a vmex resource.
    ///
    /// # Panics
    ///
    /// Panics if any of the proxies cannot be obtained or if [`Client::new`] fails.
    pub fn for_ramdisk(blobfs: &blobfs_ramdisk::BlobfsRamdisk) -> Self {
        let dir = blobfs.root_dir_proxy().unwrap();
        let creator = blobfs.blob_creator_proxy().unwrap();
        let reader = blobfs.blob_reader_proxy().unwrap();
        Self::new(dir, Some(creator), reader, None).unwrap()
    }
}
444
#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use blobfs_ramdisk::BlobfsRamdisk;
    use fuchsia_async as fasync;
    use futures::stream::TryStreamExt as _;
    use std::sync::Arc;
    use test_case::test_case;

    /// Starts a blobfs ramdisk of the requested implementation with no pre-populated blobs.
    async fn start_empty_blobfs(blob_impl: blobfs_ramdisk::Implementation) -> BlobfsRamdisk {
        BlobfsRamdisk::builder().implementation(blob_impl).start().await.unwrap()
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs_empty(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = start_empty_blobfs(blob_impl).await;
        let client = Client::for_ramdisk(&blobfs);

        let listed = client.list_known_blobs().await.unwrap();
        assert_eq!(listed, HashSet::new());
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn list_known_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        // The ramdisk's own listing is the source of truth for what the client should report.
        let expected: HashSet<Hash> = blobfs.list_blobs().unwrap().into_iter().collect();
        let listed = client.list_known_blobs().await.unwrap();
        assert_eq!(listed, expected);
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_blob_and_then_list(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .with_blob(&b"blob 2"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let deleted = fuchsia_merkle::root_from_slice(b"blob 1");
        assert_matches!(client.delete_blob(&deleted).await, Ok(()));

        let remaining = HashSet::from([fuchsia_merkle::root_from_slice(b"blob 2")]);
        assert_eq!(client.list_known_blobs().await.unwrap(), remaining);
        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn delete_nonexistent_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = start_empty_blobfs(blob_impl).await;
        let client = Client::for_ramdisk(&blobfs);
        let blob_merkle = Hash::from([1; 32]);

        let outcome = client.delete_blob(&blob_merkle).await;
        assert_matches!(outcome, Err(BlobfsError::Unlink(Status::NOT_FOUND)));
        blobfs.stop().await.unwrap();
    }

    #[fuchsia::test]
    async fn delete_blob_mock() {
        let (client, mut dir_stream, _, _) = Client::new_test();
        let blob_merkle = Hash::from([1; 32]);

        // Fake directory server: expects exactly one `Unlink` for the blob and accepts it.
        let fake_dir = async move {
            match dir_stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Unlink { name, responder, .. } => {
                    assert_eq!(name, blob_merkle.to_string());
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        };
        fasync::Task::spawn(fake_dir).detach();

        assert_matches!(client.delete_blob(&blob_merkle).await, Ok(()));
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = BlobfsRamdisk::builder()
            .implementation(blob_impl)
            .with_blob(&b"blob 1"[..])
            .start()
            .await
            .unwrap();
        let client = Client::for_ramdisk(&blobfs);

        let present = fuchsia_merkle::root_from_slice(b"blob 1");
        let absent = Hash::from([1; 32]);
        assert!(client.blob_present_and_up_to_date(&present).await);
        assert!(!client.blob_present_and_up_to_date(&absent).await);

        blobfs.stop().await.unwrap();
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn has_blob_return_false_if_blob_is_partially_written(
        blob_impl: blobfs_ramdisk::Implementation,
    ) {
        let blobfs = start_empty_blobfs(blob_impl).await;
        let client = Client::for_ramdisk(&blobfs);

        let content = &[3; 1024];
        let hash = fuchsia_merkle::root_from_slice(content);
        let delivery_content =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);

        // The blob must not be reported as present at any point before the final byte lands.
        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();
        assert!(!client.blob_present_and_up_to_date(&hash).await);

        let n = delivery_content.len();
        let vmo = writer.get_vmo(n.try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.blob_present_and_up_to_date(&hash).await);

        // Everything except the last byte.
        let (head, tail) = delivery_content.split_at(n - 1);
        let () = vmo.write(head, 0).unwrap();
        let () = writer.bytes_ready(head.len().try_into().unwrap()).await.unwrap().unwrap();
        assert!(!client.blob_present_and_up_to_date(&hash).await);

        // The last byte completes the blob.
        let () = vmo.write(tail, head.len().try_into().unwrap()).unwrap();
        let () = writer.bytes_ready(tail.len().try_into().unwrap()).await.unwrap().unwrap();
        assert!(client.blob_present_and_up_to_date(&hash).await);

        blobfs.stop().await.unwrap();
    }

    /// Writes `content` to blobfs through `client` as a compressed delivery blob and returns
    /// the blob's hash.
    async fn fully_write_blob(client: &Client, content: &[u8]) -> Hash {
        let hash = fuchsia_merkle::root_from_slice(content);
        let delivery_content =
            delivery_blob::Type1Blob::generate(content, delivery_blob::CompressionMode::Always);
        let writer = client.open_blob_for_write(&hash, false).await.unwrap().into_proxy();

        let payload_len = delivery_content.len();
        let vmo = writer
            .get_vmo(payload_len.try_into().unwrap())
            .await
            .expect("a")
            .map_err(zx::Status::from_raw)
            .expect("b");
        let () = vmo.write(&delivery_content, 0).unwrap();
        let () = writer.bytes_ready(payload_len.try_into().unwrap()).await.unwrap().unwrap();
        hash
    }

    #[test_case(blobfs_ramdisk::Implementation::CppBlobfs; "cpp_blobfs")]
    #[test_case(blobfs_ramdisk::Implementation::Fxblob; "fxblob")]
    #[fuchsia::test]
    async fn filter_to_missing_blobs(blob_impl: blobfs_ramdisk::Implementation) {
        let blobfs = start_empty_blobfs(blob_impl).await;
        let client = Client::for_ramdisk(&blobfs);

        let missing = [Hash::from([0; 32]), Hash::from([1; 32])];
        let written0 = fully_write_blob(&client, &[2; 1024]).await;
        let written1 = fully_write_blob(&client, &[3; 1024]).await;

        let candidates = [missing[0], missing[1], written0, written1];
        let still_missing = client.filter_to_missing_blobs(candidates).await;
        assert_eq!(still_missing, HashSet::from(missing));

        blobfs.stop().await.unwrap();
    }

    #[fuchsia::test]
    async fn sync() {
        use std::sync::atomic::{AtomicU32, Ordering};

        let sync_calls = Arc::new(AtomicU32::new(0));
        let sync_calls_in_server = Arc::clone(&sync_calls);
        let (client, mut dir_stream, _, _) = Client::new_test();

        // Fake directory server: counts the single expected `Sync` request and accepts it.
        let fake_dir = async move {
            match dir_stream.try_next().await.unwrap().unwrap() {
                fio::DirectoryRequest::Sync { responder } => {
                    sync_calls_in_server.fetch_add(1, Ordering::SeqCst);
                    responder.send(Ok(())).unwrap();
                }
                other => panic!("unexpected request: {other:?}"),
            }
        };
        fasync::Task::spawn(fake_dir).detach();

        assert_matches!(client.sync().await, Ok(()));
        assert_eq!(sync_calls.load(Ordering::SeqCst), 1);
    }

    #[fuchsia::test]
    async fn open_blob_for_write_maps_already_exists() {
        let (blob_creator, mut blob_creator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ffxfs::BlobCreatorMarker>();
        let (blob_reader, _) = fidl::endpoints::create_proxy::<ffxfs::BlobReaderMarker>();
        let (dir, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let client = Client::new(dir, Some(blob_creator), blob_reader, None).unwrap();

        // Fake blob creator: rejects the single expected `Create` with `AlreadyExists`.
        let fake_creator = async move {
            match blob_creator_stream.next().await.unwrap().unwrap() {
                ffxfs::BlobCreatorRequest::Create { hash, allow_existing, responder } => {
                    assert_eq!(hash, [0; 32]);
                    assert!(!allow_existing);
                    let () = responder.send(Err(ffxfs::CreateBlobError::AlreadyExists)).unwrap();
                }
                ffxfs::BlobCreatorRequest::NeedsOverwrite { .. } => {
                    unreachable!("This code path is not yet exercised.");
                }
            }
        };
        fuchsia_async::Task::spawn(fake_creator).detach();

        let outcome = client.open_blob_for_write(&[0; 32].into(), false).await;
        assert_matches!(outcome, Err(CreateError::AlreadyExists));
    }

    #[fuchsia::test]
    async fn concurrent_list_known_blobs_all_return_full_contents() {
        use futures::StreamExt;
        const BLOB_COUNT: u16 = 256;

        let blobfs = BlobfsRamdisk::builder().start().await.unwrap();
        let client = Client::for_ramdisk(&blobfs);

        // Each blob's content is its own little-endian index, so all hashes are distinct.
        for i in 0..BLOB_COUNT {
            let _: Hash = fully_write_blob(&client, i.to_le_bytes().as_slice()).await;
        }

        // Every one of the concurrent listings must observe the complete set of blobs.
        let () = futures::stream::iter(0..100)
            .for_each_concurrent(None, |_| async {
                let listed = client.list_known_blobs().await.unwrap();
                assert_eq!(listed.len(), usize::from(BLOB_COUNT));
            })
            .await;
    }
}