1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail, ensure};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use static_assertions::const_assert;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::{Arc, OnceLock, Weak};
34use storage_device::{Device, DeviceHolder};
35
/// Smallest block size the filesystem operates with; devices with smaller blocks are treated as
/// having blocks of this size.
pub const MIN_BLOCK_SIZE: u64 = 4096;

/// Largest supported device block size (64 KiB).
pub const MAX_BLOCK_SIZE: u64 = 64 * 1024;

/// Largest supported file size: `i64::MAX` rounded down to a multiple of [`MIN_BLOCK_SIZE`].
pub const MAX_FILE_SIZE: u64 = (i64::MAX as u64) - (MIN_BLOCK_SIZE - 1);
// Compile-time check that MAX_FILE_SIZE has the exact literal value it is expected to have.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);
45
/// Maximum number of transactions allowed in flight at once; `add_transaction` waits for a free
/// slot beyond this unless journal checks are skipped.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first background trim pass (one hour).
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(3_600);

/// Default interval between subsequent background trim passes (one day).
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(24 * 3_600);
56
/// Space usage summary returned by `FxFilesystem::get_info`.
///
/// Plain data, so it is `Copy` and can be logged (`Debug`) and compared by callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Info {
    /// Size of the underlying device in bytes, as reported by `Device::size`.
    pub total_bytes: u64,
    /// Bytes reported as used by the allocator.
    pub used_bytes: u64,
}
62
63pub type PostCommitHook =
64 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
65
66pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
67
68pub struct Options {
69 pub read_only: bool,
71
72 pub roll_metadata_key_byte_count: u64,
76
77 pub pre_commit_hook: PreCommitHook,
80
81 pub post_commit_hook: PostCommitHook,
84
85 pub skip_initial_reap: bool,
88
89 pub trim_config: Option<(std::time::Duration, std::time::Duration)>,
94
95 pub image_builder_mode: Option<SuperBlockInstance>,
98
99 pub inline_crypto_enabled: bool,
106
107 pub barriers_enabled: bool,
112}
113
114impl Default for Options {
115 fn default() -> Self {
116 Options {
117 roll_metadata_key_byte_count: 128 * 1024 * 1024,
118 read_only: false,
119 pre_commit_hook: None,
120 post_commit_hook: None,
121 skip_initial_reap: false,
122 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
123 image_builder_mode: None,
124 inline_crypto_enabled: false,
125 barriers_enabled: false,
126 }
127 }
128}
129
130pub struct ApplyContext<'a, 'b> {
132 pub mode: ApplyMode<'a, 'b>,
134
135 pub checkpoint: JournalCheckpoint,
137}
138
139pub enum ApplyMode<'a, 'b> {
142 Replay,
143 Live(&'a Transaction<'b>),
144}
145
146impl ApplyMode<'_, '_> {
147 pub fn is_replay(&self) -> bool {
148 matches!(self, ApplyMode::Replay)
149 }
150
151 pub fn is_live(&self) -> bool {
152 matches!(self, ApplyMode::Live(_))
153 }
154}
155
156#[async_trait]
159pub trait JournalingObject: Send + Sync {
160 fn apply_mutation(
164 &self,
165 mutation: Mutation,
166 context: &ApplyContext<'_, '_>,
167 assoc_obj: AssocObj<'_>,
168 ) -> Result<(), Error>;
169
170 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
172
173 async fn flush(&self) -> Result<Version, Error>;
177
178 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
181 writer.write(mutation.clone());
182 }
183}
184
/// Options for `FxFilesystem::sync` (forwarded to `Journal::sync`).
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Request that the device be flushed as part of the sync (`close` sets this).
    pub flush_device: bool,

    /// Optional one-shot predicate consulted by the journal.
    /// NOTE(review): presumably the sync is skipped when it returns false — confirm in
    /// `Journal::sync`.
    pub precondition: Option<Box<dyn FnOnce() -> bool + Send + 'a>>,
}
199
200pub struct OpenFxFilesystem(Arc<FxFilesystem>);
201
202impl OpenFxFilesystem {
203 pub async fn take_device(self) -> DeviceHolder {
206 let fut = self.device.take_when_dropped();
207 std::mem::drop(self);
208 debug_assert_not_too_long!(fut)
209 }
210
211 pub async fn finalize(&self) -> Result<(), Error> {
213 ensure!(
214 self.journal().image_builder_mode().is_some(),
215 "finalize() only valid in image_builder_mode."
216 );
217 self.journal().allocate_journal().await?;
218 self.journal().set_image_builder_mode(None);
219 self.journal().compact().await?;
220 Ok(())
221 }
222}
223
224impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
225 fn from(fs: Arc<FxFilesystem>) -> Self {
226 Self(fs)
227 }
228}
229
230impl Drop for OpenFxFilesystem {
231 fn drop(&mut self) {
232 if self.options.image_builder_mode.is_some()
233 && self.journal().image_builder_mode().is_some()
234 {
235 error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
236 }
237 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
238 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
239 }
240 }
241}
242
243impl std::ops::Deref for OpenFxFilesystem {
244 type Target = Arc<FxFilesystem>;
245
246 fn deref(&self) -> &Self::Target {
247 &self.0
248 }
249}
250
251pub struct FxFilesystemBuilder {
252 format: bool,
253 trace: bool,
254 options: Options,
255 journal_options: JournalOptions,
256 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
257 on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
258 fsck_after_every_transaction: bool,
259}
260
261impl FxFilesystemBuilder {
262 pub fn new() -> Self {
263 Self {
264 format: false,
265 trace: false,
266 options: Options::default(),
267 journal_options: JournalOptions::default(),
268 on_new_allocator: None,
269 on_new_store: None,
270 fsck_after_every_transaction: false,
271 }
272 }
273
274 pub fn format(mut self, format: bool) -> Self {
276 self.format = format;
277 self
278 }
279
280 pub fn trace(mut self, trace: bool) -> Self {
282 self.trace = trace;
283 self
284 }
285
286 pub fn read_only(mut self, read_only: bool) -> Self {
289 self.options.read_only = read_only;
290 self
291 }
292
293 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
300 self.options.image_builder_mode = mode;
301 self
302 }
303
304 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
306 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
307 self
308 }
309
310 pub fn pre_commit_hook(
312 mut self,
313 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
314 ) -> Self {
315 self.options.pre_commit_hook = Some(Box::new(hook));
316 self
317 }
318
319 pub fn post_commit_hook(
322 mut self,
323 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
324 ) -> Self {
325 self.options.post_commit_hook = Some(Box::new(hook));
326 self
327 }
328
329 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
332 self.options.skip_initial_reap = skip_initial_reap;
333 self
334 }
335
336 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
338 self.journal_options = journal_options;
339 self
340 }
341
342 pub fn on_new_allocator(
344 mut self,
345 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
346 ) -> Self {
347 self.on_new_allocator = Some(Box::new(on_new_allocator));
348 self
349 }
350
351 pub fn on_new_store(
353 mut self,
354 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
355 ) -> Self {
356 self.on_new_store = Some(Box::new(on_new_store));
357 self
358 }
359
360 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
362 self.fsck_after_every_transaction = fsck_after_every_transaction;
363 self
364 }
365
366 pub fn trim_config(
367 mut self,
368 delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
369 ) -> Self {
370 self.options.trim_config = delay_and_interval;
371 self
372 }
373
374 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
376 self.options.inline_crypto_enabled = inline_crypto_enabled;
377 self
378 }
379
380 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
383 self.options.barriers_enabled = barriers_enabled;
384 self.journal_options.barriers_enabled = barriers_enabled;
385 self
386 }
387
388 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
390 let read_only = self.options.read_only;
391 if self.format && read_only {
392 bail!("Cannot initialize a filesystem as read-only");
393 }
394
395 if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
397 bail!("A filesystem using inline encryption requires barriers");
398 }
399
400 let objects = Arc::new(ObjectManager::new(self.on_new_store));
401 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
402
403 let image_builder_mode = self.options.image_builder_mode;
404
405 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
406 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
407 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
408
409 let mut fsck_after_every_transaction = None;
410 let mut filesystem_options = self.options;
411 if self.fsck_after_every_transaction {
412 let instance =
413 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
414 fsck_after_every_transaction = Some(instance.clone());
415 filesystem_options.post_commit_hook =
416 Some(Box::new(move || instance.clone().run().boxed()));
417 }
418
419 if !read_only && !self.format {
420 device.flush().await.context("Device flush failed")?;
423 }
424
425 let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
426 let weak = weak.clone();
427 FxFilesystem {
428 device,
429 block_size,
430 objects: objects.clone(),
431 journal,
432 commit_mutex: futures::lock::Mutex::new(()),
433 lock_manager: LockManager::new(),
434 flush_task: Mutex::new(None),
435 trim_task: Mutex::new(None),
436 closed: AtomicBool::new(true),
437 shutdown_event: Event::new(),
438 trace: self.trace,
439 graveyard: Graveyard::new(objects.clone()),
440 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
441 options: filesystem_options,
442 in_flight_transactions: AtomicU64::new(0),
443 transaction_limit_event: Event::new(),
444 _stores_node: metrics::register_fs(move || {
445 let weak = weak.clone();
446 Box::pin(async move {
447 if let Some(fs) = weak.upgrade() {
448 fs.populate_stores_node().await
449 } else {
450 Err(anyhow!("Filesystem has been dropped"))
451 }
452 })
453 }),
454 }
455 });
456
457 filesystem.journal().set_image_builder_mode(image_builder_mode);
458
459 filesystem.journal.set_trace(self.trace);
460 if self.format {
461 filesystem.journal.init_empty(filesystem.clone()).await?;
462 if image_builder_mode.is_none() {
463 filesystem.journal.init_superblocks().await?;
466
467 filesystem.graveyard.clone().reap_async();
469 }
470
471 let root_store = filesystem.root_store();
473 root_store.set_trace(self.trace);
474 let root_directory =
475 Directory::open(&root_store, root_store.root_directory_object_id())
476 .await
477 .context("Unable to open root volume directory")?;
478 let mut transaction = filesystem
479 .clone()
480 .new_transaction(
481 lock_keys![LockKey::object(
482 root_store.store_object_id(),
483 root_directory.object_id()
484 )],
485 transaction::Options::default(),
486 )
487 .await?;
488 let volume_directory =
489 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
490 transaction.commit().await?;
491 objects.set_volume_directory(volume_directory);
492 } else {
493 filesystem
494 .journal
495 .replay(filesystem.clone(), self.on_new_allocator)
496 .await
497 .context("Journal replay failed")?;
498 filesystem.root_store().set_trace(self.trace);
499
500 if !read_only {
501 for store in objects.unlocked_stores() {
505 filesystem.graveyard.initial_reap(&store).await?;
506 }
507 }
508 }
509
510 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
512 fsck_after_every_transaction
513 .fs
514 .set(Arc::downgrade(&filesystem))
515 .unwrap_or_else(|_| unreachable!());
516 }
517
518 filesystem.closed.store(false, Ordering::SeqCst);
519
520 if !read_only && image_builder_mode.is_none() {
521 filesystem.graveyard.clone().reap_async();
523
524 if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
525 filesystem.start_trim_task(delay, interval);
526 }
527 }
528
529 Ok(filesystem.into())
530 }
531}
532
533pub struct FxFilesystem {
534 block_size: u64,
535 objects: Arc<ObjectManager>,
536 journal: Arc<Journal>,
537 commit_mutex: futures::lock::Mutex<()>,
538 lock_manager: LockManager,
539 flush_task: Mutex<Option<fasync::Task<()>>>,
540 trim_task: Mutex<Option<fasync::Task<()>>>,
541 closed: AtomicBool,
542 shutdown_event: Event,
544 trace: bool,
545 graveyard: Arc<Graveyard>,
546 completed_transactions: UintProperty,
547 options: Options,
548
549 in_flight_transactions: AtomicU64,
551
552 transaction_limit_event: Event,
555
556 device: DeviceHolder,
559
560 _stores_node: LazyNode,
562}
563
564#[fxfs_trace::trace]
565impl FxFilesystem {
566 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
567 FxFilesystemBuilder::new().format(true).open(device).await
568 }
569
570 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
571 FxFilesystemBuilder::new().open(device).await
572 }
573
574 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
575 self.objects.root_parent_store()
576 }
577
578 pub async fn close(&self) -> Result<(), Error> {
579 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
580 self.shutdown_event.notify(usize::MAX);
581 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
582 let trim_task = self.trim_task.lock().take();
583 if let Some(task) = trim_task {
584 debug_assert_not_too_long!(task);
585 }
586 self.journal.stop_compactions().await;
587 let sync_status =
588 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
589 match &sync_status {
590 Ok(checkpoint) => info!(
591 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
592 reservation_required={}, borrowed={})",
593 checkpoint.as_ref().unwrap().0.file_offset,
594 self.object_manager().metadata_reservation(),
595 self.object_manager().required_reservation(),
596 self.object_manager().borrowed_metadata_space(),
597 ),
598 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
599 }
600 self.journal.terminate();
601 let flush_task = self.flush_task.lock().take();
602 if let Some(task) = flush_task {
603 debug_assert_not_too_long!(task);
604 }
605 self.device().close().await.context("Failed to close device")?;
608 sync_status.map(|_| ())
609 }
610
611 pub fn device(&self) -> Arc<dyn Device> {
612 Arc::clone(&self.device)
613 }
614
615 pub fn root_store(&self) -> Arc<ObjectStore> {
616 self.objects.root_store()
617 }
618
619 pub fn allocator(&self) -> Arc<Allocator> {
620 self.objects.allocator()
621 }
622
623 pub fn object_manager(&self) -> &Arc<ObjectManager> {
624 &self.objects
625 }
626
627 pub fn journal(&self) -> &Arc<Journal> {
628 &self.journal
629 }
630
631 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
632 self.journal.sync(options).await.map(|_| ())
633 }
634
635 pub fn block_size(&self) -> u64 {
636 self.block_size
637 }
638
639 pub fn get_info(&self) -> Info {
640 Info {
641 total_bytes: self.device.size(),
642 used_bytes: self.object_manager().allocator().get_used_bytes().0,
643 }
644 }
645
646 pub fn super_block_header(&self) -> SuperBlockHeader {
647 self.journal.super_block_header()
648 }
649
650 pub fn graveyard(&self) -> &Arc<Graveyard> {
651 &self.graveyard
652 }
653
654 pub fn trace(&self) -> bool {
655 self.trace
656 }
657
658 pub fn options(&self) -> &Options {
659 &self.options
660 }
661
662 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
669 TxnGuard::Owned(
670 self.lock_manager
671 .read_lock(lock_keys!(LockKey::Filesystem))
672 .await
673 .into_owned(self.clone()),
674 )
675 }
676
677 pub async fn new_transaction<'a>(
678 self: Arc<Self>,
679 locks: LockKeys,
680 options: transaction::Options<'a>,
681 ) -> Result<Transaction<'a>, Error> {
682 let guard = if let Some(guard) = options.txn_guard.as_ref() {
683 TxnGuard::Borrowed(guard)
684 } else {
685 self.txn_guard().await
686 };
687 Transaction::new(guard, options, locks).await
688 }
689
690 #[trace]
691 pub async fn commit_transaction(
692 &self,
693 transaction: &mut Transaction<'_>,
694 callback: &mut (dyn FnMut(u64) + Send),
695 ) -> Result<u64, Error> {
696 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
697 hook(transaction)?;
698 }
699 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
700 self.maybe_start_flush_task();
701 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
702 let journal_offset = if self.journal().image_builder_mode().is_some() {
703 let journal_checkpoint =
704 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
705 let maybe_mutation = self
706 .object_manager()
707 .apply_transaction(transaction, &journal_checkpoint)
708 .expect("Transactions must not fail in image_builder_mode");
709 if let Some(mutation) = maybe_mutation {
710 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
711 }
715 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
716 0
717 } else {
718 self.journal.commit(transaction).await?
719 };
720 self.completed_transactions.add(1);
721
722 callback(journal_offset);
727
728 if let Some(hook) = self.options.post_commit_hook.as_ref() {
729 hook().await;
730 }
731
732 Ok(journal_offset)
733 }
734
735 pub fn lock_manager(&self) -> &LockManager {
736 &self.lock_manager
737 }
738
739 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
740 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
741 self.sub_transaction();
742 }
743 if let MetadataReservation::Hold(hold_amount) =
745 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
746 {
747 let hold = transaction
748 .allocator_reservation
749 .unwrap()
750 .reserve(0)
751 .expect("Zero should always succeed.");
752 hold.add(hold_amount);
753 }
754 self.objects.drop_transaction(transaction);
755 self.lock_manager.drop_transaction(transaction);
756 }
757
758 fn maybe_start_flush_task(&self) {
759 let mut flush_task = self.flush_task.lock();
760 if flush_task.is_none() {
761 let journal = self.journal.clone();
762 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
763 }
764 }
765
766 async fn do_trim(&self) -> Result<usize, Error> {
768 const MAX_EXTENTS_PER_BATCH: usize = 8;
769 const MAX_EXTENT_SIZE: usize = 256 * 1024;
770 let mut offset = 0;
771 let mut bytes_trimmed = 0;
772 loop {
773 if self.closed.load(Ordering::Relaxed) {
774 info!("Filesystem is closed, nothing to trim");
775 return Ok(bytes_trimmed);
776 }
777 let allocator = self.allocator();
778 let trimmable_extents =
779 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
780 for device_range in trimmable_extents.extents() {
781 self.device.trim(device_range.clone()).await?;
782 bytes_trimmed += device_range.length()? as usize;
783 }
784 if let Some(device_range) = trimmable_extents.extents().last() {
785 offset = device_range.end;
786 } else {
787 break;
788 }
789 }
790 Ok(bytes_trimmed)
791 }
792
793 fn start_trim_task(
794 self: &Arc<Self>,
795 delay: std::time::Duration,
796 interval: std::time::Duration,
797 ) {
798 if !self.device.supports_trim() {
799 info!("Device does not support trim; not scheduling trimming");
800 return;
801 }
802 let this = self.clone();
803 let mut next_timer = delay;
804 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
805 loop {
806 let shutdown_listener = this.shutdown_event.listen();
807 if this.closed.load(Ordering::SeqCst) {
813 return;
814 }
815 futures::select!(
816 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
817 () = shutdown_listener.fuse() => return,
818 );
819 let start_time = std::time::Instant::now();
820 let res = this.do_trim().await;
821 let duration = std::time::Instant::now() - start_time;
822 next_timer = interval.clone();
823 match res {
824 Ok(bytes_trimmed) => info!(
825 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
826 {next_timer:?}",
827 ),
828 Err(e) => error!(e:?; "Failed to trim"),
829 }
830 }
831 }));
832 }
833
834 pub(crate) async fn reservation_for_transaction<'a>(
835 self: &Arc<Self>,
836 options: transaction::Options<'a>,
837 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
838 if self.options.image_builder_mode.is_some() {
839 return Ok((MetadataReservation::Borrowed, None, None));
842 }
843 if !options.skip_journal_checks {
844 self.maybe_start_flush_task();
845 self.journal.check_journal_space().await?;
846 }
847
848 let mut hold = None;
862 let metadata_reservation = if options.borrow_metadata_space {
863 MetadataReservation::Borrowed
864 } else {
865 match options.allocator_reservation {
866 Some(reservation) => {
867 hold = Some(
868 reservation
869 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
870 .ok_or(FxfsError::NoSpace)?,
871 );
872 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
873 }
874 None => {
875 let reservation = self
876 .allocator()
877 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
878 .ok_or(FxfsError::NoSpace)?;
879 MetadataReservation::Reservation(reservation)
880 }
881 }
882 };
883 Ok((metadata_reservation, options.allocator_reservation, hold))
884 }
885
886 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
887 if skip_journal_checks {
888 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
889 } else {
890 let inc = || {
891 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
892 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
893 match self.in_flight_transactions.compare_exchange_weak(
894 in_flights,
895 in_flights + 1,
896 Ordering::Relaxed,
897 Ordering::Relaxed,
898 ) {
899 Ok(_) => return true,
900 Err(x) => in_flights = x,
901 }
902 }
903 return false;
904 };
905 while !inc() {
906 let listener = self.transaction_limit_event.listen();
907 if inc() {
908 break;
909 }
910 listener.await;
911 }
912 }
913 }
914
915 pub(crate) fn sub_transaction(&self) {
916 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
917 assert!(old != 0);
918 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
919 self.transaction_limit_event.notify(usize::MAX);
920 }
921 }
922
923 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
924 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
925 TruncateGuard(self.lock_manager().write_lock(keys).await)
926 }
927
928 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
929 let inspector = fuchsia_inspect::Inspector::default();
930 let root = inspector.root();
931 root.record_child("__root", |n| self.root_store().record_data(n));
932 let object_manager = self.object_manager();
933 let volume_directory = object_manager.volume_directory();
934 let layer_set = volume_directory.store().tree().layer_set();
935 let mut merger = layer_set.merger();
936 let mut iter = volume_directory.iter(&mut merger).await?;
937 while let Some((name, id, _)) = iter.get() {
938 if let Some(store) = object_manager.store(id) {
939 root.record_child(name.to_string(), |n| store.record_data(n));
940 }
941 iter.advance().await?;
942 }
943 Ok(inspector)
944 }
945}
946
947pub enum TxnGuard<'a> {
948 Borrowed(&'a TxnGuard<'a>),
949 Owned(ReadGuard<'static>),
950}
951
952impl TxnGuard<'_> {
953 pub fn fs(&self) -> &Arc<FxFilesystem> {
954 match self {
955 TxnGuard::Borrowed(b) => b.fs(),
956 TxnGuard::Owned(o) => o.fs().unwrap(),
957 }
958 }
959}
960
961#[allow(dead_code)]
963pub struct TruncateGuard<'a>(WriteGuard<'a>);
964
965pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
967 let fs = FxFilesystem::new_empty(device).await?;
968 fs.close().await?;
969 Ok(fs.take_device().await)
970}
971
972pub async fn mkfs_with_volume(
976 device: DeviceHolder,
977 volume_name: &str,
978 crypt: Option<Arc<dyn Crypt>>,
979) -> Result<DeviceHolder, Error> {
980 let fs = FxFilesystem::new_empty(device).await?;
981 {
982 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
985 root_volume
986 .new_volume(
987 volume_name,
988 NewChildStoreOptions {
989 options: StoreOptions { crypt, ..StoreOptions::default() },
990 ..Default::default()
991 },
992 )
993 .await
994 .expect("Create volume failed");
995 }
996 fs.close().await?;
997 Ok(fs.take_device().await)
998}
999
1000struct FsckAfterEveryTransaction {
1001 fs: OnceLock<Weak<FxFilesystem>>,
1002 old_hook: PostCommitHook,
1003}
1004
1005impl FsckAfterEveryTransaction {
1006 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1007 Arc::new(Self { fs: OnceLock::new(), old_hook })
1008 }
1009
1010 async fn run(self: Arc<Self>) {
1011 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1012 let options = FsckOptions {
1013 fail_on_warning: true,
1014 no_lock: true,
1015 quiet: true,
1016 ..Default::default()
1017 };
1018 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1019 let object_manager = fs.object_manager();
1020 for store in object_manager.unlocked_stores() {
1021 let store_id = store.store_object_id();
1022 if !object_manager.is_system_store(store_id) {
1023 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1024 .await
1025 .expect("fsck_volume_with_options failed");
1026 }
1027 }
1028 }
1029 if let Some(old_hook) = self.old_hook.as_ref() {
1030 old_hook().await;
1031 }
1032 }
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037 use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
1038 use crate::fsck::{fsck, fsck_volume};
1039 use crate::log::*;
1040 use crate::lsm_tree::Operation;
1041 use crate::lsm_tree::types::Item;
1042 use crate::object_handle::{
1043 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1044 };
1045 use crate::object_store::directory::{Directory, replace_child};
1046 use crate::object_store::journal::JournalOptions;
1047 use crate::object_store::journal::super_block::SuperBlockInstance;
1048 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1049 use crate::object_store::volume::root_volume;
1050 use crate::object_store::{
1051 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1052 };
1053 use crate::range::RangeExt;
1054 use fuchsia_async as fasync;
1055 use fuchsia_sync::Mutex;
1056 use futures::future::join_all;
1057 use futures::stream::{FuturesUnordered, TryStreamExt};
1058 use fxfs_insecure_crypto::InsecureCrypt;
1059 use rustc_hash::FxHashMap as HashMap;
1060 use std::ops::Range;
1061 use std::sync::Arc;
1062 use std::sync::atomic::{self, AtomicU32};
1063 use std::time::Duration;
1064 use storage_device::DeviceHolder;
1065 use storage_device::fake_device::{self, FakeDevice};
1066 use test_case::test_case;
1067
1068 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1069
1070 #[fuchsia::test(threads = 10)]
1071 async fn test_compaction() {
1072 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1073
1074 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1076 let root_store = fs.root_store();
1077 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1078 .await
1079 .expect("open failed");
1080
1081 let mut tasks = Vec::new();
1082 for i in 0..2 {
1083 let mut transaction = fs
1084 .clone()
1085 .new_transaction(
1086 lock_keys![LockKey::object(
1087 root_store.store_object_id(),
1088 root_directory.object_id()
1089 )],
1090 Options::default(),
1091 )
1092 .await
1093 .expect("new_transaction failed");
1094 let handle = root_directory
1095 .create_child_file(&mut transaction, &format!("{}", i))
1096 .await
1097 .expect("create_child_file failed");
1098 transaction.commit().await.expect("commit failed");
1099 tasks.push(fasync::Task::spawn(async move {
1100 const TEST_DATA: &[u8] = b"hello";
1101 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1102 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1103 for _ in 0..1500 {
1104 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1105 }
1106 }));
1107 }
1108 join_all(tasks).await;
1109 fs.sync(SyncOptions::default()).await.expect("sync failed");
1110
1111 fsck(fs.clone()).await.expect("fsck failed");
1112 fs.close().await.expect("Close failed");
1113 }
1114
    /// Records every LSM mutation made while exercising a file, then checks that journal replay
    /// produces the same mutations and the same metadata reservation amount.
    /// The file is created, written, overwritten, truncated, unlinked and tombstoned. Mutations
    /// are recorded both for each store and for the allocator.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Close and reopen so that the recording callbacks below are installed from the start.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Thread-safe log of (operation, item) pairs observed by a tree's mutation callback.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with recording callbacks on the allocator tree and on each store
        // tree (keyed by store object id). `reclaim_size: u64::MAX` is presumably there to stop
        // the journal from reclaiming/flushing so every mutation stays in the journal — confirm.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        // Create a file.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Write 10000 bytes at offset 0.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Write again at offset 5000, overlapping the first write.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Shrink the file.
        object.truncate(3000).await.expect("truncate failed");

        // Unlink the file from the root directory.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Tombstone the now-unlinked object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        fs.close().await.expect("close failed");

        // Captured after close so it can be compared with the replayed filesystem's value.
        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen (replaying the journal) with fresh recorders.
        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Every store must have seen the same mutations, in the same order, during replay.
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        // Same for the allocator.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
        // NOTE(review): the replayed filesystem is not closed before being dropped.
    }
1269
1270 #[fuchsia::test]
1271 async fn test_max_in_flight_transactions() {
1272 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1273 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1274
1275 let transactions = FuturesUnordered::new();
1276 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1277 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1278 }
1279 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1280
1281 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1283 assert!(futures::poll!(&mut fut).is_pending());
1284
1285 transactions.pop();
1287
1288 assert!(futures::poll!(&mut fut).is_ready());
1289 }
1290
1291 #[fuchsia::test(threads = 10)]
1293 async fn test_continuously_trim() {
1294 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1295 let fs = FxFilesystemBuilder::new()
1296 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1297 .format(true)
1298 .open(device)
1299 .await
1300 .expect("open failed");
1301 fasync::Timer::new(Duration::from_millis(10)).await;
1303
1304 let root_store = fs.root_store();
1307 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1308 .await
1309 .expect("open failed");
1310 for _ in 0..100 {
1311 let mut transaction = fs
1312 .clone()
1313 .new_transaction(
1314 lock_keys![LockKey::object(
1315 root_store.store_object_id(),
1316 root_directory.object_id()
1317 )],
1318 Options::default(),
1319 )
1320 .await
1321 .expect("new_transaction failed");
1322 let object = root_directory
1323 .create_child_file(&mut transaction, "test")
1324 .await
1325 .expect("create_child_file failed");
1326 transaction.commit().await.expect("commit failed");
1327
1328 {
1329 let buf = object.allocate_buffer(1024).await;
1330 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1331 }
1332 std::mem::drop(object);
1333
1334 let mut transaction = root_directory
1335 .acquire_context_for_replace(None, "test", true)
1336 .await
1337 .expect("acquire_context_for_replace failed")
1338 .transaction;
1339 replace_child(&mut transaction, None, (&root_directory, "test"))
1340 .await
1341 .expect("replace_child failed");
1342 transaction.commit().await.expect("commit failed");
1343 }
1344 fs.close().await.expect("close failed");
1345 }
1346
1347 #[test_case(true; "test power fail with barriers")]
1348 #[test_case(false; "test power fail with checksums")]
1349 #[fuchsia::test]
1350 async fn test_power_fail(barriers_enabled: bool) {
1351 for _ in 0..10 {
1354 let (store_id, device, test_file_object_id) = {
1355 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1356 let fs = if barriers_enabled {
1357 FxFilesystemBuilder::new()
1358 .barriers_enabled(true)
1359 .format(true)
1360 .open(device)
1361 .await
1362 .expect("new filesystem failed")
1363 } else {
1364 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1365 };
1366 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1367
1368 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1369 .await
1370 .expect("sync failed");
1371
1372 let store = root_volume
1373 .new_volume(
1374 "test",
1375 NewChildStoreOptions {
1376 options: StoreOptions {
1377 crypt: Some(Arc::new(InsecureCrypt::new())),
1378 ..StoreOptions::default()
1379 },
1380 ..Default::default()
1381 },
1382 )
1383 .await
1384 .expect("new_volume failed");
1385 let root_directory = Directory::open(&store, store.root_directory_object_id())
1386 .await
1387 .expect("open failed");
1388
1389 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1391 let fs = store.filesystem();
1392 let root_directory = Directory::open(store, store.root_directory_object_id())
1393 .await
1394 .expect("open failed");
1395 for i in 0..100 {
1396 let mut transaction = fs
1397 .clone()
1398 .new_transaction(
1399 lock_keys![LockKey::object(
1400 store.store_object_id(),
1401 store.root_directory_object_id()
1402 )],
1403 Options::default(),
1404 )
1405 .await
1406 .expect("new_transaction failed");
1407 root_directory
1408 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1409 .await
1410 .expect("create_child_file failed");
1411 transaction.commit().await.expect("commit failed");
1412 }
1413 }
1414
1415 create_files(&store, "A").await;
1417
1418 let mut transaction = fs
1421 .clone()
1422 .new_transaction(
1423 lock_keys![LockKey::object(
1424 store.store_object_id(),
1425 store.root_directory_object_id()
1426 )],
1427 Options::default(),
1428 )
1429 .await
1430 .expect("new_transaction failed");
1431 let object = root_directory
1432 .create_child_file(&mut transaction, "test")
1433 .await
1434 .expect("create_child_file failed");
1435 transaction.commit().await.expect("commit failed");
1436
1437 let mut transaction =
1438 object.new_transaction().await.expect("new_transaction failed");
1439 let mut buffer = object.allocate_buffer(4096).await;
1440 buffer.as_mut_slice().fill(0xed);
1441 object
1442 .txn_write(&mut transaction, 0, buffer.as_ref())
1443 .await
1444 .expect("txn_write failed");
1445 transaction.commit().await.expect("commit failed");
1446
1447 create_files(&store, "B").await;
1449
1450 fs.sync(SyncOptions::default()).await.expect("sync failed");
1453
1454 fasync::Timer::new(Duration::from_millis(10)).await;
1460
1461 (
1462 store.store_object_id(),
1463 fs.device().snapshot().expect("snapshot failed"),
1464 object.object_id(),
1465 )
1466 };
1467
1468 device
1471 .discard_random_since_last_flush()
1472 .expect("discard_random_since_last_flush failed");
1473
1474 let fs = FxFilesystem::open(device).await.expect("open failed");
1475 fsck(fs.clone()).await.expect("fsck failed");
1476
1477 let mut check_test_file = false;
1478
1479 let object_id = if fs.object_manager().store(store_id).is_some() {
1482 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1483 .await
1484 .expect("fsck_volume failed");
1485
1486 let store = root_volume(fs.clone())
1490 .await
1491 .expect("root_volume failed")
1492 .volume(
1493 "test",
1494 StoreOptions {
1495 crypt: Some(Arc::new(InsecureCrypt::new())),
1496 ..StoreOptions::default()
1497 },
1498 )
1499 .await
1500 .expect("volume failed");
1501
1502 let root_directory = Directory::open(&store, store.root_directory_object_id())
1503 .await
1504 .expect("open failed");
1505
1506 let mut transaction = fs
1507 .clone()
1508 .new_transaction(
1509 lock_keys![LockKey::object(
1510 store.store_object_id(),
1511 store.root_directory_object_id()
1512 )],
1513 Options::default(),
1514 )
1515 .await
1516 .expect("new_transaction failed");
1517 let object = root_directory
1518 .create_child_file(&mut transaction, &format!("C"))
1519 .await
1520 .expect("create_child_file failed");
1521 transaction.commit().await.expect("commit failed");
1522
1523 if let Ok(test_file) = ObjectStore::open_object(
1525 &store,
1526 test_file_object_id,
1527 HandleOptions::default(),
1528 None,
1529 )
1530 .await
1531 {
1532 let mut buffer = test_file.allocate_buffer(4096).await;
1534 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1535 if bytes == 4096 {
1536 let expected = [0xed; 4096];
1537 assert_eq!(buffer.as_slice(), &expected);
1538 } else {
1539 assert_eq!(bytes, 0);
1541 }
1542
1543 let mut transaction =
1545 test_file.new_transaction().await.expect("new_transaction failed");
1546 buffer.as_mut_slice().fill(0x37);
1547 test_file
1548 .txn_write(&mut transaction, 0, buffer.as_ref())
1549 .await
1550 .expect("txn_write failed");
1551 transaction.commit().await.expect("commit failed");
1552 check_test_file = true;
1553 }
1554
1555 object.object_id()
1556 } else {
1557 INVALID_OBJECT_ID
1558 };
1559
1560 fs.close().await.expect("close failed");
1562 let device = fs.take_device().await;
1563 device.reopen(false);
1564
1565 let fs = FxFilesystem::open(device).await.expect("open failed");
1566 fsck(fs.clone()).await.expect("fsck failed");
1567
1568 if object_id != INVALID_OBJECT_ID {
1571 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1572 .await
1573 .expect("fsck_volume failed");
1574
1575 let store = root_volume(fs.clone())
1576 .await
1577 .expect("root_volume failed")
1578 .volume(
1579 "test",
1580 StoreOptions {
1581 crypt: Some(Arc::new(InsecureCrypt::new())),
1582 ..StoreOptions::default()
1583 },
1584 )
1585 .await
1586 .expect("volume failed");
1587 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1589 .await
1590 .expect("open_object failed");
1591
1592 if check_test_file {
1594 info!("Checking test file for modification");
1595 let test_file = ObjectStore::open_object(
1596 &store,
1597 test_file_object_id,
1598 HandleOptions::default(),
1599 None,
1600 )
1601 .await
1602 .expect("open_object failed");
1603 let mut buffer = test_file.allocate_buffer(4096).await;
1604 assert_eq!(
1605 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1606 4096
1607 );
1608 let expected = [0x37; 4096];
1609 assert_eq!(buffer.as_slice(), &expected);
1610 }
1611 }
1612
1613 fs.close().await.expect("close failed");
1614 }
1615 }
1616
1617 #[fuchsia::test]
1618 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1619 let barrier_count = Arc::new(AtomicU32::new(0));
1620
1621 struct Observer(Arc<AtomicU32>);
1622
1623 impl fake_device::Observer for Observer {
1624 fn barrier(&self) {
1625 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1626 }
1627 }
1628
1629 let mut fake_device = FakeDevice::new(8192, 4096);
1630 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1631 let device = DeviceHolder::new(fake_device);
1632 let fs = FxFilesystemBuilder::new()
1633 .barriers_enabled(true)
1634 .format(true)
1635 .open(device)
1636 .await
1637 .expect("new filesystem failed");
1638
1639 {
1640 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1641 root_vol
1642 .new_volume(
1643 "test",
1644 NewChildStoreOptions {
1645 options: StoreOptions {
1646 crypt: Some(Arc::new(InsecureCrypt::new())),
1647 ..StoreOptions::default()
1648 },
1649 ..NewChildStoreOptions::default()
1650 },
1651 )
1652 .await
1653 .expect("there is no test volume");
1654 fs.close().await.expect("close failed");
1655 }
1656 let device = fs.take_device().await;
1659 device.reopen(false);
1660 let fs = FxFilesystemBuilder::new()
1661 .barriers_enabled(true)
1662 .open(device)
1663 .await
1664 .expect("new filesystem failed");
1665 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1666
1667 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1668 let store = root_vol
1669 .volume(
1670 "test",
1671 StoreOptions {
1672 crypt: Some(Arc::new(InsecureCrypt::new())),
1673 ..StoreOptions::default()
1674 },
1675 )
1676 .await
1677 .expect("there is no test volume");
1678
1679 let fs = store.filesystem();
1681 let root_directory =
1682 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1683 for i in 0..100 {
1684 let mut transaction = fs
1685 .clone()
1686 .new_transaction(
1687 lock_keys![LockKey::object(
1688 store.store_object_id(),
1689 store.root_directory_object_id()
1690 )],
1691 Options::default(),
1692 )
1693 .await
1694 .expect("new_transaction failed");
1695 root_directory
1696 .create_child_file(&mut transaction, &format!("A {i}"))
1697 .await
1698 .expect("create_child_file failed");
1699 transaction.commit().await.expect("commit failed");
1700 }
1701
1702 fs.close().await.expect("close failed");
1704 assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1706 }
1707
1708 #[fuchsia::test]
1709 async fn test_barrier_emitted_when_transaction_includes_data() {
1710 let barrier_count = Arc::new(AtomicU32::new(0));
1711
1712 struct Observer(Arc<AtomicU32>);
1713
1714 impl fake_device::Observer for Observer {
1715 fn barrier(&self) {
1716 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1717 }
1718 }
1719
1720 let mut fake_device = FakeDevice::new(8192, 4096);
1721 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1722 let device = DeviceHolder::new(fake_device);
1723 let fs = FxFilesystemBuilder::new()
1724 .barriers_enabled(true)
1725 .format(true)
1726 .open(device)
1727 .await
1728 .expect("new filesystem failed");
1729
1730 {
1731 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1732 root_vol
1733 .new_volume(
1734 "test",
1735 NewChildStoreOptions {
1736 options: StoreOptions {
1737 crypt: Some(Arc::new(InsecureCrypt::new())),
1738 ..StoreOptions::default()
1739 },
1740 ..NewChildStoreOptions::default()
1741 },
1742 )
1743 .await
1744 .expect("there is no test volume");
1745 fs.close().await.expect("close failed");
1746 }
1747 let device = fs.take_device().await;
1750 device.reopen(false);
1751 let fs = FxFilesystemBuilder::new()
1752 .barriers_enabled(true)
1753 .open(device)
1754 .await
1755 .expect("new filesystem failed");
1756 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1757
1758 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1759 let store = root_vol
1760 .volume(
1761 "test",
1762 StoreOptions {
1763 crypt: Some(Arc::new(InsecureCrypt::new())),
1764 ..StoreOptions::default()
1765 },
1766 )
1767 .await
1768 .expect("there is no test volume");
1769
1770 let fs: Arc<FxFilesystem> = store.filesystem();
1772 let root_directory =
1773 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1774
1775 let mut transaction = fs
1776 .clone()
1777 .new_transaction(
1778 lock_keys![LockKey::object(
1779 store.store_object_id(),
1780 store.root_directory_object_id()
1781 )],
1782 Options::default(),
1783 )
1784 .await
1785 .expect("new_transaction failed");
1786 let object = root_directory
1787 .create_child_file(&mut transaction, "test")
1788 .await
1789 .expect("create_child_file failed");
1790 transaction.commit().await.expect("commit failed");
1791
1792 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1793 let mut buffer = object.allocate_buffer(4096).await;
1794 buffer.as_mut_slice().fill(0xed);
1795 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1796 transaction.commit().await.expect("commit failed");
1797
1798 fs.close().await.expect("close failed");
1800 assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1802 }
1803
1804 #[fuchsia::test]
1805 async fn test_image_builder_mode_no_early_writes() {
1806 const BLOCK_SIZE: u32 = 4096;
1807 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1808 device.reopen(true);
1809 let fs = FxFilesystemBuilder::new()
1810 .format(true)
1811 .image_builder_mode(Some(SuperBlockInstance::A))
1812 .open(device)
1813 .await
1814 .expect("open failed");
1815 fs.close().await.expect("closed");
1818 }
1819
1820 #[fuchsia::test]
1821 async fn test_image_builder_mode() {
1822 const BLOCK_SIZE: u32 = 4096;
1823 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
1824 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1825
1826 {
1828 let mut write_buf =
1829 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1830 write_buf.as_mut_slice().fill(0xf0);
1831 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
1832 }
1833
1834 device.reopen(true);
1835
1836 let device = {
1837 let fs = FxFilesystemBuilder::new()
1838 .format(true)
1839 .image_builder_mode(Some(SuperBlockInstance::B))
1840 .open(device)
1841 .await
1842 .expect("open failed");
1843 {
1844 let root_store = fs.root_store();
1845 let root_directory =
1846 Directory::open(&root_store, root_store.root_directory_object_id())
1847 .await
1848 .expect("open failed");
1849 let handle;
1851 {
1852 let mut transaction = fs
1853 .clone()
1854 .new_transaction(
1855 lock_keys![LockKey::object(
1856 root_directory.store().store_object_id(),
1857 root_directory.object_id()
1858 )],
1859 Options::default(),
1860 )
1861 .await
1862 .expect("new transaction");
1863 handle = root_directory
1864 .create_child_file(&mut transaction, "test")
1865 .await
1866 .expect("create file");
1867 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
1868 transaction.commit().await.expect("commit");
1869 }
1870 }
1871 fs.device().reopen(false);
1872 fs.finalize().await.expect("finalize");
1873 fs.close().await.expect("close");
1874 fs.take_device().await
1875 };
1876 device.reopen(false);
1877 let fs = FxFilesystem::open(device).await.expect("open failed");
1878 fsck(fs.clone()).await.expect("fsck failed");
1879
1880 let root_store = fs.root_store();
1882 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1883 .await
1884 .expect("open failed");
1885 let (object_id, descriptor, _) =
1886 root_directory.lookup("test").await.expect("lookup failed").unwrap();
1887 assert_eq!(descriptor, ObjectDescriptor::File);
1888 let test_file =
1889 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
1890 .await
1891 .expect("open failed");
1892 let mut read_buf =
1893 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1894 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
1895 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
1896 fs.close().await.expect("closed");
1897 }
1898}