1#![deny(missing_docs)]
6#![allow(clippy::let_unit_value)]
7
8use anyhow::{Context as _, Error, anyhow};
11use delivery_blob::{CompressionMode, Type1Blob};
12use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker};
13use fidl_fuchsia_fs_startup::{CreateOptions, MountOptions};
14use fidl_fuchsia_fxfs as ffxfs;
15use fidl_fuchsia_io as fio;
16use fuchsia_merkle::Hash;
17use std::borrow::Cow;
18use std::collections::BTreeSet;
19
20const RAMDISK_BLOCK_SIZE: u64 = 512;
21static FXFS_BLOB_VOLUME_NAME: &str = "blob";
22
23#[cfg(test)]
24mod test;
25
26#[derive(Debug, Clone)]
28pub struct BlobInfo {
29 merkle: Hash,
30 contents: Cow<'static, [u8]>,
31}
32
33impl<B> From<B> for BlobInfo
34where
35 B: Into<Cow<'static, [u8]>>,
36{
37 fn from(bytes: B) -> Self {
38 let bytes = bytes.into();
39 Self { merkle: fuchsia_merkle::root_from_slice(&bytes), contents: bytes }
40 }
41}
42
43pub struct BlobfsRamdiskBuilder {
45 ramdisk: Option<SuppliedRamdisk>,
46 blobs: Vec<BlobInfo>,
47 implementation: Implementation,
48}
49
50enum SuppliedRamdisk {
51 Formatted(FormattedRamdisk),
52 Unformatted(Ramdisk),
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum Implementation {
58 CppBlobfs,
60 Fxblob,
62}
63
64impl Implementation {
65 pub fn from_env() -> Self {
71 match env!("FXFS_BLOB") {
72 "true" => Self::Fxblob,
73 "false" => Self::CppBlobfs,
74 other => panic!("unexpected value for env var 'FXFS_BLOB': {other}"),
75 }
76 }
77}
78
79impl BlobfsRamdiskBuilder {
80 fn new() -> Self {
81 Self { ramdisk: None, blobs: vec![], implementation: Implementation::CppBlobfs }
82 }
83
84 pub fn formatted_ramdisk(self, ramdisk: FormattedRamdisk) -> Self {
86 Self { ramdisk: Some(SuppliedRamdisk::Formatted(ramdisk)), ..self }
87 }
88
89 pub fn ramdisk(self, ramdisk: Ramdisk) -> Self {
91 Self { ramdisk: Some(SuppliedRamdisk::Unformatted(ramdisk)), ..self }
92 }
93
94 pub fn with_blob(mut self, blob: impl Into<BlobInfo>) -> Self {
96 self.blobs.push(blob.into());
97 self
98 }
99
100 pub fn cpp_blobfs(self) -> Self {
103 Self { implementation: Implementation::CppBlobfs, ..self }
104 }
105
106 pub fn fxblob(self) -> Self {
109 Self { implementation: Implementation::Fxblob, ..self }
110 }
111
112 pub fn implementation(self, implementation: Implementation) -> Self {
114 Self { implementation, ..self }
115 }
116
117 pub fn impl_from_env(self) -> Self {
119 self.implementation(Implementation::from_env())
120 }
121
122 pub async fn start(self) -> Result<BlobfsRamdisk, Error> {
124 let Self { ramdisk, blobs, implementation } = self;
125 let (ramdisk, needs_format) = match ramdisk {
126 Some(SuppliedRamdisk::Formatted(FormattedRamdisk(ramdisk))) => (ramdisk, false),
127 Some(SuppliedRamdisk::Unformatted(ramdisk)) => (ramdisk, true),
128 None => (Ramdisk::start().await.context("creating backing ramdisk for blobfs")?, true),
129 };
130
131 let block_dir = fuchsia_fs::directory::clone(ramdisk.client.outgoing())?;
132 let block_connector = fs_management::filesystem::DirBasedBlockConnector::new(
133 block_dir,
134 format!("svc/{}", fidl_fuchsia_storage_block::BlockMarker::PROTOCOL_NAME),
135 );
136
137 let mut fs = match implementation {
139 Implementation::CppBlobfs => fs_management::filesystem::Filesystem::new(
140 block_connector,
141 fs_management::Blobfs { ..fs_management::Blobfs::dynamic_child() },
142 ),
143 Implementation::Fxblob => fs_management::filesystem::Filesystem::new(
144 block_connector,
145 fs_management::Fxfs::default(),
146 ),
147 };
148 if needs_format {
149 let () = fs.format().await.context("formatting ramdisk")?;
150 }
151
152 let fs = match implementation {
153 Implementation::CppBlobfs => ServingFilesystem::SingleVolume(
154 fs.serve().await.context("serving single volume filesystem")?,
155 ),
156 Implementation::Fxblob => {
157 let fs =
158 fs.serve_multi_volume().await.context("serving multi volume filesystem")?;
159 let volume = if needs_format {
160 fs.create_volume(
161 FXFS_BLOB_VOLUME_NAME,
162 CreateOptions::default(),
163 MountOptions { as_blob: Some(true), ..MountOptions::default() },
164 )
165 .await
166 .context("creating blob volume")?
167 } else {
168 fs.open_volume(
169 FXFS_BLOB_VOLUME_NAME,
170 MountOptions { as_blob: Some(true), ..MountOptions::default() },
171 )
172 .await
173 .context("opening blob volume")?
174 };
175 ServingFilesystem::MultiVolume(fs, volume)
176 }
177 };
178
179 let blobfs = BlobfsRamdisk { backing_ramdisk: FormattedRamdisk(ramdisk), fs };
180
181 if !blobs.is_empty() {
183 let mut present_blobs = blobfs.list_blobs()?;
184
185 for blob in blobs {
186 if present_blobs.contains(&blob.merkle) {
187 continue;
188 }
189 blobfs
190 .write_blob(blob.merkle, &blob.contents)
191 .await
192 .context(format!("writing {}", blob.merkle))?;
193 present_blobs.insert(blob.merkle);
194 }
195 }
196
197 Ok(blobfs)
198 }
199}
200
201pub struct BlobfsRamdisk {
203 backing_ramdisk: FormattedRamdisk,
204 fs: ServingFilesystem,
205}
206
207enum ServingFilesystem {
212 SingleVolume(fs_management::filesystem::ServingSingleVolumeFilesystem),
213 MultiVolume(
214 fs_management::filesystem::ServingMultiVolumeFilesystem,
215 fs_management::filesystem::ServingVolume,
216 ),
217}
218
219impl ServingFilesystem {
220 async fn shutdown(self) -> Result<(), Error> {
221 match self {
222 Self::SingleVolume(fs) => fs.shutdown().await.context("shutting down single volume"),
223 Self::MultiVolume(fs, _volume) => {
224 fs.shutdown().await.context("shutting down multi volume")
225 }
226 }
227 }
228
229 fn exposed_dir(&self) -> Result<&fio::DirectoryProxy, Error> {
230 match self {
231 Self::SingleVolume(fs) => Ok(fs.exposed_dir()),
232 Self::MultiVolume(_fs, volume) => Ok(volume.exposed_dir()),
233 }
234 }
235
236 fn blob_dir_name(&self) -> &'static str {
238 match self {
239 Self::SingleVolume(_) => "blob-exec",
240 Self::MultiVolume(_, _) => "root",
241 }
242 }
243
244 fn svc_dir(&self) -> Result<fio::DirectoryProxy, Error> {
245 match self {
246 Self::SingleVolume(_) => Ok(fuchsia_fs::directory::open_directory_async(
247 self.exposed_dir()?,
248 ".",
249 fio::PERM_READABLE,
250 )
251 .context("opening svc dir")?),
252 Self::MultiVolume(_, _) => Ok(fuchsia_fs::directory::open_directory_async(
253 self.exposed_dir()?,
254 "svc",
255 fio::PERM_READABLE,
256 )
257 .context("opening svc dir")?),
258 }
259 }
260
261 fn blob_creator_proxy(&self) -> Result<ffxfs::BlobCreatorProxy, Error> {
262 fuchsia_component::client::connect_to_protocol_at_dir_root::<ffxfs::BlobCreatorMarker>(
263 &self.svc_dir()?,
264 )
265 .context("connecting to fuchsia.fxfs.BlobCreator")
266 }
267
268 fn blob_reader_proxy(&self) -> Result<ffxfs::BlobReaderProxy, Error> {
269 fuchsia_component::client::connect_to_protocol_at_dir_root::<ffxfs::BlobReaderMarker>(
270 &self.svc_dir()?,
271 )
272 .context("connecting to fuchsia.fxfs.BlobReader")
273 }
274
275 fn overwrite_configuration_proxy(
276 &self,
277 ) -> Result<fidl_fuchsia_storage_blobfs::OverwriteConfigurationProxy, Error> {
278 fuchsia_component::client::connect_to_protocol_at_dir_root::<
279 fidl_fuchsia_storage_blobfs::OverwriteConfigurationMarker,
280 >(&self.svc_dir()?)
281 .context("connecting to fuchsia.storage.blobfs.OverwriteConfiguration")
282 }
283
284 fn implementation(&self) -> Implementation {
285 match self {
286 Self::SingleVolume(_) => Implementation::CppBlobfs,
287 Self::MultiVolume(_, _) => Implementation::Fxblob,
288 }
289 }
290}
291
292impl BlobfsRamdisk {
293 pub fn builder() -> BlobfsRamdiskBuilder {
295 BlobfsRamdiskBuilder::new()
296 }
297
298 pub async fn start() -> Result<Self, Error> {
300 Self::builder().start().await
301 }
302
303 pub fn client(&self) -> blobfs::Client {
309 blobfs::Client::new(
310 self.root_dir_proxy().unwrap(),
311 Some(self.blob_creator_proxy().unwrap()),
312 self.blob_reader_proxy().unwrap(),
313 None,
314 )
315 .unwrap()
316 }
317
318 pub fn root_dir_handle(&self) -> Result<ClientEnd<fio::DirectoryMarker>, Error> {
320 let (root_clone, server_end) = zx::Channel::create();
321 self.fs.exposed_dir()?.open(
322 self.fs.blob_dir_name(),
323 fio::PERM_READABLE | fio::Flags::PERM_INHERIT_WRITE | fio::Flags::PERM_EXECUTE,
324 &Default::default(),
325 server_end,
326 )?;
327 Ok(root_clone.into())
328 }
329
330 pub fn root_dir_proxy(&self) -> Result<fio::DirectoryProxy, Error> {
332 Ok(self.root_dir_handle()?.into_proxy())
333 }
334
335 pub fn root_dir(&self) -> Result<openat::Dir, Error> {
337 use std::os::fd::{FromRawFd as _, IntoRawFd as _, OwnedFd};
338
339 let fd: OwnedFd =
340 fdio::create_fd(self.root_dir_handle()?.into()).context("failed to create fd")?;
341
342 unsafe { Ok(openat::Dir::from_raw_fd(fd.into_raw_fd())) }
347 }
348
349 pub async fn into_builder(self) -> Result<BlobfsRamdiskBuilder, Error> {
352 let implementation = self.fs.implementation();
353 let ramdisk = self.unmount().await?;
354 Ok(Self::builder().formatted_ramdisk(ramdisk).implementation(implementation))
355 }
356
357 pub async fn unmount(self) -> Result<FormattedRamdisk, Error> {
359 self.fs.shutdown().await?;
360 Ok(self.backing_ramdisk)
361 }
362
363 pub async fn stop(self) -> Result<(), Error> {
365 let _ = self.unmount().await?;
366 Ok(())
367 }
368
369 pub fn list_blobs(&self) -> Result<BTreeSet<Hash>, Error> {
371 self.root_dir()?
372 .list_dir(".")?
373 .map(|entry| {
374 Ok(entry?
375 .file_name()
376 .to_str()
377 .ok_or_else(|| anyhow!("expected valid utf-8"))?
378 .parse()?)
379 })
380 .collect()
381 }
382
383 pub async fn add_blob_from(
385 &self,
386 merkle: Hash,
387 mut source: impl std::io::Read,
388 ) -> Result<(), Error> {
389 let mut bytes = vec![];
390 source.read_to_end(&mut bytes)?;
391 self.write_blob(merkle, &bytes).await
392 }
393
394 pub async fn write_blob_with_overwrite(
398 &self,
399 merkle: Hash,
400 bytes: &[u8],
401 overwrite: bool,
402 ) -> Result<(), Error> {
403 let compressed_data = Type1Blob::generate(bytes, CompressionMode::Attempt);
404 let blob_creator = self.blob_creator_proxy()?;
405 let writer_client_end = match blob_creator.create(&merkle.into(), overwrite).await? {
406 Ok(writer_client_end) => writer_client_end,
407 Err(ffxfs::CreateBlobError::AlreadyExists) => {
408 return Ok(());
409 }
410 Err(e) => {
411 return Err(anyhow!("create blob error {:?}", e));
412 }
413 };
414 let writer = writer_client_end.into_proxy();
415 let mut blob_writer = blob_writer::BlobWriter::create(writer, compressed_data.len() as u64)
416 .await
417 .context("failed to create BlobWriter")?;
418 blob_writer.write(&compressed_data).await?;
419 Ok(())
420 }
421
422 pub async fn write_blob(&self, merkle: Hash, bytes: &[u8]) -> Result<(), Error> {
425 self.write_blob_with_overwrite(merkle, bytes, false).await
426 }
427
428 pub fn svc_dir(&self) -> Result<fio::DirectoryProxy, Error> {
432 self.fs.svc_dir()
433 }
434
435 pub fn blob_creator_proxy(&self) -> Result<ffxfs::BlobCreatorProxy, Error> {
437 self.fs.blob_creator_proxy()
438 }
439
440 pub fn blob_reader_proxy(&self) -> Result<ffxfs::BlobReaderProxy, Error> {
442 self.fs.blob_reader_proxy()
443 }
444
445 pub fn overwrite_configuration_proxy(
447 &self,
448 ) -> Result<fidl_fuchsia_storage_blobfs::OverwriteConfigurationProxy, Error> {
449 self.fs.overwrite_configuration_proxy()
450 }
451}
452
453pub struct RamdiskBuilder {
455 block_count: u64,
456}
457
458impl RamdiskBuilder {
459 fn new() -> Self {
460 Self { block_count: 1 << 20 }
461 }
462
463 pub fn block_count(mut self, block_count: u64) -> Self {
465 self.block_count = block_count;
466 self
467 }
468
469 pub async fn start(self) -> Result<Ramdisk, Error> {
471 let client = ramdevice_client::RamdiskClient::builder(RAMDISK_BLOCK_SIZE, self.block_count);
472 let client = client.build().await?;
473 Ok(Ramdisk { client })
474 }
475
476 pub async fn into_blobfs_builder(self) -> Result<BlobfsRamdiskBuilder, Error> {
478 Ok(BlobfsRamdiskBuilder::new().ramdisk(self.start().await?))
479 }
480}
481
482pub struct Ramdisk {
484 client: ramdevice_client::RamdiskClient,
485}
486
487impl Ramdisk {
490 pub fn builder() -> RamdiskBuilder {
492 RamdiskBuilder::new()
493 }
494
495 pub async fn start() -> Result<Self, Error> {
498 Self::builder().start().await
499 }
500}
501
502pub struct FormattedRamdisk(Ramdisk);
504
505impl std::ops::Deref for FormattedRamdisk {
507 type Target = Ramdisk;
508 fn deref(&self) -> &Self::Target {
509 &self.0
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use test_case::test_case;
517
518 #[test_case(Implementation::CppBlobfs; "blobfs")]
519 #[test_case(Implementation::Fxblob; "fxblob")]
520 #[fuchsia_async::run_singlethreaded(test)]
521 async fn clean_start_and_stop(implementation: Implementation) {
522 let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
523
524 let proxy = blobfs.root_dir_proxy().unwrap();
525 drop(proxy);
526
527 blobfs.stop().await.unwrap();
528 }
529
530 #[test_case(Implementation::CppBlobfs; "blobfs")]
531 #[test_case(Implementation::Fxblob; "fxblob")]
532 #[fuchsia_async::run_singlethreaded(test)]
533 async fn clean_start_contains_no_blobs(implementation: Implementation) {
534 let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
535
536 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::new());
537
538 blobfs.stop().await.unwrap();
539 }
540
541 #[test]
542 fn blob_info_conversions() {
543 let a = BlobInfo::from(&b"static slice"[..]);
544 let b = BlobInfo::from(b"owned vec".to_vec());
545 let c = BlobInfo::from(Cow::from(&b"cow"[..]));
546 assert_ne!(a.merkle, b.merkle);
547 assert_ne!(b.merkle, c.merkle);
548 assert_eq!(a.merkle, fuchsia_merkle::root_from_slice(b"static slice"));
549
550 let _ = BlobfsRamdisk::builder()
552 .with_blob(&b"static slice"[..])
553 .with_blob(b"owned vec".to_vec())
554 .with_blob(Cow::from(&b"cow"[..]));
555 }
556
557 #[test_case(Implementation::CppBlobfs; "blobfs")]
558 #[test_case(Implementation::Fxblob; "fxblob")]
559 #[fuchsia_async::run_singlethreaded(test)]
560 async fn with_blob_ignores_duplicates(implementation: Implementation) {
561 let blob = BlobInfo::from(&b"duplicate"[..]);
562
563 let blobfs = BlobfsRamdisk::builder()
564 .implementation(implementation)
565 .with_blob(blob.clone())
566 .with_blob(blob.clone())
567 .start()
568 .await
569 .unwrap();
570 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([blob.merkle]));
571
572 let blobfs =
573 blobfs.into_builder().await.unwrap().with_blob(blob.clone()).start().await.unwrap();
574 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([blob.merkle]));
575 }
576
577 #[test_case(Implementation::CppBlobfs; "blobfs")]
578 #[test_case(Implementation::Fxblob; "fxblob")]
579 #[fuchsia_async::run_singlethreaded(test)]
580 async fn build_with_two_blobs(implementation: Implementation) {
581 let blobfs = BlobfsRamdisk::builder()
582 .implementation(implementation)
583 .with_blob(&b"blob 1"[..])
584 .with_blob(&b"blob 2"[..])
585 .start()
586 .await
587 .unwrap();
588
589 let expected = BTreeSet::from([
590 fuchsia_merkle::root_from_slice(b"blob 1"),
591 fuchsia_merkle::root_from_slice(b"blob 2"),
592 ]);
593 assert_eq!(expected.len(), 2);
594 assert_eq!(blobfs.list_blobs().unwrap(), expected);
595
596 blobfs.stop().await.unwrap();
597 }
598
599 #[test_case(Implementation::CppBlobfs; "blobfs")]
600 #[test_case(Implementation::Fxblob; "fxblob")]
601 #[fuchsia_async::run_singlethreaded(test)]
602 async fn remount(implementation: Implementation) {
603 let blobfs = BlobfsRamdisk::builder()
604 .implementation(implementation)
605 .with_blob(&b"test"[..])
606 .start()
607 .await
608 .unwrap();
609 let blobs = blobfs.list_blobs().unwrap();
610
611 let blobfs = blobfs.into_builder().await.unwrap().start().await.unwrap();
612
613 assert_eq!(blobs, blobfs.list_blobs().unwrap());
614
615 blobfs.stop().await.unwrap();
616 }
617
618 #[test_case(Implementation::CppBlobfs; "blobfs")]
619 #[test_case(Implementation::Fxblob; "fxblob")]
620 #[fuchsia_async::run_singlethreaded(test)]
621 async fn blob_appears_in_readdir(implementation: Implementation) {
622 let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
623
624 let data = b"Hello blobfs!";
625 let hello_merkle = fuchsia_merkle::root_from_slice(data);
626 blobfs.write_blob(hello_merkle, data).await.unwrap();
627 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hello_merkle]));
628
629 blobfs.stop().await.unwrap();
630 }
631
632 #[fuchsia_async::run_singlethreaded(test)]
633 async fn ramdisk_builder_sets_block_count() {
634 for block_count in [1, 2, 3, 16] {
635 let ramdisk = Ramdisk::builder().block_count(block_count).start().await.unwrap();
636 let client_end = ramdisk.client.open().unwrap();
637 let proxy = client_end.into_proxy();
638 let info = proxy.get_info().await.unwrap().unwrap();
639 assert_eq!(info.block_count, block_count);
640 }
641 }
642
643 #[test_case(Implementation::CppBlobfs; "blobfs")]
644 #[test_case(Implementation::Fxblob; "fxblob")]
645 #[fuchsia_async::run_singlethreaded(test)]
646 async fn ramdisk_into_blobfs_formats_ramdisk(implementation: Implementation) {
647 let _: BlobfsRamdisk = Ramdisk::builder()
648 .into_blobfs_builder()
649 .await
650 .unwrap()
651 .implementation(implementation)
652 .start()
653 .await
654 .unwrap();
655 }
656
657 #[test_case(Implementation::CppBlobfs; "blobfs")]
658 #[test_case(Implementation::Fxblob; "fxblob")]
659 #[fuchsia_async::run_singlethreaded(test)]
660 async fn read_and_write(implementation: Implementation) {
661 let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
662
663 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([]));
664 let data = "Hello blobfs!".as_bytes();
665 let merkle = fuchsia_merkle::root_from_slice(data);
666 blobfs.write_blob(merkle, data).await.unwrap();
667
668 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([merkle]));
669
670 blobfs.stop().await.unwrap();
671 }
672
673 #[test_case(Implementation::CppBlobfs; "blobfs")]
674 #[test_case(Implementation::Fxblob; "fxblob")]
675 #[fuchsia_async::run_singlethreaded(test)]
676 async fn blob_creator_api(implementation: Implementation) {
677 let blobfs = BlobfsRamdisk::builder().implementation(implementation).start().await.unwrap();
678 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([]));
679
680 let bytes = &[1u8; 40];
681 let hash = fuchsia_merkle::root_from_slice(bytes);
682 let compressed_data = Type1Blob::generate(bytes, CompressionMode::Always);
683
684 let blob_creator = blobfs.blob_creator_proxy().unwrap();
685 let blob_writer = blob_creator.create(&hash, false).await.unwrap().unwrap();
686 let mut blob_writer =
687 blob_writer::BlobWriter::create(blob_writer.into_proxy(), compressed_data.len() as u64)
688 .await
689 .unwrap();
690 let () = blob_writer.write(&compressed_data).await.unwrap();
691
692 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));
693
694 blobfs.stop().await.unwrap();
695 }
696
697 #[test_case(Implementation::CppBlobfs; "blobfs")]
698 #[test_case(Implementation::Fxblob; "fxblob")]
699 #[fuchsia_async::run_singlethreaded(test)]
700 async fn blob_reader_api(implementation: Implementation) {
701 let data = "Hello blobfs!".as_bytes();
702 let hash = fuchsia_merkle::root_from_slice(data);
703 let blobfs = BlobfsRamdisk::builder()
704 .implementation(implementation)
705 .with_blob(data)
706 .start()
707 .await
708 .unwrap();
709
710 assert_eq!(blobfs.list_blobs().unwrap(), BTreeSet::from([hash]));
711
712 let blob_reader = blobfs.blob_reader_proxy().unwrap();
713 let vmo = blob_reader.get_vmo(&hash.into()).await.unwrap().unwrap();
714 let mut buf = vec![0; vmo.get_content_size().unwrap() as usize];
715 let () = vmo.read(&mut buf, 0).unwrap();
716 assert_eq!(buf, data);
717
718 blobfs.stop().await.unwrap();
719 }
720}