1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use static_assertions::const_assert;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::{Arc, OnceLock, Weak};
34use storage_device::{Device, DeviceHolder};
35
/// Smallest block size the filesystem will use; a device reporting a smaller block size is
/// rounded up to this.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size: 64 KiB (equal to `u16::MAX + 1`).
pub const MAX_BLOCK_SIZE: u64 = 1 << 16;
38
39pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
44const_assert!(9223372036854771712 == MAX_FILE_SIZE);
45
/// Upper bound on concurrently in-flight transactions enforced by
/// `FxFilesystem::add_transaction` (unless journal checks are skipped).
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first background trim after mounting: one hour.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(3_600);

/// Default interval between subsequent background trims: one day.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(24 * 3_600);
56
/// Space usage snapshot returned by `FxFilesystem::get_info`.
pub struct Info {
    /// Size of the underlying device, in bytes.
    pub total_bytes: u64,
    /// Bytes the allocator reports as used.
    pub used_bytes: u64,
}
62
63pub type PostCommitHook =
64 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
65
66pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
67
/// Options controlling how an opened `FxFilesystem` behaves.
pub struct Options {
    /// Open without writing. No device flush happens at open, no initial graveyard reap, and no
    /// background reaping or trimming. `close` skips the final sync. Formatting a read-only
    /// filesystem is rejected by the builder.
    pub read_only: bool,

    /// NOTE(review): not read in this file; the name suggests the number of bytes after which a
    /// metadata key is rolled — confirm at the use site. Defaults to 128 MiB.
    pub roll_metadata_key_byte_count: u64,

    /// Called with each transaction at the start of `FxFilesystem::commit_transaction`. If it
    /// returns an error, that error is returned and nothing is committed.
    pub pre_commit_hook: PreCommitHook,

    /// Awaited after every successful commit, after the commit callback has run.
    pub post_commit_hook: PostCommitHook,

    /// NOTE(review): not read in this file; presumably disables the graveyard's mount-time
    /// reap — confirm in the graveyard.
    pub skip_initial_reap: bool,

    /// `(delay, interval)` for the background trim task. The first trim happens after `delay`,
    /// then every `interval`. `None` disables trimming. The task is only started for writable,
    /// non-image-builder mounts on devices that support trim.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// When set, commits are applied directly instead of being written to the journal, and no
    /// superblocks are written at format time. `close` then allocates and compacts the journal.
    pub image_builder_mode: Option<SuperBlockInstance>,

    /// Enables inline encryption. Requires `barriers_enabled`; opening fails otherwise.
    pub inline_crypto_enabled: bool,

    /// Enables write barriers. The builder's `barriers_enabled` also forwards this to the
    /// journal options.
    pub barriers_enabled: bool,
}
113
114impl Default for Options {
115 fn default() -> Self {
116 Options {
117 roll_metadata_key_byte_count: 128 * 1024 * 1024,
118 read_only: false,
119 pre_commit_hook: None,
120 post_commit_hook: None,
121 skip_initial_reap: false,
122 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
123 image_builder_mode: None,
124 inline_crypto_enabled: false,
125 barriers_enabled: false,
126 }
127 }
128}
129
130pub struct ApplyContext<'a, 'b> {
132 pub mode: ApplyMode<'a, 'b>,
134
135 pub checkpoint: JournalCheckpoint,
137}
138
139pub enum ApplyMode<'a, 'b> {
142 Replay,
143 Live(&'a Transaction<'b>),
144}
145
146impl ApplyMode<'_, '_> {
147 pub fn is_replay(&self) -> bool {
148 matches!(self, ApplyMode::Replay)
149 }
150
151 pub fn is_live(&self) -> bool {
152 matches!(self, ApplyMode::Live(_))
153 }
154}
155
156#[async_trait]
159pub trait JournalingObject: Send + Sync {
160 fn apply_mutation(
164 &self,
165 mutation: Mutation,
166 context: &ApplyContext<'_, '_>,
167 assoc_obj: AssocObj<'_>,
168 ) -> Result<(), Error>;
169
170 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
172
173 async fn flush(&self) -> Result<Version, Error>;
177
178 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
181 writer.write(mutation.clone());
182 }
183}
184
/// Options for a journal sync (`FxFilesystem::sync`).
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Also flush the device once the journal has been synced.
    pub flush_device: bool,

    /// Optional predicate consulted by the journal's sync; NOTE(review): its exact effect
    /// (presumably skipping the sync when it returns false) is defined in the journal.
    pub precondition: Option<Box<dyn FnOnce() -> bool + Send + 'a>>,
}
199
200pub struct OpenFxFilesystem(Arc<FxFilesystem>);
201
202impl OpenFxFilesystem {
203 pub async fn take_device(self) -> DeviceHolder {
206 let fut = self.device.take_when_dropped();
207 std::mem::drop(self);
208 debug_assert_not_too_long!(fut)
209 }
210}
211
212impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
213 fn from(fs: Arc<FxFilesystem>) -> Self {
214 Self(fs)
215 }
216}
217
218impl Drop for OpenFxFilesystem {
219 fn drop(&mut self) {
220 if self.options.image_builder_mode.is_some()
221 && self.journal().image_builder_mode().is_some()
222 {
223 error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
224 }
225 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
226 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
227 }
228 }
229}
230
231impl std::ops::Deref for OpenFxFilesystem {
232 type Target = Arc<FxFilesystem>;
233
234 fn deref(&self) -> &Self::Target {
235 &self.0
236 }
237}
238
/// Configures and then opens (or formats) an `FxFilesystem`.
pub struct FxFilesystemBuilder {
    // Format a new filesystem rather than replaying an existing one.
    format: bool,
    // Enables tracing on the journal and the root store.
    trace: bool,
    // Filesystem options; moved into the filesystem by `open`.
    options: Options,
    // Journal options; moved into the journal by `open`.
    journal_options: JournalOptions,
    // Handed to journal replay; not used when formatting.
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Handed to the object manager when `open` creates it.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // Install a post-commit hook that runs fsck after every transaction.
    fsck_after_every_transaction: bool,
}
248
249impl FxFilesystemBuilder {
250 pub fn new() -> Self {
251 Self {
252 format: false,
253 trace: false,
254 options: Options::default(),
255 journal_options: JournalOptions::default(),
256 on_new_allocator: None,
257 on_new_store: None,
258 fsck_after_every_transaction: false,
259 }
260 }
261
262 pub fn format(mut self, format: bool) -> Self {
264 self.format = format;
265 self
266 }
267
268 pub fn trace(mut self, trace: bool) -> Self {
270 self.trace = trace;
271 self
272 }
273
274 pub fn read_only(mut self, read_only: bool) -> Self {
277 self.options.read_only = read_only;
278 self
279 }
280
281 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
288 self.options.image_builder_mode = mode;
289 self
290 }
291
292 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
294 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
295 self
296 }
297
298 pub fn pre_commit_hook(
300 mut self,
301 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
302 ) -> Self {
303 self.options.pre_commit_hook = Some(Box::new(hook));
304 self
305 }
306
307 pub fn post_commit_hook(
310 mut self,
311 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
312 ) -> Self {
313 self.options.post_commit_hook = Some(Box::new(hook));
314 self
315 }
316
317 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
320 self.options.skip_initial_reap = skip_initial_reap;
321 self
322 }
323
324 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
326 self.journal_options = journal_options;
327 self
328 }
329
330 pub fn on_new_allocator(
332 mut self,
333 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
334 ) -> Self {
335 self.on_new_allocator = Some(Box::new(on_new_allocator));
336 self
337 }
338
339 pub fn on_new_store(
341 mut self,
342 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
343 ) -> Self {
344 self.on_new_store = Some(Box::new(on_new_store));
345 self
346 }
347
348 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
350 self.fsck_after_every_transaction = fsck_after_every_transaction;
351 self
352 }
353
354 pub fn trim_config(
355 mut self,
356 delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
357 ) -> Self {
358 self.options.trim_config = delay_and_interval;
359 self
360 }
361
362 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
364 self.options.inline_crypto_enabled = inline_crypto_enabled;
365 self
366 }
367
368 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
371 self.options.barriers_enabled = barriers_enabled;
372 self.journal_options.barriers_enabled = barriers_enabled;
373 self
374 }
375
376 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
378 let read_only = self.options.read_only;
379 if self.format && read_only {
380 bail!("Cannot initialize a filesystem as read-only");
381 }
382
383 if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
385 bail!("A filesystem using inline encryption requires barriers");
386 }
387
388 let objects = Arc::new(ObjectManager::new(self.on_new_store));
389 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
390
391 let image_builder_mode = self.options.image_builder_mode;
392
393 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
394 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
395 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
396
397 let mut fsck_after_every_transaction = None;
398 let mut filesystem_options = self.options;
399 if self.fsck_after_every_transaction {
400 let instance =
401 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
402 fsck_after_every_transaction = Some(instance.clone());
403 filesystem_options.post_commit_hook =
404 Some(Box::new(move || Box::pin(instance.clone().run())));
405 }
406
407 if !read_only && !self.format {
408 device.flush().await.context("Device flush failed")?;
411 }
412
413 let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
414 let weak = weak.clone();
415 FxFilesystem {
416 device,
417 block_size,
418 objects: objects.clone(),
419 journal,
420 commit_mutex: futures::lock::Mutex::new(()),
421 lock_manager: LockManager::new(),
422 flush_task: Mutex::new(None),
423 trim_task: Mutex::new(None),
424 closed: AtomicBool::new(true),
425 shutdown_event: Event::new(),
426 trace: self.trace,
427 graveyard: Graveyard::new(objects.clone()),
428 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
429 options: filesystem_options,
430 in_flight_transactions: AtomicU64::new(0),
431 transaction_limit_event: Event::new(),
432 _stores_node: metrics::register_fs(move || {
433 let weak = weak.clone();
434 Box::pin(async move {
435 if let Some(fs) = weak.upgrade() {
436 fs.populate_stores_node().await
437 } else {
438 Err(anyhow!("Filesystem has been dropped"))
439 }
440 })
441 }),
442 }
443 });
444
445 filesystem.journal().set_image_builder_mode(image_builder_mode);
446
447 filesystem.journal.set_trace(self.trace);
448 if self.format {
449 filesystem.journal.init_empty(filesystem.clone()).await?;
450 if image_builder_mode.is_none() {
451 filesystem.journal.init_superblocks().await?;
454
455 filesystem.graveyard.clone().reap_async();
457 }
458
459 let root_store = filesystem.root_store();
461 root_store.set_trace(self.trace);
462 let root_directory =
463 Directory::open(&root_store, root_store.root_directory_object_id())
464 .await
465 .context("Unable to open root volume directory")?;
466 let mut transaction = filesystem
467 .clone()
468 .new_transaction(
469 lock_keys![LockKey::object(
470 root_store.store_object_id(),
471 root_directory.object_id()
472 )],
473 transaction::Options::default(),
474 )
475 .await?;
476 let volume_directory =
477 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
478 transaction.commit().await?;
479 objects.set_volume_directory(volume_directory);
480 } else {
481 filesystem
482 .journal
483 .replay(filesystem.clone(), self.on_new_allocator)
484 .await
485 .context("Journal replay failed")?;
486 filesystem.root_store().set_trace(self.trace);
487
488 if !read_only {
489 for store in objects.unlocked_stores() {
493 filesystem.graveyard.initial_reap(&store).await?;
494 }
495 }
496 }
497
498 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
500 fsck_after_every_transaction
501 .fs
502 .set(Arc::downgrade(&filesystem))
503 .unwrap_or_else(|_| unreachable!());
504 }
505
506 filesystem.closed.store(false, Ordering::SeqCst);
507
508 if !read_only && image_builder_mode.is_none() {
509 filesystem.graveyard.clone().reap_async();
511
512 if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
513 filesystem.start_trim_task(delay, interval);
514 }
515 }
516
517 Ok(filesystem.into())
518 }
519}
520
/// An open Fxfs filesystem. It ties together the object manager, journal, lock manager,
/// graveyard, and background tasks for a single device.
pub struct FxFilesystem {
    // Effective block size: the device block size, raised to at least MIN_BLOCK_SIZE.
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Serializes the final (apply / journal-write) stage of `commit_transaction`.
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // Background journal flush task, spawned lazily by `maybe_start_flush_task`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // Periodic trim task, if one was started by `start_trim_task`.
    trim_task: Mutex<Option<fasync::Task<()>>>,
    // True until `open` completes, and again once `close` begins.
    closed: AtomicBool,
    // Notified by `close` so background tasks (e.g. trim) stop waiting and exit.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    // Inspect counter incremented once per committed transaction.
    completed_transactions: UintProperty,
    options: Options,

    // In-flight transaction count. It is incremented by `add_transaction` and decremented by
    // `sub_transaction`; it is bounded by MAX_IN_FLIGHT_TRANSACTIONS unless journal checks
    // are skipped.
    in_flight_transactions: AtomicU64,

    // Notified by `sub_transaction` so callers blocked in `add_transaction` can retry.
    transaction_limit_event: Event,

    device: DeviceHolder,

    // Inspect node whose callback fills in per-store data via `populate_stores_node`; held for
    // the lifetime of the filesystem.
    _stores_node: LazyNode,
}
551
552#[fxfs_trace::trace]
553impl FxFilesystem {
554 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
555 FxFilesystemBuilder::new().format(true).open(device).await
556 }
557
558 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
559 FxFilesystemBuilder::new().open(device).await
560 }
561
562 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
563 self.objects.root_parent_store()
564 }
565
566 pub async fn close(&self) -> Result<(), Error> {
567 if self.journal().image_builder_mode().is_some() {
568 self.journal().allocate_journal().await?;
569 self.journal().set_image_builder_mode(None);
570 self.journal().compact().await?;
571 }
572 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
573 self.shutdown_event.notify(usize::MAX);
574 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
575 let trim_task = self.trim_task.lock().take();
576 if let Some(task) = trim_task {
577 debug_assert_not_too_long!(task);
578 }
579 self.journal.stop_compactions().await;
580 let sync_status =
581 if self.journal().image_builder_mode().is_some() || self.options().read_only {
582 Ok(None)
583 } else {
584 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
585 };
586 match &sync_status {
587 Ok(None) => {}
588 Ok(checkpoint) => info!(
589 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
590 reservation_required={}, borrowed={})",
591 checkpoint.as_ref().unwrap().0.file_offset,
592 self.object_manager().metadata_reservation(),
593 self.object_manager().required_reservation(),
594 self.object_manager().borrowed_metadata_space(),
595 ),
596 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
597 }
598 self.journal.terminate();
599 let flush_task = self.flush_task.lock().take();
600 if let Some(task) = flush_task {
601 debug_assert_not_too_long!(task);
602 }
603 self.device().close().await.context("Failed to close device")?;
606 sync_status.map(|_| ())
607 }
608
609 pub fn device(&self) -> Arc<dyn Device> {
610 Arc::clone(&self.device)
611 }
612
613 pub fn root_store(&self) -> Arc<ObjectStore> {
614 self.objects.root_store()
615 }
616
617 pub fn allocator(&self) -> Arc<Allocator> {
618 self.objects.allocator()
619 }
620
621 pub fn enable_allocations(&self) {
625 self.allocator().enable_allocations();
626 }
627
628 pub fn object_manager(&self) -> &Arc<ObjectManager> {
629 &self.objects
630 }
631
632 pub fn journal(&self) -> &Arc<Journal> {
633 &self.journal
634 }
635
636 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
637 self.journal.sync(options).await.map(|_| ())
638 }
639
640 pub fn block_size(&self) -> u64 {
641 self.block_size
642 }
643
644 pub fn get_info(&self) -> Info {
645 Info {
646 total_bytes: self.device.size(),
647 used_bytes: self.object_manager().allocator().get_used_bytes().0,
648 }
649 }
650
651 pub fn super_block_header(&self) -> SuperBlockHeader {
652 self.journal.super_block_header()
653 }
654
655 pub fn graveyard(&self) -> &Arc<Graveyard> {
656 &self.graveyard
657 }
658
659 pub fn trace(&self) -> bool {
660 self.trace
661 }
662
663 pub fn options(&self) -> &Options {
664 &self.options
665 }
666
667 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
674 TxnGuard::Owned(
675 self.lock_manager
676 .read_lock(lock_keys!(LockKey::Filesystem))
677 .await
678 .into_owned(self.clone()),
679 )
680 }
681
682 pub async fn new_transaction<'a>(
683 self: Arc<Self>,
684 locks: LockKeys,
685 options: transaction::Options<'a>,
686 ) -> Result<Transaction<'a>, Error> {
687 let guard = if let Some(guard) = options.txn_guard.as_ref() {
688 TxnGuard::Borrowed(guard)
689 } else {
690 self.txn_guard().await
691 };
692 Transaction::new(guard, options, locks).await
693 }
694
695 #[trace]
696 pub async fn commit_transaction(
697 &self,
698 transaction: &mut Transaction<'_>,
699 callback: &mut (dyn FnMut(u64) + Send),
700 ) -> Result<u64, Error> {
701 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
702 hook(transaction)?;
703 }
704 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
705 self.maybe_start_flush_task();
706 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
707 let journal_offset = if self.journal().image_builder_mode().is_some() {
708 let journal_checkpoint =
709 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
710 let maybe_mutation = self
711 .object_manager()
712 .apply_transaction(transaction, &journal_checkpoint)
713 .expect("Transactions must not fail in image_builder_mode");
714 if let Some(mutation) = maybe_mutation {
715 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
716 }
720 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
721 0
722 } else {
723 self.journal.commit(transaction).await?
724 };
725 self.completed_transactions.add(1);
726
727 callback(journal_offset);
732
733 if let Some(hook) = self.options.post_commit_hook.as_ref() {
734 hook().await;
735 }
736
737 Ok(journal_offset)
738 }
739
740 pub fn lock_manager(&self) -> &LockManager {
741 &self.lock_manager
742 }
743
744 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
745 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
746 self.sub_transaction();
747 }
748 if let MetadataReservation::Hold(hold_amount) =
750 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
751 {
752 let hold = transaction
753 .allocator_reservation
754 .unwrap()
755 .reserve(0)
756 .expect("Zero should always succeed.");
757 hold.add(hold_amount);
758 }
759 self.objects.drop_transaction(transaction);
760 self.lock_manager.drop_transaction(transaction);
761 }
762
763 fn maybe_start_flush_task(&self) {
764 if self.journal.image_builder_mode().is_some() {
765 return;
766 }
767 let mut flush_task = self.flush_task.lock();
768 if flush_task.is_none() {
769 let journal = self.journal.clone();
770 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
771 }
772 }
773
774 async fn do_trim(&self) -> Result<usize, Error> {
776 const MAX_EXTENTS_PER_BATCH: usize = 8;
777 const MAX_EXTENT_SIZE: usize = 256 * 1024;
778 let mut offset = 0;
779 let mut bytes_trimmed = 0;
780 loop {
781 if self.closed.load(Ordering::Relaxed) {
782 info!("Filesystem is closed, nothing to trim");
783 return Ok(bytes_trimmed);
784 }
785 let allocator = self.allocator();
786 let trimmable_extents =
787 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
788 for device_range in trimmable_extents.extents() {
789 self.device.trim(device_range.clone()).await?;
790 bytes_trimmed += device_range.length()? as usize;
791 }
792 if let Some(device_range) = trimmable_extents.extents().last() {
793 offset = device_range.end;
794 } else {
795 break;
796 }
797 }
798 Ok(bytes_trimmed)
799 }
800
801 fn start_trim_task(
802 self: &Arc<Self>,
803 delay: std::time::Duration,
804 interval: std::time::Duration,
805 ) {
806 if !self.device.supports_trim() {
807 info!("Device does not support trim; not scheduling trimming");
808 return;
809 }
810 let this = self.clone();
811 let mut next_timer = delay;
812 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
813 loop {
814 let shutdown_listener = this.shutdown_event.listen();
815 if this.closed.load(Ordering::SeqCst) {
821 return;
822 }
823 futures::select!(
824 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
825 () = shutdown_listener.fuse() => return,
826 );
827 let start_time = std::time::Instant::now();
828 let res = this.do_trim().await;
829 let duration = std::time::Instant::now() - start_time;
830 next_timer = interval.clone();
831 match res {
832 Ok(bytes_trimmed) => info!(
833 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
834 {next_timer:?}",
835 ),
836 Err(e) => error!(e:?; "Failed to trim"),
837 }
838 }
839 }));
840 }
841
842 pub(crate) async fn reservation_for_transaction<'a>(
843 self: &Arc<Self>,
844 options: transaction::Options<'a>,
845 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
846 if self.options.image_builder_mode.is_some() {
847 return Ok((MetadataReservation::Borrowed, None, None));
850 }
851 if !options.skip_journal_checks {
852 self.maybe_start_flush_task();
853 self.journal.check_journal_space().await?;
854 }
855
856 let mut hold = None;
870 let metadata_reservation = if options.borrow_metadata_space {
871 MetadataReservation::Borrowed
872 } else {
873 match options.allocator_reservation {
874 Some(reservation) => {
875 hold = Some(
876 reservation
877 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
878 .ok_or(FxfsError::NoSpace)?,
879 );
880 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
881 }
882 None => {
883 let reservation = self
884 .allocator()
885 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
886 .ok_or(FxfsError::NoSpace)?;
887 MetadataReservation::Reservation(reservation)
888 }
889 }
890 };
891 Ok((metadata_reservation, options.allocator_reservation, hold))
892 }
893
894 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
895 if skip_journal_checks {
896 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
897 } else {
898 let inc = || {
899 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
900 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
901 match self.in_flight_transactions.compare_exchange_weak(
902 in_flights,
903 in_flights + 1,
904 Ordering::Relaxed,
905 Ordering::Relaxed,
906 ) {
907 Ok(_) => return true,
908 Err(x) => in_flights = x,
909 }
910 }
911 return false;
912 };
913 while !inc() {
914 let listener = self.transaction_limit_event.listen();
915 if inc() {
916 break;
917 }
918 listener.await;
919 }
920 }
921 }
922
923 pub(crate) fn sub_transaction(&self) {
924 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
925 assert!(old != 0);
926 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
927 self.transaction_limit_event.notify(usize::MAX);
928 }
929 }
930
931 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
932 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
933 TruncateGuard(self.lock_manager().write_lock(keys).await)
934 }
935
936 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
937 let inspector = fuchsia_inspect::Inspector::default();
938 let root = inspector.root();
939 root.record_child("__root", |n| self.root_store().record_data(n));
940 let object_manager = self.object_manager();
941 let volume_directory = object_manager.volume_directory();
942 let layer_set = volume_directory.store().tree().layer_set();
943 let mut merger = layer_set.merger();
944 let mut iter = volume_directory.iter(&mut merger).await?;
945 while let Some((name, id, _)) = iter.get() {
946 if let Some(store) = object_manager.store(id) {
947 root.record_child(name.to_string(), |n| store.record_data(n));
948 }
949 iter.advance().await?;
950 }
951 Ok(inspector)
952 }
953}
954
955pub enum TxnGuard<'a> {
956 Borrowed(&'a TxnGuard<'a>),
957 Owned(ReadGuard<'static>),
958}
959
960impl TxnGuard<'_> {
961 pub fn fs(&self) -> &Arc<FxFilesystem> {
962 match self {
963 TxnGuard::Borrowed(b) => b.fs(),
964 TxnGuard::Owned(o) => o.fs().unwrap(),
965 }
966 }
967}
968
969#[allow(dead_code)]
971pub struct TruncateGuard<'a>(WriteGuard<'a>);
972
973pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
975 let fs = FxFilesystem::new_empty(device).await?;
976 fs.close().await?;
977 Ok(fs.take_device().await)
978}
979
980pub async fn mkfs_with_volume(
984 device: DeviceHolder,
985 volume_name: &str,
986 crypt: Option<Arc<dyn Crypt>>,
987) -> Result<DeviceHolder, Error> {
988 let fs = FxFilesystem::new_empty(device).await?;
989 {
990 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
993 root_volume
994 .new_volume(
995 volume_name,
996 NewChildStoreOptions {
997 options: StoreOptions { crypt, ..StoreOptions::default() },
998 ..Default::default()
999 },
1000 )
1001 .await
1002 .expect("Create volume failed");
1003 }
1004 fs.close().await?;
1005 Ok(fs.take_device().await)
1006}
1007
/// Post-commit hook state that runs fsck after each transaction, then chains to the previously
/// installed post-commit hook.
struct FsckAfterEveryTransaction {
    // Filesystem to check. It is set once `open` has finished formatting or replaying; until
    // then `run` skips fsck. The reference is weak because this object is reachable from the
    // filesystem's own options (via the hook), so a strong reference would form a cycle.
    fs: OnceLock<Weak<FxFilesystem>>,
    // The post-commit hook installed before this one; it runs after each fsck.
    old_hook: PostCommitHook,
}
1012
1013impl FsckAfterEveryTransaction {
1014 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1015 Arc::new(Self { fs: OnceLock::new(), old_hook })
1016 }
1017
1018 async fn run(self: Arc<Self>) {
1019 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1020 let options = FsckOptions {
1021 fail_on_warning: true,
1022 no_lock: true,
1023 quiet: true,
1024 ..Default::default()
1025 };
1026 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1027 let object_manager = fs.object_manager();
1028 for store in object_manager.unlocked_stores() {
1029 let store_id = store.store_object_id();
1030 if !object_manager.is_system_store(store_id) {
1031 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1032 .await
1033 .expect("fsck_volume_with_options failed");
1034 }
1035 }
1036 }
1037 if let Some(old_hook) = self.old_hook.as_ref() {
1038 old_hook().await;
1039 }
1040 }
1041}
1042
1043#[cfg(test)]
1044mod tests {
1045 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1046 use crate::fsck::{fsck, fsck_volume};
1047 use crate::log::*;
1048 use crate::lsm_tree::Operation;
1049 use crate::lsm_tree::types::Item;
1050 use crate::object_handle::{
1051 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1052 };
1053 use crate::object_store::directory::{Directory, replace_child};
1054 use crate::object_store::journal::JournalOptions;
1055 use crate::object_store::journal::super_block::SuperBlockInstance;
1056 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1057 use crate::object_store::volume::root_volume;
1058 use crate::object_store::{
1059 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1060 };
1061 use crate::range::RangeExt;
1062 use fuchsia_async as fasync;
1063 use fuchsia_sync::Mutex;
1064 use futures::future::join_all;
1065 use futures::stream::{FuturesUnordered, TryStreamExt};
1066 use fxfs_insecure_crypto::new_insecure_crypt;
1067 use rustc_hash::FxHashMap as HashMap;
1068 use std::ops::Range;
1069 use std::sync::Arc;
1070 use std::sync::atomic::{self, AtomicU32};
1071 use std::time::Duration;
1072 use storage_device::DeviceHolder;
1073 use storage_device::fake_device::{self, FakeDevice};
1074 use test_case::test_case;
1075
1076 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1077
1078 #[fuchsia::test(threads = 10)]
1079 async fn test_compaction() {
1080 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1081
1082 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1084 let root_store = fs.root_store();
1085 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1086 .await
1087 .expect("open failed");
1088
1089 let mut tasks = Vec::new();
1090 for i in 0..2 {
1091 let mut transaction = fs
1092 .clone()
1093 .new_transaction(
1094 lock_keys![LockKey::object(
1095 root_store.store_object_id(),
1096 root_directory.object_id()
1097 )],
1098 Options::default(),
1099 )
1100 .await
1101 .expect("new_transaction failed");
1102 let handle = root_directory
1103 .create_child_file(&mut transaction, &format!("{}", i))
1104 .await
1105 .expect("create_child_file failed");
1106 transaction.commit().await.expect("commit failed");
1107 tasks.push(fasync::Task::spawn(async move {
1108 const TEST_DATA: &[u8] = b"hello";
1109 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1110 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1111 for _ in 0..1500 {
1112 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1113 }
1114 }));
1115 }
1116 join_all(tasks).await;
1117 fs.sync(SyncOptions::default()).await.expect("sync failed");
1118
1119 fsck(fs.clone()).await.expect("fsck failed");
1120 fs.close().await.expect("Close failed");
1121 }
1122
1123 #[fuchsia::test]
1124 async fn test_enable_allocations() {
1125 {
1127 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1128 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1129 fs.enable_allocations();
1130 let root_store = fs.root_store();
1131 let root_directory =
1132 Directory::open(&root_store, root_store.root_directory_object_id())
1133 .await
1134 .expect("open failed");
1135 let mut transaction = fs
1136 .clone()
1137 .new_transaction(
1138 lock_keys![LockKey::object(
1139 root_store.store_object_id(),
1140 root_directory.object_id()
1141 )],
1142 Options::default(),
1143 )
1144 .await
1145 .expect("new_transaction failed");
1146 root_directory
1147 .create_child_file(&mut transaction, "test")
1148 .await
1149 .expect("create_child_file failed");
1150 transaction.commit().await.expect("commit failed");
1151 fs.close().await.expect("close failed");
1152 }
1153
1154 {
1156 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1157 let fs = FxFilesystemBuilder::new()
1158 .format(true)
1159 .image_builder_mode(Some(SuperBlockInstance::A))
1160 .open(device)
1161 .await
1162 .expect("open failed");
1163 let root_store = fs.root_store();
1164 let root_directory =
1165 Directory::open(&root_store, root_store.root_directory_object_id())
1166 .await
1167 .expect("open failed");
1168
1169 let mut transaction = fs
1170 .clone()
1171 .new_transaction(
1172 lock_keys![LockKey::object(
1173 root_store.store_object_id(),
1174 root_directory.object_id()
1175 )],
1176 Options::default(),
1177 )
1178 .await
1179 .expect("new_transaction failed");
1180 let handle = root_directory
1181 .create_child_file(&mut transaction, "test_fail")
1182 .await
1183 .expect("create_child_file failed");
1184 transaction.commit().await.expect("commit failed");
1185
1186 assert!(
1188 FxfsError::Unavailable
1189 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1190 );
1191
1192 fs.enable_allocations();
1194 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1195
1196 fs.close().await.expect("close failed");
1200 }
1201 }
1205
1206 #[fuchsia::test(threads = 10)]
1207 async fn test_replay_is_identical() {
1208 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1209 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1210
1211 fs.close().await.expect("close failed");
1214 let device = fs.take_device().await;
1215 device.reopen(false);
1216
1217 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1218
1219 impl<K: Clone, V: Clone> Mutations<K, V> {
1220 fn new() -> Self {
1221 Mutations(Mutex::new(Vec::new()))
1222 }
1223
1224 fn push(&self, operation: Operation, item: &Item<K, V>) {
1225 self.0.lock().push((operation, item.clone()));
1226 }
1227 }
1228
1229 let open_fs = |device,
1230 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1231 allocator_mutations: Arc<Mutations<_, _>>| async {
1232 FxFilesystemBuilder::new()
1233 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1234 .on_new_allocator(move |allocator| {
1235 let allocator_mutations = allocator_mutations.clone();
1236 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1237 allocator_mutations.push(op, item)
1238 })));
1239 })
1240 .on_new_store(move |store| {
1241 let mutations = Arc::new(Mutations::new());
1242 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1243 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1244 mutations.push(op, item)
1245 })));
1246 })
1247 .open(device)
1248 .await
1249 .expect("open failed")
1250 };
1251
1252 let allocator_mutations = Arc::new(Mutations::new());
1253 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1254 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1255
1256 let root_store = fs.root_store();
1257 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1258 .await
1259 .expect("open failed");
1260
1261 let mut transaction = fs
1262 .clone()
1263 .new_transaction(
1264 lock_keys![LockKey::object(
1265 root_store.store_object_id(),
1266 root_directory.object_id()
1267 )],
1268 Options::default(),
1269 )
1270 .await
1271 .expect("new_transaction failed");
1272 let object = root_directory
1273 .create_child_file(&mut transaction, "test")
1274 .await
1275 .expect("create_child_file failed");
1276 transaction.commit().await.expect("commit failed");
1277
1278 let buf = object.allocate_buffer(10000).await;
1280 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1281
1282 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1284
1285 object.truncate(3000).await.expect("truncate failed");
1287
1288 let mut transaction = fs
1290 .clone()
1291 .new_transaction(
1292 lock_keys![
1293 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1294 LockKey::object(root_store.store_object_id(), object.object_id()),
1295 ],
1296 Options::default(),
1297 )
1298 .await
1299 .expect("new_transaction failed");
1300
1301 replace_child(&mut transaction, None, (&root_directory, "test"))
1302 .await
1303 .expect("replace_child failed");
1304
1305 transaction.commit().await.expect("commit failed");
1306
1307 root_store
1309 .tombstone_object(object.object_id(), Options::default())
1310 .await
1311 .expect("tombstone failed");
1312
1313 fs.close().await.expect("close failed");
1315
1316 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1317
1318 let device = fs.take_device().await;
1319 device.reopen(false);
1320
1321 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1322 let replayed_allocator_mutations = Arc::new(Mutations::new());
1323 let fs = open_fs(
1324 device,
1325 replayed_object_mutations.clone(),
1326 replayed_allocator_mutations.clone(),
1327 )
1328 .await;
1329
1330 let m1 = object_mutations.lock();
1331 let m2 = replayed_object_mutations.lock();
1332 assert_eq!(m1.len(), m2.len());
1333 for (store_id, mutations) in &*m1 {
1334 let mutations = mutations.0.lock();
1335 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1336 assert_eq!(mutations.len(), replayed.len());
1337 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1338 assert_eq!(op1, op2);
1339 assert_eq!(i1.key, i2.key);
1340 assert_eq!(i1.value, i2.value);
1341 assert_eq!(i1.sequence, i2.sequence);
1342 }
1343 }
1344
1345 let a1 = allocator_mutations.0.lock();
1346 let a2 = replayed_allocator_mutations.0.lock();
1347 assert_eq!(a1.len(), a2.len());
1348 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1349 assert_eq!(op1, op2);
1350 assert_eq!(i1.key, i2.key);
1351 assert_eq!(i1.value, i2.value);
1352 assert_eq!(i1.sequence, i2.sequence);
1353 }
1354
1355 assert_eq!(
1356 fs.object_manager().metadata_reservation().amount(),
1357 metadata_reservation_amount
1358 );
1359 }
1360
1361 #[fuchsia::test]
1362 async fn test_max_in_flight_transactions() {
1363 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1364 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1365
1366 let transactions = FuturesUnordered::new();
1367 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1368 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1369 }
1370 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1371
1372 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1374 assert!(futures::poll!(&mut fut).is_pending());
1375
1376 transactions.pop();
1378
1379 assert!(futures::poll!(&mut fut).is_ready());
1380 }
1381
1382 #[fuchsia::test(threads = 10)]
1384 async fn test_continuously_trim() {
1385 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1386 let fs = FxFilesystemBuilder::new()
1387 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1388 .format(true)
1389 .open(device)
1390 .await
1391 .expect("open failed");
1392 fasync::Timer::new(Duration::from_millis(10)).await;
1394
1395 let root_store = fs.root_store();
1398 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1399 .await
1400 .expect("open failed");
1401 for _ in 0..100 {
1402 let mut transaction = fs
1403 .clone()
1404 .new_transaction(
1405 lock_keys![LockKey::object(
1406 root_store.store_object_id(),
1407 root_directory.object_id()
1408 )],
1409 Options::default(),
1410 )
1411 .await
1412 .expect("new_transaction failed");
1413 let object = root_directory
1414 .create_child_file(&mut transaction, "test")
1415 .await
1416 .expect("create_child_file failed");
1417 transaction.commit().await.expect("commit failed");
1418
1419 {
1420 let buf = object.allocate_buffer(1024).await;
1421 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1422 }
1423 std::mem::drop(object);
1424
1425 let mut transaction = root_directory
1426 .acquire_context_for_replace(None, "test", true)
1427 .await
1428 .expect("acquire_context_for_replace failed")
1429 .transaction;
1430 replace_child(&mut transaction, None, (&root_directory, "test"))
1431 .await
1432 .expect("replace_child failed");
1433 transaction.commit().await.expect("commit failed");
1434 }
1435 fs.close().await.expect("close failed");
1436 }
1437
1438 #[test_case(true; "test power fail with barriers")]
1439 #[test_case(false; "test power fail with checksums")]
1440 #[fuchsia::test]
1441 async fn test_power_fail(barriers_enabled: bool) {
1442 for _ in 0..10 {
1445 let (store_id, device, test_file_object_id) = {
1446 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1447 let fs = if barriers_enabled {
1448 FxFilesystemBuilder::new()
1449 .barriers_enabled(true)
1450 .format(true)
1451 .open(device)
1452 .await
1453 .expect("new filesystem failed")
1454 } else {
1455 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1456 };
1457 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1458
1459 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1460 .await
1461 .expect("sync failed");
1462
1463 let store = root_volume
1464 .new_volume(
1465 "test",
1466 NewChildStoreOptions {
1467 options: StoreOptions {
1468 crypt: Some(Arc::new(new_insecure_crypt())),
1469 ..StoreOptions::default()
1470 },
1471 ..Default::default()
1472 },
1473 )
1474 .await
1475 .expect("new_volume failed");
1476 let root_directory = Directory::open(&store, store.root_directory_object_id())
1477 .await
1478 .expect("open failed");
1479
1480 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1482 let fs = store.filesystem();
1483 let root_directory = Directory::open(store, store.root_directory_object_id())
1484 .await
1485 .expect("open failed");
1486 for i in 0..100 {
1487 let mut transaction = fs
1488 .clone()
1489 .new_transaction(
1490 lock_keys![LockKey::object(
1491 store.store_object_id(),
1492 store.root_directory_object_id()
1493 )],
1494 Options::default(),
1495 )
1496 .await
1497 .expect("new_transaction failed");
1498 root_directory
1499 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1500 .await
1501 .expect("create_child_file failed");
1502 transaction.commit().await.expect("commit failed");
1503 }
1504 }
1505
1506 create_files(&store, "A").await;
1508
1509 let mut transaction = fs
1512 .clone()
1513 .new_transaction(
1514 lock_keys![LockKey::object(
1515 store.store_object_id(),
1516 store.root_directory_object_id()
1517 )],
1518 Options::default(),
1519 )
1520 .await
1521 .expect("new_transaction failed");
1522 let object = root_directory
1523 .create_child_file(&mut transaction, "test")
1524 .await
1525 .expect("create_child_file failed");
1526 transaction.commit().await.expect("commit failed");
1527
1528 let mut transaction =
1529 object.new_transaction().await.expect("new_transaction failed");
1530 let mut buffer = object.allocate_buffer(4096).await;
1531 buffer.as_mut_slice().fill(0xed);
1532 object
1533 .txn_write(&mut transaction, 0, buffer.as_ref())
1534 .await
1535 .expect("txn_write failed");
1536 transaction.commit().await.expect("commit failed");
1537
1538 create_files(&store, "B").await;
1540
1541 fs.sync(SyncOptions::default()).await.expect("sync failed");
1544
1545 fasync::Timer::new(Duration::from_millis(10)).await;
1551
1552 (
1553 store.store_object_id(),
1554 fs.device().snapshot().expect("snapshot failed"),
1555 object.object_id(),
1556 )
1557 };
1558
1559 device
1562 .discard_random_since_last_flush()
1563 .expect("discard_random_since_last_flush failed");
1564
1565 let fs = FxFilesystem::open(device).await.expect("open failed");
1566 fsck(fs.clone()).await.expect("fsck failed");
1567
1568 let mut check_test_file = false;
1569
1570 let object_id = if fs.object_manager().store(store_id).is_some() {
1573 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1574 .await
1575 .expect("fsck_volume failed");
1576
1577 let store = root_volume(fs.clone())
1581 .await
1582 .expect("root_volume failed")
1583 .volume(
1584 "test",
1585 StoreOptions {
1586 crypt: Some(Arc::new(new_insecure_crypt())),
1587 ..StoreOptions::default()
1588 },
1589 )
1590 .await
1591 .expect("volume failed");
1592
1593 let root_directory = Directory::open(&store, store.root_directory_object_id())
1594 .await
1595 .expect("open failed");
1596
1597 let mut transaction = fs
1598 .clone()
1599 .new_transaction(
1600 lock_keys![LockKey::object(
1601 store.store_object_id(),
1602 store.root_directory_object_id()
1603 )],
1604 Options::default(),
1605 )
1606 .await
1607 .expect("new_transaction failed");
1608 let object = root_directory
1609 .create_child_file(&mut transaction, &format!("C"))
1610 .await
1611 .expect("create_child_file failed");
1612 transaction.commit().await.expect("commit failed");
1613
1614 if let Ok(test_file) = ObjectStore::open_object(
1616 &store,
1617 test_file_object_id,
1618 HandleOptions::default(),
1619 None,
1620 )
1621 .await
1622 {
1623 let mut buffer = test_file.allocate_buffer(4096).await;
1625 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1626 if bytes == 4096 {
1627 let expected = [0xed; 4096];
1628 assert_eq!(buffer.as_slice(), &expected);
1629 } else {
1630 assert_eq!(bytes, 0);
1632 }
1633
1634 let mut transaction =
1636 test_file.new_transaction().await.expect("new_transaction failed");
1637 buffer.as_mut_slice().fill(0x37);
1638 test_file
1639 .txn_write(&mut transaction, 0, buffer.as_ref())
1640 .await
1641 .expect("txn_write failed");
1642 transaction.commit().await.expect("commit failed");
1643 check_test_file = true;
1644 }
1645
1646 object.object_id()
1647 } else {
1648 INVALID_OBJECT_ID
1649 };
1650
1651 fs.close().await.expect("close failed");
1653 let device = fs.take_device().await;
1654 device.reopen(false);
1655
1656 let fs = FxFilesystem::open(device).await.expect("open failed");
1657 fsck(fs.clone()).await.expect("fsck failed");
1658
1659 if object_id != INVALID_OBJECT_ID {
1662 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1663 .await
1664 .expect("fsck_volume failed");
1665
1666 let store = root_volume(fs.clone())
1667 .await
1668 .expect("root_volume failed")
1669 .volume(
1670 "test",
1671 StoreOptions {
1672 crypt: Some(Arc::new(new_insecure_crypt())),
1673 ..StoreOptions::default()
1674 },
1675 )
1676 .await
1677 .expect("volume failed");
1678 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1680 .await
1681 .expect("open_object failed");
1682
1683 if check_test_file {
1685 info!("Checking test file for modification");
1686 let test_file = ObjectStore::open_object(
1687 &store,
1688 test_file_object_id,
1689 HandleOptions::default(),
1690 None,
1691 )
1692 .await
1693 .expect("open_object failed");
1694 let mut buffer = test_file.allocate_buffer(4096).await;
1695 assert_eq!(
1696 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1697 4096
1698 );
1699 let expected = [0x37; 4096];
1700 assert_eq!(buffer.as_slice(), &expected);
1701 }
1702 }
1703
1704 fs.close().await.expect("close failed");
1705 }
1706 }
1707
1708 #[fuchsia::test]
1709 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1710 let barrier_count = Arc::new(AtomicU32::new(0));
1711
1712 struct Observer(Arc<AtomicU32>);
1713
1714 impl fake_device::Observer for Observer {
1715 fn barrier(&self) {
1716 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1717 }
1718 }
1719
1720 let mut fake_device = FakeDevice::new(8192, 4096);
1721 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1722 let device = DeviceHolder::new(fake_device);
1723 let fs = FxFilesystemBuilder::new()
1724 .barriers_enabled(true)
1725 .format(true)
1726 .open(device)
1727 .await
1728 .expect("new filesystem failed");
1729
1730 {
1731 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1732 root_vol
1733 .new_volume(
1734 "test",
1735 NewChildStoreOptions {
1736 options: StoreOptions {
1737 crypt: Some(Arc::new(new_insecure_crypt())),
1738 ..StoreOptions::default()
1739 },
1740 ..NewChildStoreOptions::default()
1741 },
1742 )
1743 .await
1744 .expect("there is no test volume");
1745 fs.close().await.expect("close failed");
1746 }
1747 let device = fs.take_device().await;
1750 device.reopen(false);
1751 let fs = FxFilesystemBuilder::new()
1752 .barriers_enabled(true)
1753 .open(device)
1754 .await
1755 .expect("new filesystem failed");
1756 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1757
1758 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1759 let store = root_vol
1760 .volume(
1761 "test",
1762 StoreOptions {
1763 crypt: Some(Arc::new(new_insecure_crypt())),
1764 ..StoreOptions::default()
1765 },
1766 )
1767 .await
1768 .expect("there is no test volume");
1769
1770 let fs = store.filesystem();
1772 let root_directory =
1773 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1774 for i in 0..100 {
1775 let mut transaction = fs
1776 .clone()
1777 .new_transaction(
1778 lock_keys![LockKey::object(
1779 store.store_object_id(),
1780 store.root_directory_object_id()
1781 )],
1782 Options::default(),
1783 )
1784 .await
1785 .expect("new_transaction failed");
1786 root_directory
1787 .create_child_file(&mut transaction, &format!("A {i}"))
1788 .await
1789 .expect("create_child_file failed");
1790 transaction.commit().await.expect("commit failed");
1791 }
1792
1793 fs.close().await.expect("close failed");
1795 assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1797 }
1798
1799 #[fuchsia::test]
1800 async fn test_barrier_emitted_when_transaction_includes_data() {
1801 let barrier_count = Arc::new(AtomicU32::new(0));
1802
1803 struct Observer(Arc<AtomicU32>);
1804
1805 impl fake_device::Observer for Observer {
1806 fn barrier(&self) {
1807 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1808 }
1809 }
1810
1811 let mut fake_device = FakeDevice::new(8192, 4096);
1812 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1813 let device = DeviceHolder::new(fake_device);
1814 let fs = FxFilesystemBuilder::new()
1815 .barriers_enabled(true)
1816 .format(true)
1817 .open(device)
1818 .await
1819 .expect("new filesystem failed");
1820
1821 {
1822 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1823 root_vol
1824 .new_volume(
1825 "test",
1826 NewChildStoreOptions {
1827 options: StoreOptions {
1828 crypt: Some(Arc::new(new_insecure_crypt())),
1829 ..StoreOptions::default()
1830 },
1831 ..NewChildStoreOptions::default()
1832 },
1833 )
1834 .await
1835 .expect("there is no test volume");
1836 fs.close().await.expect("close failed");
1837 }
1838 let device = fs.take_device().await;
1841 device.reopen(false);
1842 let fs = FxFilesystemBuilder::new()
1843 .barriers_enabled(true)
1844 .open(device)
1845 .await
1846 .expect("new filesystem failed");
1847 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1848
1849 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1850 let store = root_vol
1851 .volume(
1852 "test",
1853 StoreOptions {
1854 crypt: Some(Arc::new(new_insecure_crypt())),
1855 ..StoreOptions::default()
1856 },
1857 )
1858 .await
1859 .expect("there is no test volume");
1860
1861 let fs: Arc<FxFilesystem> = store.filesystem();
1863 let root_directory =
1864 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1865
1866 let mut transaction = fs
1867 .clone()
1868 .new_transaction(
1869 lock_keys![LockKey::object(
1870 store.store_object_id(),
1871 store.root_directory_object_id()
1872 )],
1873 Options::default(),
1874 )
1875 .await
1876 .expect("new_transaction failed");
1877 let object = root_directory
1878 .create_child_file(&mut transaction, "test")
1879 .await
1880 .expect("create_child_file failed");
1881 transaction.commit().await.expect("commit failed");
1882
1883 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1884 let mut buffer = object.allocate_buffer(4096).await;
1885 buffer.as_mut_slice().fill(0xed);
1886 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1887 transaction.commit().await.expect("commit failed");
1888
1889 fs.close().await.expect("close failed");
1891 assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1893 }
1894
#[test_case(true; "fail when original filesystem has barriers enabled")]
#[test_case(false; "fail when original filesystem has barriers disabled")]
#[fuchsia::test]
async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
    // Checks that one filesystem image can be used with barriers both enabled and disabled:
    // - Phase 1: format with `original_barrier_mode` and write 4096 bytes of 0xA7.
    // - Phase 2: remount with the opposite mode and append 4096 bytes of 0xA8.
    // - Phase 3: remount with the original mode and verify both halves.
    let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
    let fake_device = FakeDevice::new(8192, 4096);
    let device = DeviceHolder::new(fake_device);
    let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
        .barriers_enabled(original_barrier_mode)
        .format(true)
        .open(device)
        .await
        .expect("new filesystem failed");

    // Phase 1: create the "test" volume and "file" with one block of 0xA7.
    {
        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("creating test volume");
        let root_dir = Directory::open(&store, store.root_directory_object_id())
            .await
            .expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Default::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_dir
            .create_child_file(&mut transaction, "file")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");
        let mut buffer = object.allocate_buffer(4096).await;
        buffer.as_mut_slice().fill(0xA7);
        let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
        assert_eq!(new_size, 4096);
    }

    // Phase 2: remount with the opposite barrier mode and append a block of 0xA8.
    fs.close().await.expect("close failed");
    let device = fs.take_device().await;
    device.reopen(false);
    let fs = FxFilesystemBuilder::new()
        .barriers_enabled(!original_barrier_mode)
        .open(device)
        .await
        .expect("new filesystem failed");
    {
        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
            .await
            .expect("opening test volume");
        let root_dir = Directory::open(&store, store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, _, _) =
            root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
        let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
            .await
            .expect("open failed");
        let mut buffer = test_file.allocate_buffer(4096).await;
        buffer.as_mut_slice().fill(0xA8);
        // Appending (offset `None`) after the first block grows the file to 8192 bytes.
        let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
        assert_eq!(new_size, 8192);
    }

    // Phase 3: remount with the original mode and check both blocks are intact.
    fs.close().await.expect("close failed");
    let device = fs.take_device().await;
    device.reopen(false);
    let fs = FxFilesystemBuilder::new()
        .barriers_enabled(original_barrier_mode)
        .open(device)
        .await
        .expect("new filesystem failed");
    {
        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
            .await
            .expect("opening test volume");
        let root_dir = Directory::open(&store, store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, _, _) =
            root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
        let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
            .await
            .expect("open failed");
        let mut buffer = test_file.allocate_buffer(8192).await;
        assert_eq!(
            test_file.read(0, buffer.as_mut()).await.expect("read failed"),
            8192,
            "short read"
        );
        assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
        assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
    }
    fs.close().await.expect("close failed");
}
2012
2013 #[fuchsia::test]
2014 async fn test_image_builder_mode_no_early_writes() {
2015 const BLOCK_SIZE: u32 = 4096;
2016 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2017 device.reopen(true);
2018 let fs = FxFilesystemBuilder::new()
2019 .format(true)
2020 .image_builder_mode(Some(SuperBlockInstance::A))
2021 .open(device)
2022 .await
2023 .expect("open failed");
2024 fs.enable_allocations();
2025 fs.device().reopen(false);
2027 fs.close().await.expect("closed");
2028 }
2029
2030 #[fuchsia::test]
2031 async fn test_image_builder_mode() {
2032 const BLOCK_SIZE: u32 = 4096;
2033 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2034 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2035
2036 {
2038 let mut write_buf =
2039 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2040 write_buf.as_mut_slice().fill(0xf0);
2041 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2042 }
2043
2044 device.reopen(true);
2045
2046 let device = {
2047 let fs = FxFilesystemBuilder::new()
2048 .format(true)
2049 .image_builder_mode(Some(SuperBlockInstance::B))
2050 .open(device)
2051 .await
2052 .expect("open failed");
2053 fs.enable_allocations();
2054 {
2055 let root_store = fs.root_store();
2056 let root_directory =
2057 Directory::open(&root_store, root_store.root_directory_object_id())
2058 .await
2059 .expect("open failed");
2060 let handle;
2062 {
2063 let mut transaction = fs
2064 .clone()
2065 .new_transaction(
2066 lock_keys![LockKey::object(
2067 root_directory.store().store_object_id(),
2068 root_directory.object_id()
2069 )],
2070 Options::default(),
2071 )
2072 .await
2073 .expect("new transaction");
2074 handle = root_directory
2075 .create_child_file(&mut transaction, "test")
2076 .await
2077 .expect("create file");
2078 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2079 transaction.commit().await.expect("commit");
2080 }
2081 }
2082 fs.device().reopen(false);
2083 fs.close().await.expect("close");
2084 fs.take_device().await
2085 };
2086 device.reopen(false);
2087 let fs = FxFilesystem::open(device).await.expect("open failed");
2088 fsck(fs.clone()).await.expect("fsck failed");
2089
2090 let root_store = fs.root_store();
2092 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2093 .await
2094 .expect("open failed");
2095 let (object_id, descriptor, _) =
2096 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2097 assert_eq!(descriptor, ObjectDescriptor::File);
2098 let test_file =
2099 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2100 .await
2101 .expect("open failed");
2102 let mut read_buf =
2103 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2104 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2105 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2106 fs.close().await.expect("closed");
2107 }
2108
2109 #[fuchsia::test]
2110 async fn test_read_only_mount_on_full_filesystem() {
2111 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2112 let fs =
2113 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2114 let root_store = fs.root_store();
2115 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2116 .await
2117 .expect("open failed");
2118
2119 let mut transaction = fs
2120 .clone()
2121 .new_transaction(
2122 lock_keys![LockKey::object(
2123 root_store.store_object_id(),
2124 root_directory.object_id()
2125 )],
2126 Options::default(),
2127 )
2128 .await
2129 .expect("new_transaction failed");
2130 let handle = root_directory
2131 .create_child_file(&mut transaction, "test")
2132 .await
2133 .expect("create_child_file failed");
2134 transaction.commit().await.expect("commit failed");
2135
2136 let mut buf = handle.allocate_buffer(4096).await;
2137 buf.as_mut_slice().fill(0xaa);
2138 loop {
2139 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2140 break;
2141 }
2142 }
2143
2144 let max_offset = fs.allocator().maximum_offset();
2145 fs.close().await.expect("Close failed");
2146
2147 let device = fs.take_device().await;
2148 device.reopen(false);
2149 let mut buffer = device
2150 .allocate_buffer(
2151 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2152 )
2153 .await;
2154 device.read(0, buffer.as_mut()).await.expect("read failed");
2155
2156 let device = DeviceHolder::new(
2157 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2158 .expect("from_image failed"),
2159 );
2160 let fs =
2161 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2162 fs.close().await.expect("Close failed");
2163 }
2164
2165 #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2166 #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2167 #[fuchsia::test]
2168 async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2169 const BLOCK_SIZE: u32 = 4096;
2170 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2171
2172 device.reopen(true);
2174 let fs = FxFilesystemBuilder::new()
2175 .format(true)
2176 .image_builder_mode(Some(target_sb))
2177 .open(device)
2178 .await
2179 .expect("open failed");
2180
2181 fs.enable_allocations();
2182
2183 fs.device().reopen(false);
2185
2186 {
2188 let root_store = fs.root_store();
2189 let root_directory =
2190 Directory::open(&root_store, root_store.root_directory_object_id())
2191 .await
2192 .expect("open failed");
2193
2194 let mut transaction = fs
2195 .clone()
2196 .new_transaction(
2197 lock_keys![LockKey::object(
2198 root_directory.store().store_object_id(),
2199 root_directory.object_id()
2200 )],
2201 Options::default(),
2202 )
2203 .await
2204 .expect("new transaction");
2205 let handle = root_directory
2206 .create_child_file(&mut transaction, "post_finalize_file")
2207 .await
2208 .expect("create file");
2209 transaction.commit().await.expect("commit");
2210
2211 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2212 buf.as_mut_slice().fill(0xaa);
2213 handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2214 }
2215
2216 fs.close().await.expect("close failed");
2218
2219 let other_sb = target_sb.next();
2220
2221 let device = fs.take_device().await;
2223 device.reopen(true); let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2225
2226 device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2227 assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2228
2229 buf.as_mut_slice().fill(0); device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2231 assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2233 }
2234}