1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail, ensure};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use static_assertions::const_assert;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::{Arc, OnceLock, Weak};
34use storage_device::{Device, DeviceHolder};
35
/// Smallest block size the filesystem uses.  `FxFilesystemBuilder::open` takes the larger of this
/// and the device's block size.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size: `u16::MAX + 1` bytes (64 KiB).
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

/// Largest permitted file size: `i64::MAX - 4095`, which is the largest multiple of 4096 that
/// does not exceed `i64::MAX`.
// NOTE(review): presumably chosen so that sizes fit in a signed 64-bit offset and the final
// 4096-byte page containing `i64::MAX` is excluded -- confirm against the callers.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
// Compile-time check that pins the numeric value of `MAX_FILE_SIZE`.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

// Limit on concurrently in-flight transactions enforced by `FxFilesystem::add_transaction`
// (transactions that skip journal checks are counted but not limited).
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Default delay before the first trim pass: one hour.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// Default interval between subsequent trim passes: 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
56
/// Space usage summary returned by `FxFilesystem::get_info`.
pub struct Info {
    /// The value reported by the underlying device's `size()`.
    pub total_bytes: u64,
    /// The first element of the allocator's `get_used_bytes()` result.
    pub used_bytes: u64,
}
62
/// Optional hook that `FxFilesystem::commit_transaction` calls and awaits after a transaction has
/// been committed and the caller's callback has run.
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Optional hook that `FxFilesystem::commit_transaction` calls with the transaction before
/// committing it.  An `Err` from the hook is propagated and the commit does not proceed.
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
67
/// Filesystem-wide options, fixed when the filesystem is opened.
pub struct Options {
    /// Opens the filesystem read-only.  In this file that means: formatting is rejected, the
    /// device is not flushed before replay, no graveyard reaping or trimming is started, and
    /// `close` skips the final journal sync.
    pub read_only: bool,

    /// Defaults to 128 MiB.
    // NOTE(review): not consumed in this part of the file; presumably the number of bytes after
    // which the metadata key is rolled -- confirm against the user of this field.
    pub roll_metadata_key_byte_count: u64,

    /// Hook run at the start of `commit_transaction`; an error aborts the commit.
    pub pre_commit_hook: PreCommitHook,

    /// Hook awaited at the end of `commit_transaction`, after the commit callback.
    pub post_commit_hook: PostCommitHook,

    /// Defaults to `false`.
    // NOTE(review): not consumed in this part of the file; presumably suppresses the initial
    // graveyard reap somewhere else -- confirm.
    pub skip_initial_reap: bool,

    /// `(delay, interval)` for the background trim task: the first trim runs after `delay`,
    /// later ones every `interval`.  `None` means no trim task is started.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// When `Some`, the filesystem is opened in image-builder mode: metadata reservations are
    /// always `Borrowed`, no background tasks are started by `open`, and `close` skips the final
    /// sync.  `OpenFxFilesystem::finalize` is expected to be called before dropping.
    pub image_builder_mode: Option<SuperBlockInstance>,

    /// Enables inline crypto.  `FxFilesystemBuilder::open` fails unless `barriers_enabled` is
    /// also set.
    pub inline_crypto_enabled: bool,

    /// Enables barriers.  The builder's `barriers_enabled` setter mirrors this value into the
    /// journal options as well.
    pub barriers_enabled: bool,
}
113
114impl Default for Options {
115 fn default() -> Self {
116 Options {
117 roll_metadata_key_byte_count: 128 * 1024 * 1024,
118 read_only: false,
119 pre_commit_hook: None,
120 post_commit_hook: None,
121 skip_initial_reap: false,
122 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
123 image_builder_mode: None,
124 inline_crypto_enabled: false,
125 barriers_enabled: false,
126 }
127 }
128}
129
/// Context passed to `JournalingObject::apply_mutation` alongside each mutation.
pub struct ApplyContext<'a, 'b> {
    /// Whether the mutation is being applied for a live transaction or without one (replay).
    pub mode: ApplyMode<'a, 'b>,

    /// The journal checkpoint associated with the mutation.
    // NOTE(review): presumably the checkpoint of the transaction that carries the mutation --
    // confirm against the journal code.
    pub checkpoint: JournalCheckpoint,
}
138
/// Describes how a mutation is being applied.
pub enum ApplyMode<'a, 'b> {
    /// No live transaction is available.
    // NOTE(review): presumably used while replaying the journal at open time -- confirm.
    Replay,
    /// The mutation belongs to the given in-progress transaction.
    Live(&'a Transaction<'b>),
}
145
146impl ApplyMode<'_, '_> {
147 pub fn is_replay(&self) -> bool {
148 matches!(self, ApplyMode::Replay)
149 }
150
151 pub fn is_live(&self) -> bool {
152 matches!(self, ApplyMode::Live(_))
153 }
154}
155
/// Interface for objects whose mutations are recorded via the journal.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// Applies `mutation` to the object.  `context` says whether this is a live transaction or a
    /// replay and supplies the associated checkpoint; `assoc_obj` is the object associated with
    /// the mutation, as supplied by the caller.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called with a mutation of `transaction` that will not be applied.
    // NOTE(review): presumably invoked when a transaction is dropped without committing --
    // confirm against the object manager.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes the object, returning a `Version` on success.
    // NOTE(review): the meaning of the returned version is not visible here -- confirm.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes `mutation` to the journal via `writer`.  The default implementation writes a clone
    /// of the mutation unchanged; implementors may override it.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
184
/// Options for `FxFilesystem::sync` (forwarded to the journal's `sync`).  The default has
/// `flush_device` false and no precondition.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Requests that the device be flushed as part of the sync.  `FxFilesystem::close` sets this
    /// for its final sync.
    pub flush_device: bool,

    /// Optional one-shot predicate.
    // NOTE(review): evaluated by the journal, not in this file; presumably the sync is skipped
    // when it returns false -- confirm.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
199
/// Handle to an open filesystem, wrapping `Arc<FxFilesystem>` (reachable through `Deref`).
/// Dropping it logs an error if a writable filesystem was never closed, or if an image-builder
/// filesystem was never finalized.
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
201
202impl OpenFxFilesystem {
203 pub async fn take_device(self) -> DeviceHolder {
206 let fut = self.device.take_when_dropped();
207 std::mem::drop(self);
208 debug_assert_not_too_long!(fut)
209 }
210
211 pub async fn finalize(&self) -> Result<(), Error> {
213 ensure!(
214 self.journal().image_builder_mode().is_some(),
215 "finalize() only valid in image_builder_mode."
216 );
217 self.journal().allocate_journal().await?;
218 self.journal().set_image_builder_mode(None);
219 self.journal().compact().await?;
221 Ok(())
222 }
223}
224
225impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
226 fn from(fs: Arc<FxFilesystem>) -> Self {
227 Self(fs)
228 }
229}
230
231impl Drop for OpenFxFilesystem {
232 fn drop(&mut self) {
233 if self.options.image_builder_mode.is_some()
234 && self.journal().image_builder_mode().is_some()
235 {
236 error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
237 }
238 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
239 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
240 }
241 }
242}
243
244impl std::ops::Deref for OpenFxFilesystem {
245 type Target = Arc<FxFilesystem>;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
/// Builder used to configure and open (or format) an `FxFilesystem`.
pub struct FxFilesystemBuilder {
    // When true, `open` initialises an empty filesystem instead of replaying the journal.
    format: bool,
    // Trace flag forwarded to the filesystem, the journal and the root store.
    trace: bool,
    // Options that end up stored in the filesystem.
    options: Options,
    // Options handed to `Journal::new`.
    journal_options: JournalOptions,
    // Callback passed to journal replay (only used when opening an existing filesystem).
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Callback passed to `ObjectManager::new`.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // When true, `open` installs a post-commit hook that runs fsck after each transaction.
    fsck_after_every_transaction: bool,
}
261
impl FxFilesystemBuilder {
    /// Creates a builder that opens an existing filesystem with default `Options` and
    /// `JournalOptions`, tracing off, no callbacks and no per-transaction fsck.
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether `open` formats the device as a new, empty filesystem.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Sets the trace flag passed on to the filesystem, journal and root store.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets `Options::read_only`.  Combining this with `format(true)` makes `open` fail.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// Sets `Options::image_builder_mode`.
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Installs a hook that runs before every transaction commit; an error from the hook aborts
    /// the commit.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Installs a hook whose future is awaited after every transaction commit.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets `Options::skip_initial_reap`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Replaces the journal options wholesale.  Note that this also overwrites any
    /// `barriers_enabled` value previously mirrored into the journal options by
    /// `barriers_enabled()`.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a callback that is handed to journal replay when opening an existing filesystem.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a callback that is handed to the `ObjectManager` created by `open`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// When enabled, `open` wraps the post-commit hook so that fsck runs after every
    /// transaction (panicking on failure) before the previously configured hook.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets `Options::trim_config`: `Some((delay, interval))` schedules background trimming,
    /// `None` disables it.
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Sets `Options::inline_crypto_enabled`.  `open` requires barriers to be enabled too.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Sets barriers in both the filesystem options and the journal options.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }

    /// Opens (or, with `format(true)`, formats) a filesystem on `device`.
    ///
    /// # Errors
    ///
    /// Fails if formatting is requested together with read-only, if inline crypto is enabled
    /// without barriers, or if any of the device flush, journal initialisation/replay, root
    /// volume directory creation or initial reap steps fail.
    ///
    /// # Panics
    ///
    /// Panics if the effective block size is not a multiple of `MIN_BLOCK_SIZE` or exceeds
    /// `MAX_BLOCK_SIZE`.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        // Inline crypto is only accepted together with barriers.
        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        // Use the device's block size unless it is smaller than the filesystem minimum.
        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            // Chain the fsck hook in front of any user-supplied post-commit hook; the fsck hook
            // takes ownership of the old one and calls it when it is done.
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || Box::pin(instance.clone().run())));
        }

        // When opening an existing filesystem writable, flush the device before replaying.
        if !read_only && !self.format {
            device.flush().await.context("Device flush failed")?;
        }

        // `new_cyclic` gives the inspect callback a weak pointer back to the filesystem.
        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                trim_task: Mutex::new(None),
                // Stays "closed" until the setup below has completed.
                closed: AtomicBool::new(true),
                shutdown_event: Event::new(),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // Outside image-builder mode the superblocks are written right away.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's asynchronous reaper.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the volumes directory in the root store's root directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Run the initial reap for every unlocked store; the asynchronous reaper is
                // only started further down.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // Only now give the fsck hook its filesystem pointer.  Until this point the hook finds
        // no filesystem and skips fsck, which covers the transactions committed while
        // formatting above.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background reaper and, if configured, the trim task.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}
533
/// An Fxfs filesystem instance.  Constructed by `FxFilesystemBuilder::open` and normally used
/// through `OpenFxFilesystem`.
pub struct FxFilesystem {
    // Effective block size: the device block size, but at least `MIN_BLOCK_SIZE`.
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Held by `commit_transaction` for the duration of each commit.
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // The journal's flush task; started lazily by `maybe_start_flush_task`, awaited in `close`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // The periodic trim task, if one was started; awaited in `close`.
    trim_task: Mutex<Option<fasync::Task<()>>>,
    // True before `open` completes and after `close` has been called.
    closed: AtomicBool,
    // Notified by `close`; wakes the trim task so it can exit.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    // Inspect counter incremented once per committed transaction.
    completed_transactions: UintProperty,
    options: Options,

    // Number of transactions currently in flight (see `add_transaction`/`sub_transaction`).
    in_flight_transactions: AtomicU64,

    // Notified by `sub_transaction`; `add_transaction` waits on it when at the limit.
    transaction_limit_event: Event,

    // The underlying block device.
    device: DeviceHolder,

    // Lazily populated inspect node; its callback calls `populate_stores_node`.
    _stores_node: LazyNode,
}
564
565#[fxfs_trace::trace]
566impl FxFilesystem {
567 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
568 FxFilesystemBuilder::new().format(true).open(device).await
569 }
570
571 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
572 FxFilesystemBuilder::new().open(device).await
573 }
574
575 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
576 self.objects.root_parent_store()
577 }
578
579 pub async fn close(&self) -> Result<(), Error> {
580 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
581 self.shutdown_event.notify(usize::MAX);
582 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
583 let trim_task = self.trim_task.lock().take();
584 if let Some(task) = trim_task {
585 debug_assert_not_too_long!(task);
586 }
587 self.journal.stop_compactions().await;
588 let sync_status = if self.options().image_builder_mode.is_some() || self.options().read_only
589 {
590 Ok(None)
593 } else {
594 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
595 };
596 match &sync_status {
597 Ok(None) => {}
598 Ok(checkpoint) => info!(
599 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
600 reservation_required={}, borrowed={})",
601 checkpoint.as_ref().unwrap().0.file_offset,
602 self.object_manager().metadata_reservation(),
603 self.object_manager().required_reservation(),
604 self.object_manager().borrowed_metadata_space(),
605 ),
606 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
607 }
608 self.journal.terminate();
609 let flush_task = self.flush_task.lock().take();
610 if let Some(task) = flush_task {
611 debug_assert_not_too_long!(task);
612 }
613 self.device().close().await.context("Failed to close device")?;
616 sync_status.map(|_| ())
617 }
618
619 pub fn device(&self) -> Arc<dyn Device> {
620 Arc::clone(&self.device)
621 }
622
623 pub fn root_store(&self) -> Arc<ObjectStore> {
624 self.objects.root_store()
625 }
626
627 pub fn allocator(&self) -> Arc<Allocator> {
628 self.objects.allocator()
629 }
630
631 pub fn enable_allocations(&self) {
635 self.allocator().enable_allocations();
636 }
637
638 pub fn object_manager(&self) -> &Arc<ObjectManager> {
639 &self.objects
640 }
641
642 pub fn journal(&self) -> &Arc<Journal> {
643 &self.journal
644 }
645
646 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
647 self.journal.sync(options).await.map(|_| ())
648 }
649
650 pub fn block_size(&self) -> u64 {
651 self.block_size
652 }
653
654 pub fn get_info(&self) -> Info {
655 Info {
656 total_bytes: self.device.size(),
657 used_bytes: self.object_manager().allocator().get_used_bytes().0,
658 }
659 }
660
661 pub fn super_block_header(&self) -> SuperBlockHeader {
662 self.journal.super_block_header()
663 }
664
665 pub fn graveyard(&self) -> &Arc<Graveyard> {
666 &self.graveyard
667 }
668
669 pub fn trace(&self) -> bool {
670 self.trace
671 }
672
673 pub fn options(&self) -> &Options {
674 &self.options
675 }
676
677 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
684 TxnGuard::Owned(
685 self.lock_manager
686 .read_lock(lock_keys!(LockKey::Filesystem))
687 .await
688 .into_owned(self.clone()),
689 )
690 }
691
692 pub async fn new_transaction<'a>(
693 self: Arc<Self>,
694 locks: LockKeys,
695 options: transaction::Options<'a>,
696 ) -> Result<Transaction<'a>, Error> {
697 let guard = if let Some(guard) = options.txn_guard.as_ref() {
698 TxnGuard::Borrowed(guard)
699 } else {
700 self.txn_guard().await
701 };
702 Transaction::new(guard, options, locks).await
703 }
704
705 #[trace]
706 pub async fn commit_transaction(
707 &self,
708 transaction: &mut Transaction<'_>,
709 callback: &mut (dyn FnMut(u64) + Send),
710 ) -> Result<u64, Error> {
711 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
712 hook(transaction)?;
713 }
714 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
715 self.maybe_start_flush_task();
716 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
717 let journal_offset = if self.journal().image_builder_mode().is_some() {
718 let journal_checkpoint =
719 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
720 let maybe_mutation = self
721 .object_manager()
722 .apply_transaction(transaction, &journal_checkpoint)
723 .expect("Transactions must not fail in image_builder_mode");
724 if let Some(mutation) = maybe_mutation {
725 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
726 }
730 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
731 0
732 } else {
733 self.journal.commit(transaction).await?
734 };
735 self.completed_transactions.add(1);
736
737 callback(journal_offset);
742
743 if let Some(hook) = self.options.post_commit_hook.as_ref() {
744 hook().await;
745 }
746
747 Ok(journal_offset)
748 }
749
750 pub fn lock_manager(&self) -> &LockManager {
751 &self.lock_manager
752 }
753
754 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
755 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
756 self.sub_transaction();
757 }
758 if let MetadataReservation::Hold(hold_amount) =
760 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
761 {
762 let hold = transaction
763 .allocator_reservation
764 .unwrap()
765 .reserve(0)
766 .expect("Zero should always succeed.");
767 hold.add(hold_amount);
768 }
769 self.objects.drop_transaction(transaction);
770 self.lock_manager.drop_transaction(transaction);
771 }
772
773 fn maybe_start_flush_task(&self) {
774 let mut flush_task = self.flush_task.lock();
775 if flush_task.is_none() {
776 let journal = self.journal.clone();
777 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
778 }
779 }
780
781 async fn do_trim(&self) -> Result<usize, Error> {
783 const MAX_EXTENTS_PER_BATCH: usize = 8;
784 const MAX_EXTENT_SIZE: usize = 256 * 1024;
785 let mut offset = 0;
786 let mut bytes_trimmed = 0;
787 loop {
788 if self.closed.load(Ordering::Relaxed) {
789 info!("Filesystem is closed, nothing to trim");
790 return Ok(bytes_trimmed);
791 }
792 let allocator = self.allocator();
793 let trimmable_extents =
794 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
795 for device_range in trimmable_extents.extents() {
796 self.device.trim(device_range.clone()).await?;
797 bytes_trimmed += device_range.length()? as usize;
798 }
799 if let Some(device_range) = trimmable_extents.extents().last() {
800 offset = device_range.end;
801 } else {
802 break;
803 }
804 }
805 Ok(bytes_trimmed)
806 }
807
808 fn start_trim_task(
809 self: &Arc<Self>,
810 delay: std::time::Duration,
811 interval: std::time::Duration,
812 ) {
813 if !self.device.supports_trim() {
814 info!("Device does not support trim; not scheduling trimming");
815 return;
816 }
817 let this = self.clone();
818 let mut next_timer = delay;
819 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
820 loop {
821 let shutdown_listener = this.shutdown_event.listen();
822 if this.closed.load(Ordering::SeqCst) {
828 return;
829 }
830 futures::select!(
831 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
832 () = shutdown_listener.fuse() => return,
833 );
834 let start_time = std::time::Instant::now();
835 let res = this.do_trim().await;
836 let duration = std::time::Instant::now() - start_time;
837 next_timer = interval.clone();
838 match res {
839 Ok(bytes_trimmed) => info!(
840 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
841 {next_timer:?}",
842 ),
843 Err(e) => error!(e:?; "Failed to trim"),
844 }
845 }
846 }));
847 }
848
849 pub(crate) async fn reservation_for_transaction<'a>(
850 self: &Arc<Self>,
851 options: transaction::Options<'a>,
852 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
853 if self.options.image_builder_mode.is_some() {
854 return Ok((MetadataReservation::Borrowed, None, None));
857 }
858 if !options.skip_journal_checks {
859 self.maybe_start_flush_task();
860 self.journal.check_journal_space().await?;
861 }
862
863 let mut hold = None;
877 let metadata_reservation = if options.borrow_metadata_space {
878 MetadataReservation::Borrowed
879 } else {
880 match options.allocator_reservation {
881 Some(reservation) => {
882 hold = Some(
883 reservation
884 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
885 .ok_or(FxfsError::NoSpace)?,
886 );
887 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
888 }
889 None => {
890 let reservation = self
891 .allocator()
892 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
893 .ok_or(FxfsError::NoSpace)?;
894 MetadataReservation::Reservation(reservation)
895 }
896 }
897 };
898 Ok((metadata_reservation, options.allocator_reservation, hold))
899 }
900
901 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
902 if skip_journal_checks {
903 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
904 } else {
905 let inc = || {
906 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
907 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
908 match self.in_flight_transactions.compare_exchange_weak(
909 in_flights,
910 in_flights + 1,
911 Ordering::Relaxed,
912 Ordering::Relaxed,
913 ) {
914 Ok(_) => return true,
915 Err(x) => in_flights = x,
916 }
917 }
918 return false;
919 };
920 while !inc() {
921 let listener = self.transaction_limit_event.listen();
922 if inc() {
923 break;
924 }
925 listener.await;
926 }
927 }
928 }
929
930 pub(crate) fn sub_transaction(&self) {
931 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
932 assert!(old != 0);
933 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
934 self.transaction_limit_event.notify(usize::MAX);
935 }
936 }
937
938 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
939 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
940 TruncateGuard(self.lock_manager().write_lock(keys).await)
941 }
942
943 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
944 let inspector = fuchsia_inspect::Inspector::default();
945 let root = inspector.root();
946 root.record_child("__root", |n| self.root_store().record_data(n));
947 let object_manager = self.object_manager();
948 let volume_directory = object_manager.volume_directory();
949 let layer_set = volume_directory.store().tree().layer_set();
950 let mut merger = layer_set.merger();
951 let mut iter = volume_directory.iter(&mut merger).await?;
952 while let Some((name, id, _)) = iter.get() {
953 if let Some(store) = object_manager.store(id) {
954 root.record_child(name.to_string(), |n| store.record_data(n));
955 }
956 iter.advance().await?;
957 }
958 Ok(inspector)
959 }
960}
961
/// Guard held by a transaction; see `FxFilesystem::txn_guard` and
/// `FxFilesystem::new_transaction`.
pub enum TxnGuard<'a> {
    /// Borrows a guard supplied by the caller (via the transaction options).
    Borrowed(&'a TxnGuard<'a>),
    /// Owns a read lock on `LockKey::Filesystem`, as produced by `FxFilesystem::txn_guard`.
    Owned(ReadGuard<'static>),
}
966
967impl TxnGuard<'_> {
968 pub fn fs(&self) -> &Arc<FxFilesystem> {
969 match self {
970 TxnGuard::Borrowed(b) => b.fs(),
971 TxnGuard::Owned(o) => o.fs().unwrap(),
972 }
973 }
974}
975
/// Guard returned by `FxFilesystem::truncate_guard`; it holds the truncate write lock for as
/// long as it is alive.
// The wrapped guard is never read and is only kept so the lock stays held, hence the
// `dead_code` allowance.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
979
980pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
982 let fs = FxFilesystem::new_empty(device).await?;
983 fs.close().await?;
984 Ok(fs.take_device().await)
985}
986
987pub async fn mkfs_with_volume(
991 device: DeviceHolder,
992 volume_name: &str,
993 crypt: Option<Arc<dyn Crypt>>,
994) -> Result<DeviceHolder, Error> {
995 let fs = FxFilesystem::new_empty(device).await?;
996 {
997 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1000 root_volume
1001 .new_volume(
1002 volume_name,
1003 NewChildStoreOptions {
1004 options: StoreOptions { crypt, ..StoreOptions::default() },
1005 ..Default::default()
1006 },
1007 )
1008 .await
1009 .expect("Create volume failed");
1010 }
1011 fs.close().await?;
1012 Ok(fs.take_device().await)
1013}
1014
/// State for the post-commit hook that `FxFilesystemBuilder::open` installs when
/// `fsck_after_every_transaction` is enabled.
struct FsckAfterEveryTransaction {
    // Set once by `open`, after formatting/replay; until then `run` skips the fsck.
    fs: OnceLock<Weak<FxFilesystem>>,
    // The post-commit hook that was configured before this one replaced it; run after the fsck.
    old_hook: PostCommitHook,
}
1019
1020impl FsckAfterEveryTransaction {
1021 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1022 Arc::new(Self { fs: OnceLock::new(), old_hook })
1023 }
1024
1025 async fn run(self: Arc<Self>) {
1026 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1027 let options = FsckOptions {
1028 fail_on_warning: true,
1029 no_lock: true,
1030 quiet: true,
1031 ..Default::default()
1032 };
1033 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1034 let object_manager = fs.object_manager();
1035 for store in object_manager.unlocked_stores() {
1036 let store_id = store.store_object_id();
1037 if !object_manager.is_system_store(store_id) {
1038 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1039 .await
1040 .expect("fsck_volume_with_options failed");
1041 }
1042 }
1043 }
1044 if let Some(old_hook) = self.old_hook.as_ref() {
1045 old_hook().await;
1046 }
1047 }
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1053 use crate::fsck::{fsck, fsck_volume};
1054 use crate::log::*;
1055 use crate::lsm_tree::Operation;
1056 use crate::lsm_tree::types::Item;
1057 use crate::object_handle::{
1058 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1059 };
1060 use crate::object_store::directory::{Directory, replace_child};
1061 use crate::object_store::journal::JournalOptions;
1062 use crate::object_store::journal::super_block::SuperBlockInstance;
1063 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1064 use crate::object_store::volume::root_volume;
1065 use crate::object_store::{
1066 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1067 };
1068 use crate::range::RangeExt;
1069 use fuchsia_async as fasync;
1070 use fuchsia_sync::Mutex;
1071 use futures::future::join_all;
1072 use futures::stream::{FuturesUnordered, TryStreamExt};
1073 use fxfs_insecure_crypto::new_insecure_crypt;
1074 use rustc_hash::FxHashMap as HashMap;
1075 use std::ops::Range;
1076 use std::sync::Arc;
1077 use std::sync::atomic::{self, AtomicU32};
1078 use std::time::Duration;
1079 use storage_device::DeviceHolder;
1080 use storage_device::fake_device::{self, FakeDevice};
1081 use test_case::test_case;
1082
    // Passed as the second argument to `FakeDevice::new` by the tests below; it is smaller than
    // `MIN_BLOCK_SIZE`, so the filesystem under test uses `MIN_BLOCK_SIZE` as its block size.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1084
1085 #[fuchsia::test(threads = 10)]
1086 async fn test_compaction() {
1087 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1088
1089 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1091 let root_store = fs.root_store();
1092 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1093 .await
1094 .expect("open failed");
1095
1096 let mut tasks = Vec::new();
1097 for i in 0..2 {
1098 let mut transaction = fs
1099 .clone()
1100 .new_transaction(
1101 lock_keys![LockKey::object(
1102 root_store.store_object_id(),
1103 root_directory.object_id()
1104 )],
1105 Options::default(),
1106 )
1107 .await
1108 .expect("new_transaction failed");
1109 let handle = root_directory
1110 .create_child_file(&mut transaction, &format!("{}", i))
1111 .await
1112 .expect("create_child_file failed");
1113 transaction.commit().await.expect("commit failed");
1114 tasks.push(fasync::Task::spawn(async move {
1115 const TEST_DATA: &[u8] = b"hello";
1116 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1117 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1118 for _ in 0..1500 {
1119 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1120 }
1121 }));
1122 }
1123 join_all(tasks).await;
1124 fs.sync(SyncOptions::default()).await.expect("sync failed");
1125
1126 fsck(fs.clone()).await.expect("fsck failed");
1127 fs.close().await.expect("Close failed");
1128 }
1129
1130 #[fuchsia::test]
1131 async fn test_enable_allocations() {
1132 {
1134 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1135 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1136 fs.enable_allocations();
1137 let root_store = fs.root_store();
1138 let root_directory =
1139 Directory::open(&root_store, root_store.root_directory_object_id())
1140 .await
1141 .expect("open failed");
1142 let mut transaction = fs
1143 .clone()
1144 .new_transaction(
1145 lock_keys![LockKey::object(
1146 root_store.store_object_id(),
1147 root_directory.object_id()
1148 )],
1149 Options::default(),
1150 )
1151 .await
1152 .expect("new_transaction failed");
1153 root_directory
1154 .create_child_file(&mut transaction, "test")
1155 .await
1156 .expect("create_child_file failed");
1157 transaction.commit().await.expect("commit failed");
1158 fs.close().await.expect("close failed");
1159 }
1160
1161 {
1163 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1164 let fs = FxFilesystemBuilder::new()
1165 .format(true)
1166 .image_builder_mode(Some(SuperBlockInstance::A))
1167 .open(device)
1168 .await
1169 .expect("open failed");
1170 let root_store = fs.root_store();
1171 let root_directory =
1172 Directory::open(&root_store, root_store.root_directory_object_id())
1173 .await
1174 .expect("open failed");
1175
1176 let mut transaction = fs
1177 .clone()
1178 .new_transaction(
1179 lock_keys![LockKey::object(
1180 root_store.store_object_id(),
1181 root_directory.object_id()
1182 )],
1183 Options::default(),
1184 )
1185 .await
1186 .expect("new_transaction failed");
1187 let handle = root_directory
1188 .create_child_file(&mut transaction, "test_fail")
1189 .await
1190 .expect("create_child_file failed");
1191 transaction.commit().await.expect("commit failed");
1192
1193 assert!(
1195 FxfsError::Unavailable
1196 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1197 );
1198
1199 fs.enable_allocations();
1201 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1202
1203 fs.finalize().await.expect("finalize failed");
1206 fs.close().await.expect("close failed");
1207 }
1208 }
1212
1213 #[fuchsia::test(threads = 10)]
1214 async fn test_replay_is_identical() {
1215 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1216 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1217
1218 fs.close().await.expect("close failed");
1221 let device = fs.take_device().await;
1222 device.reopen(false);
1223
1224 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1225
1226 impl<K: Clone, V: Clone> Mutations<K, V> {
1227 fn new() -> Self {
1228 Mutations(Mutex::new(Vec::new()))
1229 }
1230
1231 fn push(&self, operation: Operation, item: &Item<K, V>) {
1232 self.0.lock().push((operation, item.clone()));
1233 }
1234 }
1235
1236 let open_fs = |device,
1237 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1238 allocator_mutations: Arc<Mutations<_, _>>| async {
1239 FxFilesystemBuilder::new()
1240 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1241 .on_new_allocator(move |allocator| {
1242 let allocator_mutations = allocator_mutations.clone();
1243 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1244 allocator_mutations.push(op, item)
1245 })));
1246 })
1247 .on_new_store(move |store| {
1248 let mutations = Arc::new(Mutations::new());
1249 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1250 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1251 mutations.push(op, item)
1252 })));
1253 })
1254 .open(device)
1255 .await
1256 .expect("open failed")
1257 };
1258
1259 let allocator_mutations = Arc::new(Mutations::new());
1260 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1261 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1262
1263 let root_store = fs.root_store();
1264 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1265 .await
1266 .expect("open failed");
1267
1268 let mut transaction = fs
1269 .clone()
1270 .new_transaction(
1271 lock_keys![LockKey::object(
1272 root_store.store_object_id(),
1273 root_directory.object_id()
1274 )],
1275 Options::default(),
1276 )
1277 .await
1278 .expect("new_transaction failed");
1279 let object = root_directory
1280 .create_child_file(&mut transaction, "test")
1281 .await
1282 .expect("create_child_file failed");
1283 transaction.commit().await.expect("commit failed");
1284
1285 let buf = object.allocate_buffer(10000).await;
1287 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1288
1289 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1291
1292 object.truncate(3000).await.expect("truncate failed");
1294
1295 let mut transaction = fs
1297 .clone()
1298 .new_transaction(
1299 lock_keys![
1300 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1301 LockKey::object(root_store.store_object_id(), object.object_id()),
1302 ],
1303 Options::default(),
1304 )
1305 .await
1306 .expect("new_transaction failed");
1307
1308 replace_child(&mut transaction, None, (&root_directory, "test"))
1309 .await
1310 .expect("replace_child failed");
1311
1312 transaction.commit().await.expect("commit failed");
1313
1314 root_store
1316 .tombstone_object(object.object_id(), Options::default())
1317 .await
1318 .expect("tombstone failed");
1319
1320 fs.close().await.expect("close failed");
1322
1323 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1324
1325 let device = fs.take_device().await;
1326 device.reopen(false);
1327
1328 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1329 let replayed_allocator_mutations = Arc::new(Mutations::new());
1330 let fs = open_fs(
1331 device,
1332 replayed_object_mutations.clone(),
1333 replayed_allocator_mutations.clone(),
1334 )
1335 .await;
1336
1337 let m1 = object_mutations.lock();
1338 let m2 = replayed_object_mutations.lock();
1339 assert_eq!(m1.len(), m2.len());
1340 for (store_id, mutations) in &*m1 {
1341 let mutations = mutations.0.lock();
1342 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1343 assert_eq!(mutations.len(), replayed.len());
1344 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1345 assert_eq!(op1, op2);
1346 assert_eq!(i1.key, i2.key);
1347 assert_eq!(i1.value, i2.value);
1348 assert_eq!(i1.sequence, i2.sequence);
1349 }
1350 }
1351
1352 let a1 = allocator_mutations.0.lock();
1353 let a2 = replayed_allocator_mutations.0.lock();
1354 assert_eq!(a1.len(), a2.len());
1355 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1356 assert_eq!(op1, op2);
1357 assert_eq!(i1.key, i2.key);
1358 assert_eq!(i1.value, i2.value);
1359 assert_eq!(i1.sequence, i2.sequence);
1360 }
1361
1362 assert_eq!(
1363 fs.object_manager().metadata_reservation().amount(),
1364 metadata_reservation_amount
1365 );
1366 }
1367
1368 #[fuchsia::test]
1369 async fn test_max_in_flight_transactions() {
1370 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1371 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1372
1373 let transactions = FuturesUnordered::new();
1374 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1375 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1376 }
1377 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1378
1379 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1381 assert!(futures::poll!(&mut fut).is_pending());
1382
1383 transactions.pop();
1385
1386 assert!(futures::poll!(&mut fut).is_ready());
1387 }
1388
1389 #[fuchsia::test(threads = 10)]
1391 async fn test_continuously_trim() {
1392 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1393 let fs = FxFilesystemBuilder::new()
1394 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1395 .format(true)
1396 .open(device)
1397 .await
1398 .expect("open failed");
1399 fasync::Timer::new(Duration::from_millis(10)).await;
1401
1402 let root_store = fs.root_store();
1405 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1406 .await
1407 .expect("open failed");
1408 for _ in 0..100 {
1409 let mut transaction = fs
1410 .clone()
1411 .new_transaction(
1412 lock_keys![LockKey::object(
1413 root_store.store_object_id(),
1414 root_directory.object_id()
1415 )],
1416 Options::default(),
1417 )
1418 .await
1419 .expect("new_transaction failed");
1420 let object = root_directory
1421 .create_child_file(&mut transaction, "test")
1422 .await
1423 .expect("create_child_file failed");
1424 transaction.commit().await.expect("commit failed");
1425
1426 {
1427 let buf = object.allocate_buffer(1024).await;
1428 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1429 }
1430 std::mem::drop(object);
1431
1432 let mut transaction = root_directory
1433 .acquire_context_for_replace(None, "test", true)
1434 .await
1435 .expect("acquire_context_for_replace failed")
1436 .transaction;
1437 replace_child(&mut transaction, None, (&root_directory, "test"))
1438 .await
1439 .expect("replace_child failed");
1440 transaction.commit().await.expect("commit failed");
1441 }
1442 fs.close().await.expect("close failed");
1443 }
1444
1445 #[test_case(true; "test power fail with barriers")]
1446 #[test_case(false; "test power fail with checksums")]
1447 #[fuchsia::test]
1448 async fn test_power_fail(barriers_enabled: bool) {
1449 for _ in 0..10 {
1452 let (store_id, device, test_file_object_id) = {
1453 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1454 let fs = if barriers_enabled {
1455 FxFilesystemBuilder::new()
1456 .barriers_enabled(true)
1457 .format(true)
1458 .open(device)
1459 .await
1460 .expect("new filesystem failed")
1461 } else {
1462 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1463 };
1464 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1465
1466 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1467 .await
1468 .expect("sync failed");
1469
1470 let store = root_volume
1471 .new_volume(
1472 "test",
1473 NewChildStoreOptions {
1474 options: StoreOptions {
1475 crypt: Some(Arc::new(new_insecure_crypt())),
1476 ..StoreOptions::default()
1477 },
1478 ..Default::default()
1479 },
1480 )
1481 .await
1482 .expect("new_volume failed");
1483 let root_directory = Directory::open(&store, store.root_directory_object_id())
1484 .await
1485 .expect("open failed");
1486
1487 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1489 let fs = store.filesystem();
1490 let root_directory = Directory::open(store, store.root_directory_object_id())
1491 .await
1492 .expect("open failed");
1493 for i in 0..100 {
1494 let mut transaction = fs
1495 .clone()
1496 .new_transaction(
1497 lock_keys![LockKey::object(
1498 store.store_object_id(),
1499 store.root_directory_object_id()
1500 )],
1501 Options::default(),
1502 )
1503 .await
1504 .expect("new_transaction failed");
1505 root_directory
1506 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1507 .await
1508 .expect("create_child_file failed");
1509 transaction.commit().await.expect("commit failed");
1510 }
1511 }
1512
1513 create_files(&store, "A").await;
1515
1516 let mut transaction = fs
1519 .clone()
1520 .new_transaction(
1521 lock_keys![LockKey::object(
1522 store.store_object_id(),
1523 store.root_directory_object_id()
1524 )],
1525 Options::default(),
1526 )
1527 .await
1528 .expect("new_transaction failed");
1529 let object = root_directory
1530 .create_child_file(&mut transaction, "test")
1531 .await
1532 .expect("create_child_file failed");
1533 transaction.commit().await.expect("commit failed");
1534
1535 let mut transaction =
1536 object.new_transaction().await.expect("new_transaction failed");
1537 let mut buffer = object.allocate_buffer(4096).await;
1538 buffer.as_mut_slice().fill(0xed);
1539 object
1540 .txn_write(&mut transaction, 0, buffer.as_ref())
1541 .await
1542 .expect("txn_write failed");
1543 transaction.commit().await.expect("commit failed");
1544
1545 create_files(&store, "B").await;
1547
1548 fs.sync(SyncOptions::default()).await.expect("sync failed");
1551
1552 fasync::Timer::new(Duration::from_millis(10)).await;
1558
1559 (
1560 store.store_object_id(),
1561 fs.device().snapshot().expect("snapshot failed"),
1562 object.object_id(),
1563 )
1564 };
1565
1566 device
1569 .discard_random_since_last_flush()
1570 .expect("discard_random_since_last_flush failed");
1571
1572 let fs = FxFilesystem::open(device).await.expect("open failed");
1573 fsck(fs.clone()).await.expect("fsck failed");
1574
1575 let mut check_test_file = false;
1576
1577 let object_id = if fs.object_manager().store(store_id).is_some() {
1580 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1581 .await
1582 .expect("fsck_volume failed");
1583
1584 let store = root_volume(fs.clone())
1588 .await
1589 .expect("root_volume failed")
1590 .volume(
1591 "test",
1592 StoreOptions {
1593 crypt: Some(Arc::new(new_insecure_crypt())),
1594 ..StoreOptions::default()
1595 },
1596 )
1597 .await
1598 .expect("volume failed");
1599
1600 let root_directory = Directory::open(&store, store.root_directory_object_id())
1601 .await
1602 .expect("open failed");
1603
1604 let mut transaction = fs
1605 .clone()
1606 .new_transaction(
1607 lock_keys![LockKey::object(
1608 store.store_object_id(),
1609 store.root_directory_object_id()
1610 )],
1611 Options::default(),
1612 )
1613 .await
1614 .expect("new_transaction failed");
1615 let object = root_directory
1616 .create_child_file(&mut transaction, &format!("C"))
1617 .await
1618 .expect("create_child_file failed");
1619 transaction.commit().await.expect("commit failed");
1620
1621 if let Ok(test_file) = ObjectStore::open_object(
1623 &store,
1624 test_file_object_id,
1625 HandleOptions::default(),
1626 None,
1627 )
1628 .await
1629 {
1630 let mut buffer = test_file.allocate_buffer(4096).await;
1632 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1633 if bytes == 4096 {
1634 let expected = [0xed; 4096];
1635 assert_eq!(buffer.as_slice(), &expected);
1636 } else {
1637 assert_eq!(bytes, 0);
1639 }
1640
1641 let mut transaction =
1643 test_file.new_transaction().await.expect("new_transaction failed");
1644 buffer.as_mut_slice().fill(0x37);
1645 test_file
1646 .txn_write(&mut transaction, 0, buffer.as_ref())
1647 .await
1648 .expect("txn_write failed");
1649 transaction.commit().await.expect("commit failed");
1650 check_test_file = true;
1651 }
1652
1653 object.object_id()
1654 } else {
1655 INVALID_OBJECT_ID
1656 };
1657
1658 fs.close().await.expect("close failed");
1660 let device = fs.take_device().await;
1661 device.reopen(false);
1662
1663 let fs = FxFilesystem::open(device).await.expect("open failed");
1664 fsck(fs.clone()).await.expect("fsck failed");
1665
1666 if object_id != INVALID_OBJECT_ID {
1669 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1670 .await
1671 .expect("fsck_volume failed");
1672
1673 let store = root_volume(fs.clone())
1674 .await
1675 .expect("root_volume failed")
1676 .volume(
1677 "test",
1678 StoreOptions {
1679 crypt: Some(Arc::new(new_insecure_crypt())),
1680 ..StoreOptions::default()
1681 },
1682 )
1683 .await
1684 .expect("volume failed");
1685 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1687 .await
1688 .expect("open_object failed");
1689
1690 if check_test_file {
1692 info!("Checking test file for modification");
1693 let test_file = ObjectStore::open_object(
1694 &store,
1695 test_file_object_id,
1696 HandleOptions::default(),
1697 None,
1698 )
1699 .await
1700 .expect("open_object failed");
1701 let mut buffer = test_file.allocate_buffer(4096).await;
1702 assert_eq!(
1703 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1704 4096
1705 );
1706 let expected = [0x37; 4096];
1707 assert_eq!(buffer.as_slice(), &expected);
1708 }
1709 }
1710
1711 fs.close().await.expect("close failed");
1712 }
1713 }
1714
1715 #[fuchsia::test]
1716 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1717 let barrier_count = Arc::new(AtomicU32::new(0));
1718
1719 struct Observer(Arc<AtomicU32>);
1720
1721 impl fake_device::Observer for Observer {
1722 fn barrier(&self) {
1723 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1724 }
1725 }
1726
1727 let mut fake_device = FakeDevice::new(8192, 4096);
1728 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1729 let device = DeviceHolder::new(fake_device);
1730 let fs = FxFilesystemBuilder::new()
1731 .barriers_enabled(true)
1732 .format(true)
1733 .open(device)
1734 .await
1735 .expect("new filesystem failed");
1736
1737 {
1738 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1739 root_vol
1740 .new_volume(
1741 "test",
1742 NewChildStoreOptions {
1743 options: StoreOptions {
1744 crypt: Some(Arc::new(new_insecure_crypt())),
1745 ..StoreOptions::default()
1746 },
1747 ..NewChildStoreOptions::default()
1748 },
1749 )
1750 .await
1751 .expect("there is no test volume");
1752 fs.close().await.expect("close failed");
1753 }
1754 let device = fs.take_device().await;
1757 device.reopen(false);
1758 let fs = FxFilesystemBuilder::new()
1759 .barriers_enabled(true)
1760 .open(device)
1761 .await
1762 .expect("new filesystem failed");
1763 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1764
1765 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1766 let store = root_vol
1767 .volume(
1768 "test",
1769 StoreOptions {
1770 crypt: Some(Arc::new(new_insecure_crypt())),
1771 ..StoreOptions::default()
1772 },
1773 )
1774 .await
1775 .expect("there is no test volume");
1776
1777 let fs = store.filesystem();
1779 let root_directory =
1780 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1781 for i in 0..100 {
1782 let mut transaction = fs
1783 .clone()
1784 .new_transaction(
1785 lock_keys![LockKey::object(
1786 store.store_object_id(),
1787 store.root_directory_object_id()
1788 )],
1789 Options::default(),
1790 )
1791 .await
1792 .expect("new_transaction failed");
1793 root_directory
1794 .create_child_file(&mut transaction, &format!("A {i}"))
1795 .await
1796 .expect("create_child_file failed");
1797 transaction.commit().await.expect("commit failed");
1798 }
1799
1800 fs.close().await.expect("close failed");
1802 assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1804 }
1805
1806 #[fuchsia::test]
1807 async fn test_barrier_emitted_when_transaction_includes_data() {
1808 let barrier_count = Arc::new(AtomicU32::new(0));
1809
1810 struct Observer(Arc<AtomicU32>);
1811
1812 impl fake_device::Observer for Observer {
1813 fn barrier(&self) {
1814 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1815 }
1816 }
1817
1818 let mut fake_device = FakeDevice::new(8192, 4096);
1819 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1820 let device = DeviceHolder::new(fake_device);
1821 let fs = FxFilesystemBuilder::new()
1822 .barriers_enabled(true)
1823 .format(true)
1824 .open(device)
1825 .await
1826 .expect("new filesystem failed");
1827
1828 {
1829 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1830 root_vol
1831 .new_volume(
1832 "test",
1833 NewChildStoreOptions {
1834 options: StoreOptions {
1835 crypt: Some(Arc::new(new_insecure_crypt())),
1836 ..StoreOptions::default()
1837 },
1838 ..NewChildStoreOptions::default()
1839 },
1840 )
1841 .await
1842 .expect("there is no test volume");
1843 fs.close().await.expect("close failed");
1844 }
1845 let device = fs.take_device().await;
1848 device.reopen(false);
1849 let fs = FxFilesystemBuilder::new()
1850 .barriers_enabled(true)
1851 .open(device)
1852 .await
1853 .expect("new filesystem failed");
1854 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1855
1856 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1857 let store = root_vol
1858 .volume(
1859 "test",
1860 StoreOptions {
1861 crypt: Some(Arc::new(new_insecure_crypt())),
1862 ..StoreOptions::default()
1863 },
1864 )
1865 .await
1866 .expect("there is no test volume");
1867
1868 let fs: Arc<FxFilesystem> = store.filesystem();
1870 let root_directory =
1871 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1872
1873 let mut transaction = fs
1874 .clone()
1875 .new_transaction(
1876 lock_keys![LockKey::object(
1877 store.store_object_id(),
1878 store.root_directory_object_id()
1879 )],
1880 Options::default(),
1881 )
1882 .await
1883 .expect("new_transaction failed");
1884 let object = root_directory
1885 .create_child_file(&mut transaction, "test")
1886 .await
1887 .expect("create_child_file failed");
1888 transaction.commit().await.expect("commit failed");
1889
1890 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1891 let mut buffer = object.allocate_buffer(4096).await;
1892 buffer.as_mut_slice().fill(0xed);
1893 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1894 transaction.commit().await.expect("commit failed");
1895
1896 fs.close().await.expect("close failed");
1898 assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1900 }
1901
1902 #[test_case(true; "fail when original filesystem has barriers enabled")]
1903 #[test_case(false; "fail when original filesystem has barriers disabled")]
1904 #[fuchsia::test]
1905 async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
1906 let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
1907 let fake_device = FakeDevice::new(8192, 4096);
1908 let device = DeviceHolder::new(fake_device);
1909 let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
1910 .barriers_enabled(original_barrier_mode)
1911 .format(true)
1912 .open(device)
1913 .await
1914 .expect("new filesystem failed");
1915
1916 {
1918 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1919 let store = root_vol
1920 .new_volume(
1921 "test",
1922 NewChildStoreOptions {
1923 options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
1924 ..Default::default()
1925 },
1926 )
1927 .await
1928 .expect("creating test volume");
1929 let root_dir = Directory::open(&store, store.root_directory_object_id())
1930 .await
1931 .expect("open failed");
1932 let mut transaction = fs
1933 .clone()
1934 .new_transaction(
1935 lock_keys![LockKey::object(
1936 store.store_object_id(),
1937 store.root_directory_object_id()
1938 )],
1939 Default::default(),
1940 )
1941 .await
1942 .expect("new_transaction failed");
1943 let object = root_dir
1944 .create_child_file(&mut transaction, "file")
1945 .await
1946 .expect("create_child_file failed");
1947 transaction.commit().await.expect("commit failed");
1948 let mut buffer = object.allocate_buffer(4096).await;
1949 buffer.as_mut_slice().fill(0xA7);
1950 let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
1951 assert_eq!(new_size, 4096);
1952 }
1953
1954 fs.close().await.expect("close failed");
1956 let device = fs.take_device().await;
1957 device.reopen(false);
1958 let fs = FxFilesystemBuilder::new()
1959 .barriers_enabled(!original_barrier_mode)
1960 .open(device)
1961 .await
1962 .expect("new filesystem failed");
1963 {
1964 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1965 let store = root_vol
1966 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
1967 .await
1968 .expect("opening test volume");
1969 let root_dir = Directory::open(&store, store.root_directory_object_id())
1970 .await
1971 .expect("open failed");
1972 let (object_id, _, _) =
1973 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
1974 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
1975 .await
1976 .expect("open failed");
1977 let mut buffer = test_file.allocate_buffer(4096).await;
1979 buffer.as_mut_slice().fill(0xA8);
1980 let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
1981 assert_eq!(new_size, 8192);
1982 }
1983
1984 fs.close().await.expect("close failed");
1987 let device = fs.take_device().await;
1988 device.reopen(false);
1989 let fs = FxFilesystemBuilder::new()
1990 .barriers_enabled(original_barrier_mode)
1991 .open(device)
1992 .await
1993 .expect("new filesystem failed");
1994 {
1995 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1996 let store = root_vol
1997 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
1998 .await
1999 .expect("opening test volume");
2000 let root_dir = Directory::open(&store, store.root_directory_object_id())
2001 .await
2002 .expect("open failed");
2003 let (object_id, _, _) =
2004 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2005 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2006 .await
2007 .expect("open failed");
2008 let mut buffer = test_file.allocate_buffer(8192).await;
2009 assert_eq!(
2010 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2011 8192,
2012 "short read"
2013 );
2014 assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2015 assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2016 }
2017 fs.close().await.expect("close failed");
2018 }
2019
2020 #[fuchsia::test]
2021 async fn test_image_builder_mode_no_early_writes() {
2022 const BLOCK_SIZE: u32 = 4096;
2023 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2024 device.reopen(true);
2025 let fs = FxFilesystemBuilder::new()
2026 .format(true)
2027 .image_builder_mode(Some(SuperBlockInstance::A))
2028 .open(device)
2029 .await
2030 .expect("open failed");
2031 fs.enable_allocations();
2032 fs.close().await.expect("closed");
2035 }
2036
2037 #[fuchsia::test]
2038 async fn test_image_builder_mode() {
2039 const BLOCK_SIZE: u32 = 4096;
2040 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2041 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2042
2043 {
2045 let mut write_buf =
2046 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2047 write_buf.as_mut_slice().fill(0xf0);
2048 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2049 }
2050
2051 device.reopen(true);
2052
2053 let device = {
2054 let fs = FxFilesystemBuilder::new()
2055 .format(true)
2056 .image_builder_mode(Some(SuperBlockInstance::B))
2057 .open(device)
2058 .await
2059 .expect("open failed");
2060 fs.enable_allocations();
2061 {
2062 let root_store = fs.root_store();
2063 let root_directory =
2064 Directory::open(&root_store, root_store.root_directory_object_id())
2065 .await
2066 .expect("open failed");
2067 let handle;
2069 {
2070 let mut transaction = fs
2071 .clone()
2072 .new_transaction(
2073 lock_keys![LockKey::object(
2074 root_directory.store().store_object_id(),
2075 root_directory.object_id()
2076 )],
2077 Options::default(),
2078 )
2079 .await
2080 .expect("new transaction");
2081 handle = root_directory
2082 .create_child_file(&mut transaction, "test")
2083 .await
2084 .expect("create file");
2085 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2086 transaction.commit().await.expect("commit");
2087 }
2088 }
2089 fs.device().reopen(false);
2090 fs.finalize().await.expect("finalize");
2091 fs.close().await.expect("close");
2092 fs.take_device().await
2093 };
2094 device.reopen(false);
2095 let fs = FxFilesystem::open(device).await.expect("open failed");
2096 fsck(fs.clone()).await.expect("fsck failed");
2097
2098 let root_store = fs.root_store();
2100 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2101 .await
2102 .expect("open failed");
2103 let (object_id, descriptor, _) =
2104 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2105 assert_eq!(descriptor, ObjectDescriptor::File);
2106 let test_file =
2107 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2108 .await
2109 .expect("open failed");
2110 let mut read_buf =
2111 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2112 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2113 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2114 fs.close().await.expect("closed");
2115 }
2116
2117 #[fuchsia::test]
2118 async fn test_read_only_mount_on_full_filesystem() {
2119 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2120 let fs =
2121 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2122 let root_store = fs.root_store();
2123 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2124 .await
2125 .expect("open failed");
2126
2127 let mut transaction = fs
2128 .clone()
2129 .new_transaction(
2130 lock_keys![LockKey::object(
2131 root_store.store_object_id(),
2132 root_directory.object_id()
2133 )],
2134 Options::default(),
2135 )
2136 .await
2137 .expect("new_transaction failed");
2138 let handle = root_directory
2139 .create_child_file(&mut transaction, "test")
2140 .await
2141 .expect("create_child_file failed");
2142 transaction.commit().await.expect("commit failed");
2143
2144 let mut buf = handle.allocate_buffer(4096).await;
2145 buf.as_mut_slice().fill(0xaa);
2146 loop {
2147 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2148 break;
2149 }
2150 }
2151
2152 let max_offset = fs.allocator().maximum_offset();
2153 fs.close().await.expect("Close failed");
2154
2155 let device = fs.take_device().await;
2156 device.reopen(false);
2157 let mut buffer = device
2158 .allocate_buffer(
2159 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2160 )
2161 .await;
2162 device.read(0, buffer.as_mut()).await.expect("read failed");
2163
2164 let device = DeviceHolder::new(
2165 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2166 .expect("from_image failed"),
2167 );
2168 let fs =
2169 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2170 fs.close().await.expect("Close failed");
2171 }
2172}