// File: fxfs/filesystem.rs (web-page navigation breadcrumbs removed from scraped copy)
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16    self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17    TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use fxfs_trace::{TraceFutureExt, trace_future_args};
32use static_assertions::const_assert;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock, Weak};
35use storage_device::{Device, DeviceHolder};
36
/// Minimum supported filesystem block size, in bytes.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Maximum supported filesystem block size (64 KiB), in bytes.
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
// needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
// Compile-time sanity check of the arithmetic above.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);

/// How often to clean the transfer buffer.
// TODO(https://fxbug.dev/489725256) Configure the task to run when fxfs is idle.
const CLEAN_TRANSFER_BUFFER_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
61
/// Holds information on an Fxfs Filesystem
pub struct Info {
    /// Size of the underlying block device, in bytes (see `FxFilesystem::get_info`).
    pub total_bytes: u64,
    /// Bytes currently allocated, as reported by the allocator.
    pub used_bytes: u64,
}

/// Callback invoked after every committed transaction; see `Options::post_commit_hook`.
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Callback invoked before every transaction commit; see `Options::pre_commit_hook`.
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
72
/// Filesystem-wide options, fixed at mount time (set via `FxFilesystemBuilder`).
pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,

    /// The first duration is how long after the filesystem has been mounted to perform an initial
    /// trim.  The second is the interval to repeat trimming thereafter.  If set to None, no
    /// trimming is done.
    /// Default values are (1 hour, 24 hours); see `TRIM_AFTER_BOOT_TIMER` and
    /// `TRIM_INTERVAL_TIMER`.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// If set, journal will not be used for writes. The user must call 'close' when finished.
    /// The provided superblock instance will be written upon close().
    pub image_builder_mode: Option<SuperBlockInstance>,

    /// If true, the filesystem will use the hardware's inline crypto engine to write encrypted
    /// data. Requires the block device to support inline encryption and for `barriers_enabled` to
    /// be true.
    // TODO(https://fxbug.dev/393196849): For now, this flag only prevents the filesystem from
    // computing checksums. Update this comment when the filesystem actually uses inline
    // encryption.
    pub inline_crypto_enabled: bool,

    /// Configures the filesystem to use barriers instead of checksums to ensure consistency.
    /// Checksums may be computed and stored in extent records but will no longer be stored in the
    /// journal. The journal will use barriers to enforce proper ordering between data and metadata
    /// writes. Must be true if `inline_crypto_enabled` is true.
    pub barriers_enabled: bool,
}
118
119impl Default for Options {
120    fn default() -> Self {
121        Options {
122            roll_metadata_key_byte_count: 128 * 1024 * 1024,
123            read_only: false,
124            pre_commit_hook: None,
125            post_commit_hook: None,
126            skip_initial_reap: false,
127            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
128            image_builder_mode: None,
129            inline_crypto_enabled: false,
130            barriers_enabled: false,
131        }
132    }
133}
134
/// The context in which a transaction is being applied, passed to
/// `JournalingObject::apply_mutation`.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}
143
/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    /// Applied during journal replay; no live transaction object exists.
    Replay,
    /// Applied on a live running system; carries a reference to the committing transaction.
    Live(&'a Transaction<'b>),
}
150
151impl ApplyMode<'_, '_> {
152    pub fn is_replay(&self) -> bool {
153        matches!(self, ApplyMode::Replay)
154    }
155
156    pub fn is_live(&self) -> bool {
157        matches!(self, ApplyMode::Live(_))
158    }
159}
160
/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when the transaction commits, which can either be during live
    /// operation (See `ObjectManager::apply_mutation`) or during journal replay, in which case
    /// transaction will be None (See `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.  The default implementation writes a clone of the mutation
    /// unmodified.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
189
/// Options controlling `FxFilesystem::sync` / `Journal::sync`.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
204
/// Wrapper over an open `FxFilesystem`; its `Drop` impl logs an error if the filesystem is
/// dropped without `close()` having been called.  Derefs to `Arc<FxFilesystem>`.
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
206
impl OpenFxFilesystem {
    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        // Create the future first, then drop `self`: dropping releases this handle's reference so
        // the wait below can complete once all other references are gone.
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }
}
216
217impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
218    fn from(fs: Arc<FxFilesystem>) -> Self {
219        Self(fs)
220    }
221}
222
223impl Drop for OpenFxFilesystem {
224    fn drop(&mut self) {
225        if self.options.image_builder_mode.is_some()
226            && self.journal().image_builder_mode().is_some()
227        {
228            error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
229        }
230        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
231            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
232        }
233    }
234}
235
// Allow all `FxFilesystem` methods to be called directly on the open handle.
impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
243
/// Builder used to format or open an `FxFilesystem`; see `FxFilesystemBuilder::open`.
pub struct FxFilesystemBuilder {
    // If true, format (initialize) the device when opening.
    format: bool,
    // Enables trace-level logging on the journal and root store.
    trace: bool,
    // Filesystem-wide options; see `Options`.
    options: Options,
    // Options forwarded to the journal.
    journal_options: JournalOptions,
    // Callback invoked immediately after the allocator is created (passed to journal replay).
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Callback invoked each time a store is registered with `ObjectManager`.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // If true, run fsck after every committed transaction (testing only).
    fsck_after_every_transaction: bool,
}
253
impl FxFilesystemBuilder {
    /// Creates a builder with default settings: no formatting, no tracing, default filesystem and
    /// journal options, no callbacks and no per-transaction fsck.
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `close()` before dropping the filesystem to trigger
    /// a compaction of in-memory data structures, a minimal journal and a write to one
    /// superblock (as specified).
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets the trim configuration (initial delay after mount, then repeat interval).  See
    /// `Options::trim_config`; `None` disables trimming.
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Enables or disables inline encryption. Defaults to `false`.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Enables or disables barriers in both the filesystem and journal options.
    /// Defaults to `false`.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }

    /// Constructs an `FxFilesystem` object with the specified settings.
    ///
    /// When formatting, this initializes the journal and creates the root volume directory;
    /// otherwise the journal is replayed.  Background tasks (graveyard reaping, trimming,
    /// transfer-buffer cleaning) are started unless read-only or in image-builder mode.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        // Inline encryption requires barriers to be enabled.
        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            // Wrap any user-supplied post-commit hook so that fsck runs on every commit.
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || Box::pin(instance.clone().run())));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                trim_task: Mutex::new(None),
                clean_transfer_buffer_task: Mutex::new(None),
                // Starts out closed; cleared further below once the filesystem is usable.
                closed: AtomicBool::new(true),
                shutdown_event: Event::new(),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
            filesystem.start_clean_transfer_buffer_task();
        }

        Ok(filesystem.into())
    }
}
527
/// An open instance of the Fxfs filesystem.  Constructed via `FxFilesystemBuilder`.
pub struct FxFilesystem {
    // Filesystem block size in bytes (at least `MIN_BLOCK_SIZE`).
    block_size: u64,
    // Tracks object stores and the allocator.
    objects: Arc<ObjectManager>,
    // The journal, which records metadata mutations.
    journal: Arc<Journal>,
    // Serializes transaction commits; see `commit_transaction`.
    commit_mutex: futures::lock::Mutex<()>,
    // Provides filesystem and transaction locking; see `txn_guard`.
    lock_manager: LockManager,
    // Lazily-spawned journal flush task; see `maybe_start_flush_task`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // Background trim task (started when `Options::trim_config` is set).
    trim_task: Mutex<Option<fasync::Task<()>>>,
    // Background task that periodically cleans the transfer buffer.
    clean_transfer_buffer_task: Mutex<Option<fasync::Task<()>>>,
    // True before mount completes and again after `close()`; checked by `Drop`.
    closed: AtomicBool,
    // An event that is signalled when the filesystem starts to shut down.
    shutdown_event: Event,
    // Enables trace-level logging.
    trace: bool,
    // Used to reap purged objects.
    graveyard: Arc<Graveyard>,
    // Inspect counter incremented on every committed transaction.
    completed_transactions: UintProperty,
    // Mount-time options.
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    device: DeviceHolder,

    // The "stores" node in the Inspect tree.
    _stores_node: LazyNode,
}
559
560#[fxfs_trace::trace]
561impl FxFilesystem {
562    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
563        FxFilesystemBuilder::new().format(true).open(device).await
564    }
565
566    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
567        FxFilesystemBuilder::new().open(device).await
568    }
569
    /// Returns the root parent store (forwarded from `ObjectManager`).
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }
573
    /// Cleanly shuts the filesystem down: stops background tasks and compactions, syncs and
    /// terminates the journal, then closes the device.  In image-builder mode this is also where
    /// the minimal journal and the chosen superblock are finally written.
    ///
    /// Panics if called more than once on an open filesystem.  Returns an error if the journal
    /// sync or the device close fails (the device is closed even if sync fails).
    pub async fn close(&self) -> Result<(), Error> {
        if self.journal().image_builder_mode().is_some() {
            // Leaving image-builder mode: allocate and write the journal, then compact so the
            // superblock reflects all accumulated in-memory mutations.
            self.journal().allocate_journal().await?;
            self.journal().set_image_builder_mode(None);
            self.journal().force_compact().await?;
        }
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        // Take each task out of its lock before awaiting it so the lock isn't held across await.
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        let clean_transfer_buffer_task = self.clean_transfer_buffer_task.lock().take();
        if let Some(task) = clean_transfer_buffer_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            if self.journal().image_builder_mode().is_some() || self.options().read_only {
                Ok(None)
            } else {
                self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
            };
        match &sync_status {
            Ok(None) => {}
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
        // crash instead of exiting gracefully.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }
620
    /// Returns a new reference to the underlying block device.
    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    /// Returns the root object store.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    /// Returns the allocator.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    /// Enables allocations for the allocator.
    /// This is only used in image_builder_mode where it *must*
    /// be called before any allocations can take place.
    pub fn enable_allocations(&self) {
        self.allocator().enable_allocations();
    }

    /// Returns the object manager.
    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    /// Returns the journal.
    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

    /// Syncs the journal; see `SyncOptions` for the available behaviours.
    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    /// Returns the filesystem block size in bytes.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    /// Returns total (device size) and used (allocated) byte counts.
    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    /// Returns the current superblock header.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    /// Returns the graveyard.
    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    /// Returns true if trace-level logging is enabled.
    pub fn trace(&self) -> bool {
        self.trace
    }

    /// Returns the options the filesystem was opened with.
    pub fn options(&self) -> &Options {
        &self.options
    }
678
    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    ///
    /// The returned guard is `'static` because it owns a strong reference to the filesystem (via
    /// `into_owned`).
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }
693
694    pub async fn new_transaction<'a>(
695        self: Arc<Self>,
696        locks: LockKeys,
697        options: transaction::Options<'a>,
698    ) -> Result<Transaction<'a>, Error> {
699        let guard = if let Some(guard) = options.txn_guard.as_ref() {
700            TxnGuard::Borrowed(guard)
701        } else {
702            self.txn_guard().await
703        };
704        Transaction::new(guard, options, locks).await
705    }
706
    /// Commits `transaction`.  Normally this writes the transaction to the journal; in
    /// image-builder mode the mutations are applied directly and the journal is bypassed.
    /// `callback` is invoked with the resulting journal offset whilst the commit lock is still
    /// held.  Returns the journal offset (zero in image-builder mode).
    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            // A failing pre-commit hook fails the whole transaction.
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        // Commits are serialized; the guard is held until this function returns.
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }
751
    /// Returns the lock manager used to serialize access to filesystem objects.
    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }
755
756    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
757        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
758            self.sub_transaction();
759        }
760        // If we placed a hold for metadata space, return it now.
761        if let MetadataReservation::Hold(hold_amount) =
762            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
763        {
764            let hold = transaction
765                .allocator_reservation
766                .unwrap()
767                .reserve(0)
768                .expect("Zero should always succeed.");
769            hold.add(hold_amount);
770        }
771        self.objects.drop_transaction(transaction);
772        self.lock_manager.drop_transaction(transaction);
773    }
774
775    fn maybe_start_flush_task(&self) {
776        if self.journal.image_builder_mode().is_some() {
777            return;
778        }
779        let mut flush_task = self.flush_task.lock();
780        if flush_task.is_none() {
781            let journal = self.journal.clone();
782            *flush_task = Some(fasync::Task::spawn(
783                journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
784            ));
785        }
786    }
787
788    // Returns the number of bytes trimmed.
789    async fn do_trim(&self) -> Result<usize, Error> {
790        const MAX_EXTENTS_PER_BATCH: usize = 8;
791        const MAX_EXTENT_SIZE: usize = 256 * 1024;
792        let mut offset = 0;
793        let mut bytes_trimmed = 0;
794        loop {
795            if self.closed.load(Ordering::Relaxed) {
796                info!("Filesystem is closed, nothing to trim");
797                return Ok(bytes_trimmed);
798            }
799            let allocator = self.allocator();
800            let trimmable_extents =
801                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
802            for device_range in trimmable_extents.extents() {
803                self.device.trim(device_range.clone()).await?;
804                bytes_trimmed += device_range.length()? as usize;
805            }
806            if let Some(device_range) = trimmable_extents.extents().last() {
807                offset = device_range.end;
808            } else {
809                break;
810            }
811        }
812        Ok(bytes_trimmed)
813    }
814
815    fn start_trim_task(
816        self: &Arc<Self>,
817        delay: std::time::Duration,
818        interval: std::time::Duration,
819    ) {
820        if !self.device.supports_trim() {
821            info!("Device does not support trim; not scheduling trimming");
822            return;
823        }
824        let this = self.clone();
825        let mut next_timer = delay;
826        *self.trim_task.lock() = Some(fasync::Task::spawn(
827            async move {
828                loop {
829                    let shutdown_listener = this.shutdown_event.listen();
830                    // Note that we need to check if the filesystem was closed after we start
831                    // listening to the shutdown event, but before we start waiting on `timer`,
832                    // because otherwise we might start listening on `shutdown_event` *after* the
833                    // event was signaled, and so `shutdown_listener` will never fire, and this
834                    // task will get stuck until `timer` expires.
835                    if this.closed.load(Ordering::SeqCst) {
836                        return;
837                    }
838                    futures::select!(
839                        () = fasync::Timer::new(next_timer.clone()).fuse() => {},
840                        () = shutdown_listener.fuse() => return,
841                    );
842                    let start_time = std::time::Instant::now();
843                    let res = this.do_trim().await;
844                    let duration = std::time::Instant::now() - start_time;
845                    next_timer = interval.clone();
846                    match res {
847                        Ok(bytes_trimmed) => info!(
848                            "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
849                            {next_timer:?}",
850                        ),
851                        Err(e) => error!(e:?; "Failed to trim"),
852                    }
853                }
854            }
855            .trace(trace_future_args!("Filesystem::trim_task")),
856        ));
857    }
858
859    fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
860        let this = self.clone();
861        *self.clean_transfer_buffer_task.lock() = Some(fasync::Task::spawn(
862            async move {
863                loop {
864                    let shutdown_listener = this.shutdown_event.listen();
865                    if this.closed.load(Ordering::SeqCst) {
866                        return;
867                    }
868                    futures::select!(
869                        () = fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).fuse() => {},
870                        () = shutdown_listener.fuse() => return,
871                    );
872                    this.device().clean_transfer_buffer();
873                }
874            }
875            .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
876        ));
877    }
878
    /// Works out how metadata space will be reserved for a new transaction, based on `options`.
    ///
    /// Returns the `MetadataReservation` to embed in the transaction, the caller-supplied
    /// allocator reservation (if any), and a `Hold` taken on that reservation, if one was placed.
    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        if self.options.image_builder_mode.is_some() {
            // Image builder mode avoids the journal so reservation tracking for metadata overheads
            // doesn't make sense and so we essentially have 'all or nothing' semantics instead.
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        // We support three options for metadata space reservation:
        //
        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
        //      used on the understanding that eventually, potentially after a full compaction,
        //      there should be no net increase in space used.  For example, unlinking an object
        //      should eventually decrease the amount of space used and setting most attributes
        //      should not result in any change.
        //
        //   2. A reservation is provided in which case we'll place a hold on some of it for
        //      metadata.
        //
        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
        //      and will return NoSpace if that fails.
        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }
930
931    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
932        if skip_journal_checks {
933            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
934        } else {
935            let inc = || {
936                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
937                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
938                    match self.in_flight_transactions.compare_exchange_weak(
939                        in_flights,
940                        in_flights + 1,
941                        Ordering::Relaxed,
942                        Ordering::Relaxed,
943                    ) {
944                        Ok(_) => return true,
945                        Err(x) => in_flights = x,
946                    }
947                }
948                return false;
949            };
950            while !inc() {
951                let listener = self.transaction_limit_event.listen();
952                if inc() {
953                    break;
954                }
955                listener.await;
956            }
957        }
958    }
959
960    pub(crate) fn sub_transaction(&self) {
961        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
962        assert!(old != 0);
963        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
964            self.transaction_limit_event.notify(usize::MAX);
965        }
966    }
967
968    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
969        let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
970        TruncateGuard(self.lock_manager().write_lock(keys).await)
971    }
972
    /// Builds an `Inspector` containing diagnostic data for every open store.
    ///
    /// The root and root-parent stores are recorded under the reserved names `__root` and
    /// `__root_parent`; each open volume store is recorded under its volume name.
    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
        let inspector = fuchsia_inspect::Inspector::default();
        let root = inspector.root();
        root.record_child("__root", |n| self.root_store().record_data(n));
        root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
        let object_manager = self.object_manager();
        let volume_directory = object_manager.volume_directory();
        let layer_set = volume_directory.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = volume_directory.iter(&mut merger).await?;
        // Walk the volumes directory; only stores known to the object manager (i.e. currently
        // open) are recorded.
        while let Some((name, id, _)) = iter.get() {
            if let Some(store) = object_manager.store(id) {
                root.record_child(name.to_string(), |n| store.record_data(n));
            }
            iter.advance().await?;
        }
        Ok(inspector)
    }
991}
992
/// A guard associated with a transaction.
pub enum TxnGuard<'a> {
    /// A reference to another guard (allows sharing one underlying guard).
    Borrowed(&'a TxnGuard<'a>),
    /// An owned read guard.
    Owned(ReadGuard<'static>),
}
997
998impl TxnGuard<'_> {
999    pub fn fs(&self) -> &Arc<FxFilesystem> {
1000        match self {
1001            TxnGuard::Borrowed(b) => b.fs(),
1002            TxnGuard::Owned(o) => o.fs().unwrap(),
1003        }
1004    }
1005}
1006
/// A wrapper around a guard that needs to be taken when truncating an object.
// The inner guard is never read; it exists purely to hold the truncate lock until dropped.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
1010
1011/// Helper method for making a new filesystem.
1012pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1013    let fs = FxFilesystem::new_empty(device).await?;
1014    fs.close().await?;
1015    Ok(fs.take_device().await)
1016}
1017
1018/// Helper method for making a new filesystem with a single named volume.
1019/// This shouldn't be used in production; instead volumes should be created with the Volumes
1020/// protocol.
1021pub async fn mkfs_with_volume(
1022    device: DeviceHolder,
1023    volume_name: &str,
1024    crypt: Option<Arc<dyn Crypt>>,
1025) -> Result<DeviceHolder, Error> {
1026    let fs = FxFilesystem::new_empty(device).await?;
1027    {
1028        // expect instead of propagating errors here, since otherwise we could drop |fs| before
1029        // close is called, which leads to confusing and unrelated error messages.
1030        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1031        root_volume
1032            .new_volume(
1033                volume_name,
1034                NewChildStoreOptions {
1035                    options: StoreOptions { crypt, ..StoreOptions::default() },
1036                    ..Default::default()
1037                },
1038            )
1039            .await
1040            .expect("Create volume failed");
1041    }
1042    fs.close().await?;
1043    Ok(fs.take_device().await)
1044}
1045
/// State for a post-commit hook which runs a full fsck (and per-volume fscks) after every
/// committed transaction.  See `FsckAfterEveryTransaction::run`.
struct FsckAfterEveryTransaction {
    // Set once the filesystem exists; held weakly to avoid a reference cycle through the hook.
    fs: OnceLock<Weak<FxFilesystem>>,
    // A previously installed post-commit hook (if any), chained after the fsck passes.
    old_hook: PostCommitHook,
}
1050
1051impl FsckAfterEveryTransaction {
1052    fn new(old_hook: PostCommitHook) -> Arc<Self> {
1053        Arc::new(Self { fs: OnceLock::new(), old_hook })
1054    }
1055
1056    async fn run(self: Arc<Self>) {
1057        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1058            let options = FsckOptions {
1059                fail_on_warning: true,
1060                no_lock: true,
1061                quiet: true,
1062                ..Default::default()
1063            };
1064            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1065            let object_manager = fs.object_manager();
1066            for store in object_manager.unlocked_stores() {
1067                let store_id = store.store_object_id();
1068                if !object_manager.is_system_store(store_id) {
1069                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1070                        .await
1071                        .expect("fsck_volume_with_options failed");
1072                }
1073            }
1074        }
1075        if let Some(old_hook) = self.old_hook.as_ref() {
1076            old_hook().await;
1077        }
1078    }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1084    use crate::fsck::{fsck, fsck_volume};
1085    use crate::log::*;
1086    use crate::lsm_tree::Operation;
1087    use crate::lsm_tree::types::Item;
1088    use crate::object_handle::{
1089        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1090    };
1091    use crate::object_store::directory::{Directory, replace_child};
1092    use crate::object_store::journal::JournalOptions;
1093    use crate::object_store::journal::super_block::SuperBlockInstance;
1094    use crate::object_store::transaction::{LockKey, Options, lock_keys};
1095    use crate::object_store::volume::root_volume;
1096    use crate::object_store::{
1097        HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1098    };
1099    use crate::range::RangeExt;
1100    use fuchsia_async as fasync;
1101    use fuchsia_sync::Mutex;
1102    use futures::future::join_all;
1103    use futures::stream::{FuturesUnordered, TryStreamExt};
1104    use fxfs_insecure_crypto::new_insecure_crypt;
1105    use rustc_hash::FxHashMap as HashMap;
1106    use std::ops::Range;
1107    use std::sync::Arc;
1108    use std::sync::atomic::{self, AtomicU32};
1109    use std::time::Duration;
1110    use storage_device::DeviceHolder;
1111    use storage_device::fake_device::{self, FakeDevice};
1112    use test_case::test_case;
1113
1114    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1115
    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            // Concurrently rewrite the same small range many times; without compaction the
            // accumulated mutations would exhaust the small fake device.
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        // Verify consistency after all the churn.
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
1160
    // Verifies the behaviour of enable_allocations() both with and without image_builder_mode.
    #[fuchsia::test]
    async fn test_enable_allocations() {
        // 1. enable_allocations() has no impact if image_builder_mode is not used.
        {
            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.enable_allocations();
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            fs.close().await.expect("close failed");
        }

        // 2. Allocations blow up if done before this call (in image_builder_mode), but work after
        {
            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::A))
                .open(device)
                .await
                .expect("open failed");
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");

            // Creating the file itself works because it only touches metadata, not data extents.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test_fail")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // Allocations should fail before enable_allocations()
            assert!(
                FxfsError::Unavailable
                    .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
            );

            // Allocations should work after enable_allocations()
            fs.enable_allocations();
            handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");

            // 3. finalize() works regardless of whether enable_allocations() is called.
            // (We already called it above, so this verifies it works after it was called).

            fs.close().await.expect("close failed");
        }
        // TODO(https://fxbug.dev/467401079): Add a failure test where we close without
        // enabling allocations. (Trivial to do, but causes error logs, which are interpreted as
        // test failures and only seem controllable at the BUILD target level).
    }
1243
    // Performs a series of mutating operations, then replays the journal and checks that replay
    // produces exactly the same sequence of LSM-tree mutations as the original run.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the store, but set reclaim size to a very large value which will effectively
        // stop the journal from flushing and allows us to track all the mutations to the store.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Records the (operation, item) pairs applied to a tree so two runs can be compared.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with mutation callbacks installed on the allocator and on every
        // store, recording into the supplied collections.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Append some data.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Overwrite some data.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Truncate.
        object.truncate(3000).await.expect("truncate failed");

        // Delete the object.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Finally tombstone the object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        // Now reopen and check that replay produces the same set of mutations.
        fs.close().await.expect("close failed");

        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Per-store mutations must match the original run exactly (operation, key, value and
        // sequence number).
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        // Allocator mutations must match too.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }
1398
1399    #[fuchsia::test]
1400    async fn test_max_in_flight_transactions() {
1401        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1402        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1403
1404        let transactions = FuturesUnordered::new();
1405        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1406            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1407        }
1408        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1409
1410        // Trying to create another one should be blocked.
1411        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1412        assert!(futures::poll!(&mut fut).is_pending());
1413
1414        // Dropping one should allow it to proceed.
1415        transactions.pop();
1416
1417        assert!(futures::poll!(&mut fut).is_ready());
1418    }
1419
    // If run on a single thread, the trim tasks starve out other work.
    #[fuchsia::test(threads = 10)]
    async fn test_continuously_trim() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        // Zero delay and zero interval make the trim task run continuously.
        let fs = FxFilesystemBuilder::new()
            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
            .format(true)
            .open(device)
            .await
            .expect("open failed");
        // Do a small sleep so trim has time to get going.
        fasync::Timer::new(Duration::from_millis(10)).await;

        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
        // regular usage isn't affected by trim.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        for _ in 0..100 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            {
                let buf = object.allocate_buffer(1024).await;
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            }
            // Drop the handle before unlinking the file below.
            std::mem::drop(object);

            // Unlink the file, freeing its extents for subsequent trim passes.
            let mut transaction = root_directory
                .acquire_context_for_replace(None, "test", true)
                .await
                .expect("acquire_context_for_replace failed")
                .transaction;
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("replace_child failed");
            transaction.commit().await.expect("commit failed");
        }
        fs.close().await.expect("close failed");
    }
1475
1476    #[test_case(true; "test power fail with barriers")]
1477    #[test_case(false; "test power fail with checksums")]
1478    #[fuchsia::test]
1479    async fn test_power_fail(barriers_enabled: bool) {
1480        // This test randomly discards blocks, so we run it a few times to increase the chances
1481        // of catching an issue in a single run.
1482        for _ in 0..10 {
1483            let (store_id, device, test_file_object_id) = {
1484                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1485                let fs = if barriers_enabled {
1486                    FxFilesystemBuilder::new()
1487                        .barriers_enabled(true)
1488                        .format(true)
1489                        .open(device)
1490                        .await
1491                        .expect("new filesystem failed")
1492                } else {
1493                    FxFilesystem::new_empty(device).await.expect("new_empty failed")
1494                };
1495                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1496
1497                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1498                    .await
1499                    .expect("sync failed");
1500
1501                let store = root_volume
1502                    .new_volume(
1503                        "test",
1504                        NewChildStoreOptions {
1505                            options: StoreOptions {
1506                                crypt: Some(Arc::new(new_insecure_crypt())),
1507                                ..StoreOptions::default()
1508                            },
1509                            ..Default::default()
1510                        },
1511                    )
1512                    .await
1513                    .expect("new_volume failed");
1514                let root_directory = Directory::open(&store, store.root_directory_object_id())
1515                    .await
1516                    .expect("open failed");
1517
1518                // Create a number of files with the goal of using up more than one journal block.
1519                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1520                    let fs = store.filesystem();
1521                    let root_directory = Directory::open(store, store.root_directory_object_id())
1522                        .await
1523                        .expect("open failed");
1524                    for i in 0..100 {
1525                        let mut transaction = fs
1526                            .clone()
1527                            .new_transaction(
1528                                lock_keys![LockKey::object(
1529                                    store.store_object_id(),
1530                                    store.root_directory_object_id()
1531                                )],
1532                                Options::default(),
1533                            )
1534                            .await
1535                            .expect("new_transaction failed");
1536                        root_directory
1537                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1538                            .await
1539                            .expect("create_child_file failed");
1540                        transaction.commit().await.expect("commit failed");
1541                    }
1542                }
1543
1544                // Create one batch of files.
1545                create_files(&store, "A").await;
1546
1547                // Create a file and write something to it.  This will make sure there's a
1548                // transaction present that includes a checksum.
1549                let mut transaction = fs
1550                    .clone()
1551                    .new_transaction(
1552                        lock_keys![LockKey::object(
1553                            store.store_object_id(),
1554                            store.root_directory_object_id()
1555                        )],
1556                        Options::default(),
1557                    )
1558                    .await
1559                    .expect("new_transaction failed");
1560                let object = root_directory
1561                    .create_child_file(&mut transaction, "test")
1562                    .await
1563                    .expect("create_child_file failed");
1564                transaction.commit().await.expect("commit failed");
1565
1566                let mut transaction =
1567                    object.new_transaction().await.expect("new_transaction failed");
1568                let mut buffer = object.allocate_buffer(4096).await;
1569                buffer.as_mut_slice().fill(0xed);
1570                object
1571                    .txn_write(&mut transaction, 0, buffer.as_ref())
1572                    .await
1573                    .expect("txn_write failed");
1574                transaction.commit().await.expect("commit failed");
1575
1576                // Create another batch of files.
1577                create_files(&store, "B").await;
1578
1579                // Sync the device, but don't flush the device. We want to do this so we can
1580                // randomly discard blocks below.
1581                fs.sync(SyncOptions::default()).await.expect("sync failed");
1582
1583                // When we call `sync` above on the filesystem, it will pad the journal so that it
1584                // will get written, but it doesn't wait for the write to occur.  We wait for a
1585                // short time here to give allow time for the journal to be written.  Adding timers
1586                // isn't great, but this test already isn't deterministic since we randomly discard
1587                // blocks.
1588                fasync::Timer::new(Duration::from_millis(10)).await;
1589
1590                (
1591                    store.store_object_id(),
1592                    fs.device().snapshot().expect("snapshot failed"),
1593                    object.object_id(),
1594                )
1595            };
1596
1597            // Randomly discard blocks since the last flush.  This simulates what might happen in
1598            // the case of power-loss.  This will be an uncontrolled unmount.
1599            device
1600                .discard_random_since_last_flush()
1601                .expect("discard_random_since_last_flush failed");
1602
1603            let fs = FxFilesystem::open(device).await.expect("open failed");
1604            fsck(fs.clone()).await.expect("fsck failed");
1605
1606            let mut check_test_file = false;
1607
1608            // If we replayed and the store exists (i.e. the transaction that created the store
1609            // made it out), start by running fsck on it.
1610            let object_id = if fs.object_manager().store(store_id).is_some() {
1611                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1612                    .await
1613                    .expect("fsck_volume failed");
1614
1615                // Now we want to create another file, unmount cleanly, and then finally check that
1616                // the new file exists.  This checks that we can continue to use the filesystem
1617                // after an unclean unmount.
1618                let store = root_volume(fs.clone())
1619                    .await
1620                    .expect("root_volume failed")
1621                    .volume(
1622                        "test",
1623                        StoreOptions {
1624                            crypt: Some(Arc::new(new_insecure_crypt())),
1625                            ..StoreOptions::default()
1626                        },
1627                    )
1628                    .await
1629                    .expect("volume failed");
1630
1631                let root_directory = Directory::open(&store, store.root_directory_object_id())
1632                    .await
1633                    .expect("open failed");
1634
1635                let mut transaction = fs
1636                    .clone()
1637                    .new_transaction(
1638                        lock_keys![LockKey::object(
1639                            store.store_object_id(),
1640                            store.root_directory_object_id()
1641                        )],
1642                        Options::default(),
1643                    )
1644                    .await
1645                    .expect("new_transaction failed");
1646                let object = root_directory
1647                    .create_child_file(&mut transaction, &format!("C"))
1648                    .await
1649                    .expect("create_child_file failed");
1650                transaction.commit().await.expect("commit failed");
1651
1652                // Write again to the test file if it exists.
1653                if let Ok(test_file) = ObjectStore::open_object(
1654                    &store,
1655                    test_file_object_id,
1656                    HandleOptions::default(),
1657                    None,
1658                )
1659                .await
1660                {
1661                    // Check it has the contents we expect.
1662                    let mut buffer = test_file.allocate_buffer(4096).await;
1663                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1664                    if bytes == 4096 {
1665                        let expected = [0xed; 4096];
1666                        assert_eq!(buffer.as_slice(), &expected);
1667                    } else {
1668                        // If the write didn't make it, the file should have zero bytes.
1669                        assert_eq!(bytes, 0);
1670                    }
1671
1672                    // Modify the test file.
1673                    let mut transaction =
1674                        test_file.new_transaction().await.expect("new_transaction failed");
1675                    buffer.as_mut_slice().fill(0x37);
1676                    test_file
1677                        .txn_write(&mut transaction, 0, buffer.as_ref())
1678                        .await
1679                        .expect("txn_write failed");
1680                    transaction.commit().await.expect("commit failed");
1681                    check_test_file = true;
1682                }
1683
1684                object.object_id()
1685            } else {
1686                INVALID_OBJECT_ID
1687            };
1688
1689            // This will do a controlled unmount.
1690            fs.close().await.expect("close failed");
1691            let device = fs.take_device().await;
1692            device.reopen(false);
1693
1694            let fs = FxFilesystem::open(device).await.expect("open failed");
1695            fsck(fs.clone()).await.expect("fsck failed");
1696
1697            // As mentioned above, make sure that the object we created before the clean unmount
1698            // exists.
1699            if object_id != INVALID_OBJECT_ID {
1700                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1701                    .await
1702                    .expect("fsck_volume failed");
1703
1704                let store = root_volume(fs.clone())
1705                    .await
1706                    .expect("root_volume failed")
1707                    .volume(
1708                        "test",
1709                        StoreOptions {
1710                            crypt: Some(Arc::new(new_insecure_crypt())),
1711                            ..StoreOptions::default()
1712                        },
1713                    )
1714                    .await
1715                    .expect("volume failed");
1716                // We should be able to open the C object.
1717                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1718                    .await
1719                    .expect("open_object failed");
1720
1721                // If we made the modification to the test file, check it.
1722                if check_test_file {
1723                    info!("Checking test file for modification");
1724                    let test_file = ObjectStore::open_object(
1725                        &store,
1726                        test_file_object_id,
1727                        HandleOptions::default(),
1728                        None,
1729                    )
1730                    .await
1731                    .expect("open_object failed");
1732                    let mut buffer = test_file.allocate_buffer(4096).await;
1733                    assert_eq!(
1734                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1735                        4096
1736                    );
1737                    let expected = [0x37; 4096];
1738                    assert_eq!(buffer.as_slice(), &expected);
1739                }
1740            }
1741
1742            fs.close().await.expect("close failed");
1743        }
1744    }
1745
1746    #[fuchsia::test]
1747    async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1748        let barrier_count = Arc::new(AtomicU32::new(0));
1749
1750        struct Observer(Arc<AtomicU32>);
1751
1752        impl fake_device::Observer for Observer {
1753            fn barrier(&self) {
1754                self.0.fetch_add(1, atomic::Ordering::Relaxed);
1755            }
1756        }
1757
1758        let mut fake_device = FakeDevice::new(8192, 4096);
1759        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1760        let device = DeviceHolder::new(fake_device);
1761        let fs = FxFilesystemBuilder::new()
1762            .barriers_enabled(true)
1763            .format(true)
1764            .open(device)
1765            .await
1766            .expect("new filesystem failed");
1767
1768        {
1769            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1770            root_vol
1771                .new_volume(
1772                    "test",
1773                    NewChildStoreOptions {
1774                        options: StoreOptions {
1775                            crypt: Some(Arc::new(new_insecure_crypt())),
1776                            ..StoreOptions::default()
1777                        },
1778                        ..NewChildStoreOptions::default()
1779                    },
1780                )
1781                .await
1782                .expect("there is no test volume");
1783            fs.close().await.expect("close failed");
1784        }
1785        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1786        // measure of the number of barriers issued during setup.
1787        let device = fs.take_device().await;
1788        device.reopen(false);
1789        let fs = FxFilesystemBuilder::new()
1790            .barriers_enabled(true)
1791            .open(device)
1792            .await
1793            .expect("new filesystem failed");
1794        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1795
1796        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1797        let store = root_vol
1798            .volume(
1799                "test",
1800                StoreOptions {
1801                    crypt: Some(Arc::new(new_insecure_crypt())),
1802                    ..StoreOptions::default()
1803                },
1804            )
1805            .await
1806            .expect("there is no test volume");
1807
1808        // Create a number of files with the goal of using up more than one journal block.
1809        let fs = store.filesystem();
1810        let root_directory =
1811            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1812        for i in 0..100 {
1813            let mut transaction = fs
1814                .clone()
1815                .new_transaction(
1816                    lock_keys![LockKey::object(
1817                        store.store_object_id(),
1818                        store.root_directory_object_id()
1819                    )],
1820                    Options::default(),
1821                )
1822                .await
1823                .expect("new_transaction failed");
1824            root_directory
1825                .create_child_file(&mut transaction, &format!("A {i}"))
1826                .await
1827                .expect("create_child_file failed");
1828            transaction.commit().await.expect("commit failed");
1829        }
1830
1831        // Unmount the filesystem to ensure that the journal flushes.
1832        fs.close().await.expect("close failed");
1833        // Ensure that no barriers were emitted while creating files, as no data was written.
1834        assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1835    }
1836
1837    #[fuchsia::test]
1838    async fn test_barrier_emitted_when_transaction_includes_data() {
1839        let barrier_count = Arc::new(AtomicU32::new(0));
1840
1841        struct Observer(Arc<AtomicU32>);
1842
1843        impl fake_device::Observer for Observer {
1844            fn barrier(&self) {
1845                self.0.fetch_add(1, atomic::Ordering::Relaxed);
1846            }
1847        }
1848
1849        let mut fake_device = FakeDevice::new(8192, 4096);
1850        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1851        let device = DeviceHolder::new(fake_device);
1852        let fs = FxFilesystemBuilder::new()
1853            .barriers_enabled(true)
1854            .format(true)
1855            .open(device)
1856            .await
1857            .expect("new filesystem failed");
1858
1859        {
1860            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1861            root_vol
1862                .new_volume(
1863                    "test",
1864                    NewChildStoreOptions {
1865                        options: StoreOptions {
1866                            crypt: Some(Arc::new(new_insecure_crypt())),
1867                            ..StoreOptions::default()
1868                        },
1869                        ..NewChildStoreOptions::default()
1870                    },
1871                )
1872                .await
1873                .expect("there is no test volume");
1874            fs.close().await.expect("close failed");
1875        }
1876        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1877        // measure of the number of barriers issued during setup.
1878        let device = fs.take_device().await;
1879        device.reopen(false);
1880        let fs = FxFilesystemBuilder::new()
1881            .barriers_enabled(true)
1882            .open(device)
1883            .await
1884            .expect("new filesystem failed");
1885        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1886
1887        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1888        let store = root_vol
1889            .volume(
1890                "test",
1891                StoreOptions {
1892                    crypt: Some(Arc::new(new_insecure_crypt())),
1893                    ..StoreOptions::default()
1894                },
1895            )
1896            .await
1897            .expect("there is no test volume");
1898
1899        // Create a file and write something to it. This should cause a barrier to be emitted.
1900        let fs: Arc<FxFilesystem> = store.filesystem();
1901        let root_directory =
1902            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1903
1904        let mut transaction = fs
1905            .clone()
1906            .new_transaction(
1907                lock_keys![LockKey::object(
1908                    store.store_object_id(),
1909                    store.root_directory_object_id()
1910                )],
1911                Options::default(),
1912            )
1913            .await
1914            .expect("new_transaction failed");
1915        let object = root_directory
1916            .create_child_file(&mut transaction, "test")
1917            .await
1918            .expect("create_child_file failed");
1919        transaction.commit().await.expect("commit failed");
1920
1921        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1922        let mut buffer = object.allocate_buffer(4096).await;
1923        buffer.as_mut_slice().fill(0xed);
1924        object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1925        transaction.commit().await.expect("commit failed");
1926
1927        // Unmount the filesystem to ensure that the journal flushes.
1928        fs.close().await.expect("close failed");
1929        // Ensure that a barrier was emitted while writing to the file.
1930        assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1931    }
1932
1933    #[test_case(true; "fail when original filesystem has barriers enabled")]
1934    #[test_case(false; "fail when original filesystem has barriers disabled")]
1935    #[fuchsia::test]
1936    async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
1937        let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
1938        let fake_device = FakeDevice::new(8192, 4096);
1939        let device = DeviceHolder::new(fake_device);
1940        let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
1941            .barriers_enabled(original_barrier_mode)
1942            .format(true)
1943            .open(device)
1944            .await
1945            .expect("new filesystem failed");
1946
1947        // Create a volume named test with a file inside it called file.
1948        {
1949            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1950            let store = root_vol
1951                .new_volume(
1952                    "test",
1953                    NewChildStoreOptions {
1954                        options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
1955                        ..Default::default()
1956                    },
1957                )
1958                .await
1959                .expect("creating test volume");
1960            let root_dir = Directory::open(&store, store.root_directory_object_id())
1961                .await
1962                .expect("open failed");
1963            let mut transaction = fs
1964                .clone()
1965                .new_transaction(
1966                    lock_keys![LockKey::object(
1967                        store.store_object_id(),
1968                        store.root_directory_object_id()
1969                    )],
1970                    Default::default(),
1971                )
1972                .await
1973                .expect("new_transaction failed");
1974            let object = root_dir
1975                .create_child_file(&mut transaction, "file")
1976                .await
1977                .expect("create_child_file failed");
1978            transaction.commit().await.expect("commit failed");
1979            let mut buffer = object.allocate_buffer(4096).await;
1980            buffer.as_mut_slice().fill(0xA7);
1981            let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
1982            assert_eq!(new_size, 4096);
1983        }
1984
1985        // Remount the filesystem with the opposite barrier mode and write more data to our file.
1986        fs.close().await.expect("close failed");
1987        let device = fs.take_device().await;
1988        device.reopen(false);
1989        let fs = FxFilesystemBuilder::new()
1990            .barriers_enabled(!original_barrier_mode)
1991            .open(device)
1992            .await
1993            .expect("new filesystem failed");
1994        {
1995            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1996            let store = root_vol
1997                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
1998                .await
1999                .expect("opening test volume");
2000            let root_dir = Directory::open(&store, store.root_directory_object_id())
2001                .await
2002                .expect("open failed");
2003            let (object_id, _, _) =
2004                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2005            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2006                .await
2007                .expect("open failed");
2008            // Write some more data.
2009            let mut buffer = test_file.allocate_buffer(4096).await;
2010            buffer.as_mut_slice().fill(0xA8);
2011            let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2012            assert_eq!(new_size, 8192);
2013        }
2014
2015        // Lastly, remount the filesystems with the original barrier mode and make sure everything
2016        // can be read from the file as expected.
2017        fs.close().await.expect("close failed");
2018        let device = fs.take_device().await;
2019        device.reopen(false);
2020        let fs = FxFilesystemBuilder::new()
2021            .barriers_enabled(original_barrier_mode)
2022            .open(device)
2023            .await
2024            .expect("new filesystem failed");
2025        {
2026            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2027            let store = root_vol
2028                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2029                .await
2030                .expect("opening test volume");
2031            let root_dir = Directory::open(&store, store.root_directory_object_id())
2032                .await
2033                .expect("open failed");
2034            let (object_id, _, _) =
2035                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2036            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2037                .await
2038                .expect("open failed");
2039            let mut buffer = test_file.allocate_buffer(8192).await;
2040            assert_eq!(
2041                test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2042                8192,
2043                "short read"
2044            );
2045            assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2046            assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2047        }
2048        fs.close().await.expect("close failed");
2049    }
2050
2051    #[fuchsia::test]
2052    async fn test_image_builder_mode_no_early_writes() {
2053        const BLOCK_SIZE: u32 = 4096;
2054        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2055        device.reopen(true);
2056        let fs = FxFilesystemBuilder::new()
2057            .format(true)
2058            .image_builder_mode(Some(SuperBlockInstance::A))
2059            .open(device)
2060            .await
2061            .expect("open failed");
2062        fs.enable_allocations();
2063        // fs.close() now performs compaction (writing superblock), so device must be writable.
2064        fs.device().reopen(false);
2065        fs.close().await.expect("closed");
2066    }
2067
    /// End-to-end check of image-builder mode: bytes pre-written to the raw device can be claimed
    /// as the extents of a new file (via `extend`) while the device is read-only, and after a
    /// normal remount the file reads back those exact bytes and the image passes fsck.
    #[fuchsia::test]
    async fn test_image_builder_mode() {
        const BLOCK_SIZE: u32 = 4096;
        // One block's worth of "pre-existing" data placed well past the filesystem metadata.
        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));

        // Write some fake file data at an offset in the image and confirm it as an fxfs file below.
        {
            let mut write_buf =
                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
            write_buf.as_mut_slice().fill(0xf0);
            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
        }

        // Make the device read-only for the build phase; image-builder mode should not need to
        // write until allocations are enabled and the filesystem is closed.
        device.reopen(true);

        let device = {
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::B))
                .open(device)
                .await
                .expect("open failed");
            fs.enable_allocations();
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                // Create a file referencing existing data on device.
                let handle;
                {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_directory.store().store_object_id(),
                                root_directory.object_id()
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new transaction");
                    handle = root_directory
                        .create_child_file(&mut transaction, "test")
                        .await
                        .expect("create file");
                    // `extend` maps the pre-written device range into the file without copying.
                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
                    transaction.commit().await.expect("commit");
                }
            }
            // Closing compacts and writes the superblock, so the device must be writable now.
            fs.device().reopen(false);
            fs.close().await.expect("close");
            fs.take_device().await
        };
        // Remount normally (read-write) and verify the built image.
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");

        // Confirm that the test file points at the correct data.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, descriptor, _) =
            root_directory.lookup("test").await.expect("lookup failed").unwrap();
        assert_eq!(descriptor, ObjectDescriptor::File);
        let test_file =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open failed");
        let mut read_buf =
            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
        // The file must contain exactly the 0xf0 bytes pre-written to the raw device.
        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
        fs.close().await.expect("closed");
    }
2146
2147    #[fuchsia::test]
2148    async fn test_read_only_mount_on_full_filesystem() {
2149        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2150        let fs =
2151            FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2152        let root_store = fs.root_store();
2153        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2154            .await
2155            .expect("open failed");
2156
2157        let mut transaction = fs
2158            .clone()
2159            .new_transaction(
2160                lock_keys![LockKey::object(
2161                    root_store.store_object_id(),
2162                    root_directory.object_id()
2163                )],
2164                Options::default(),
2165            )
2166            .await
2167            .expect("new_transaction failed");
2168        let handle = root_directory
2169            .create_child_file(&mut transaction, "test")
2170            .await
2171            .expect("create_child_file failed");
2172        transaction.commit().await.expect("commit failed");
2173
2174        let mut buf = handle.allocate_buffer(4096).await;
2175        buf.as_mut_slice().fill(0xaa);
2176        loop {
2177            if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2178                break;
2179            }
2180        }
2181
2182        let max_offset = fs.allocator().maximum_offset();
2183        fs.close().await.expect("Close failed");
2184
2185        let device = fs.take_device().await;
2186        device.reopen(false);
2187        let mut buffer = device
2188            .allocate_buffer(
2189                crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2190            )
2191            .await;
2192        device.read(0, buffer.as_mut()).await.expect("read failed");
2193
2194        let device = DeviceHolder::new(
2195            FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2196                .expect("from_image failed"),
2197        );
2198        let fs =
2199            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2200        fs.close().await.expect("Close failed");
2201    }
2202
2203    #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2204    #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2205    #[fuchsia::test]
2206    async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2207        const BLOCK_SIZE: u32 = 4096;
2208        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2209
2210        // 1. Initialize in image_builder_mode
2211        device.reopen(true);
2212        let fs = FxFilesystemBuilder::new()
2213            .format(true)
2214            .image_builder_mode(Some(target_sb))
2215            .open(device)
2216            .await
2217            .expect("open failed");
2218
2219        fs.enable_allocations();
2220
2221        // 2. Finalize logic (via close)
2222        fs.device().reopen(false);
2223
2224        // 3. Write data
2225        {
2226            let root_store = fs.root_store();
2227            let root_directory =
2228                Directory::open(&root_store, root_store.root_directory_object_id())
2229                    .await
2230                    .expect("open failed");
2231
2232            let mut transaction = fs
2233                .clone()
2234                .new_transaction(
2235                    lock_keys![LockKey::object(
2236                        root_directory.store().store_object_id(),
2237                        root_directory.object_id()
2238                    )],
2239                    Options::default(),
2240                )
2241                .await
2242                .expect("new transaction");
2243            let handle = root_directory
2244                .create_child_file(&mut transaction, "post_finalize_file")
2245                .await
2246                .expect("create file");
2247            transaction.commit().await.expect("commit");
2248
2249            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2250            buf.as_mut_slice().fill(0xaa);
2251            handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2252        }
2253
2254        // 4. Close. Should flush to `target_sb` only.
2255        fs.close().await.expect("close failed");
2256
2257        let other_sb = target_sb.next();
2258
2259        // 5. Verify `target_sb` is valid and `other_sb` is empty.
2260        let device = fs.take_device().await;
2261        device.reopen(true); // Read-only is fine for verifying.
2262        let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2263
2264        device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2265        assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2266
2267        buf.as_mut_slice().fill(0); // Clear buffer
2268        device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2269        // Expecting all zeros for `other_sb`
2270        assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2271    }
2272}