fxfs/filesystem.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
use crate::log::*;
use crate::metrics;
use crate::object_store::allocator::{Allocator, Hold, Reservation};
use crate::object_store::directory::Directory;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::transaction::{
    self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
    TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
use crate::range::RangeExt;
use crate::serialized_types::{LATEST_VERSION, Version};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use event_listener::Event;
use fuchsia_async as fasync;
use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_crypto::Crypt;
use static_assertions::const_assert;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use storage_device::{Device, DeviceHolder};

pub const MIN_BLOCK_SIZE: u64 = 4096;
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
// needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);

/// Holds information on an Fxfs Filesystem
pub struct Info {
    pub total_bytes: u64,
    pub used_bytes: u64,
}

pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;

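/// Options controlling how an `FxFilesystem` behaves.
///
/// A minimal sketch of overriding a single default (all other fields take their
/// `Default` values):
///
/// ```ignore
/// let options = Options { read_only: true, ..Default::default() };
/// ```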
pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,

    // The first duration is how long after the filesystem has been mounted to perform an initial
    // trim.  The second is the interval to repeat trimming thereafter.  If set to None, no trimming
    // is done.
    // Default values are (1 hour, 24 hours).
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    // If set, journal will not be used for writes. The user must call 'finalize' when finished.
    // The provided superblock instance will be written upon finalize().
    pub image_builder_mode: Option<SuperBlockInstance>,

    // If true, the filesystem will use the hardware's inline crypto engine to write encrypted
    // data. Requires the block device to support inline encryption and for `barriers_enabled` to
    // be true.
    // TODO(https://fxbug.dev/393196849): For now, this flag only prevents the filesystem from
    // computing checksums. Update this comment when the filesystem actually uses inline
    // encryption.
    pub inline_crypto_enabled: bool,

    // Configures the filesystem to use barriers instead of checksums to ensure consistency.
    // Checksums may be computed and stored in extent records but will no longer be stored in the
    // journal. The journal will use barriers to enforce proper ordering between data and metadata
    // writes. Must be true if `inline_crypto_enabled` is true.
    pub barriers_enabled: bool,
}

impl Default for Options {
    fn default() -> Self {
        Options {
            roll_metadata_key_byte_count: 128 * 1024 * 1024,
            read_only: false,
            pre_commit_hook: None,
            post_commit_hook: None,
            skip_initial_reap: false,
            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
            image_builder_mode: None,
            inline_crypto_enabled: false,
            barriers_enabled: false,
        }
    }
}

/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}

/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    Replay,
    Live(&'a Transaction<'b>),
}

impl ApplyMode<'_, '_> {
    pub fn is_replay(&self) -> bool {
        matches!(self, ApplyMode::Replay)
    }

    pub fn is_live(&self) -> bool {
        matches!(self, ApplyMode::Live(_))
    }
}

/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when a transaction commits, either during live operation (see
    /// `ObjectManager::apply_mutation`) or during journal replay, in which case the context's
    /// mode will be `ApplyMode::Replay` (see `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}

#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}

pub struct OpenFxFilesystem(Arc<FxFilesystem>);

impl OpenFxFilesystem {
    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }

    /// Used to finalize a filesystem image when in image_builder_mode.
    pub async fn finalize(&self) -> Result<(), Error> {
        ensure!(
            self.journal().image_builder_mode().is_some(),
            "finalize() only valid in image_builder_mode."
        );
        self.journal().allocate_journal().await?;
        self.journal().set_image_builder_mode(None);
        self.journal().compact().await?;
        Ok(())
    }
}

impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
    fn from(fs: Arc<FxFilesystem>) -> Self {
        Self(fs)
    }
}

impl Drop for OpenFxFilesystem {
    fn drop(&mut self) {
        if self.options.image_builder_mode.is_some()
            && self.journal().image_builder_mode().is_some()
        {
            error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
        }
        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
        }
    }
}

impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

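/// Builder for opening or formatting an `FxFilesystem`.
///
/// A minimal usage sketch; the `device` value is illustrative (the tests below
/// construct one from a `FakeDevice`):
///
/// ```ignore
/// let fs = FxFilesystemBuilder::new().format(true).open(device).await?;
/// // ... use the filesystem ...
/// fs.close().await?;
/// ```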
pub struct FxFilesystemBuilder {
    format: bool,
    trace: bool,
    options: Options,
    journal_options: JournalOptions,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    fsck_after_every_transaction: bool,
}

impl FxFilesystemBuilder {
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `finalize()` before closing the filesystem, which
    /// triggers a compaction of in-memory data structures, writes a minimal journal, and writes
    /// the specified superblock.
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction is committed. See
    /// `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

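    /// Sets the delay before the initial trim and the interval between subsequent trims, or
    /// disables trimming if `None` is passed. See `Options::trim_config`.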
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Enables or disables inline encryption. Defaults to `false`.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Enables or disables barriers in both the filesystem and journal options.
    /// Defaults to `false`.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }

    /// Constructs an `FxFilesystem` object with the specified settings.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        // Inline encryption requires barriers to be enabled.
        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                trim_task: Mutex::new(None),
                closed: AtomicBool::new(true),
                shutdown_event: Event::new(),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}

pub struct FxFilesystem {
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    flush_task: Mutex<Option<fasync::Task<()>>>,
    trim_task: Mutex<Option<fasync::Task<()>>>,
    closed: AtomicBool,
    // An event that is signalled when the filesystem starts to shut down.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    completed_transactions: UintProperty,
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    device: DeviceHolder,

    // The "stores" node in the Inspect tree.
    _stores_node: LazyNode,
}

#[fxfs_trace::trace]
impl FxFilesystem {
    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().format(true).open(device).await
    }

    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().open(device).await
    }

    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }

    pub async fn close(&self) -> Result<(), Error> {
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
        match &sync_status {
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
        // crash instead of exiting gracefully.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }

    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

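    /// Syncs the journal.  A sketch of a durable sync, which also flushes the underlying block
    /// device (see `SyncOptions::flush_device`):
    ///
    /// ```ignore
    /// fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;
    /// ```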
    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    pub fn trace(&self) -> bool {
        self.trace
    }

    pub fn options(&self) -> &Options {
        &self.options
    }

    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }

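    /// Creates a new transaction that holds `locks` until it is committed or dropped.  A sketch
    /// of the typical commit sequence (`store_id` and `object_id` are placeholders):
    ///
    /// ```ignore
    /// let mut transaction = fs
    ///     .clone()
    ///     .new_transaction(
    ///         lock_keys![LockKey::object(store_id, object_id)],
    ///         transaction::Options::default(),
    ///     )
    ///     .await?;
    /// // ... stage mutations ...
    /// transaction.commit().await?;
    /// ```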
    pub async fn new_transaction<'a>(
        self: Arc<Self>,
        locks: LockKeys,
        options: transaction::Options<'a>,
    ) -> Result<Transaction<'a>, Error> {
        let guard = if let Some(guard) = options.txn_guard.as_ref() {
            TxnGuard::Borrowed(guard)
        } else {
            self.txn_guard().await
        };
        Transaction::new(guard, options, locks).await
    }

    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }

    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }

    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // If we placed a hold for metadata space, return it now.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }

    fn maybe_start_flush_task(&self) {
        let mut flush_task = self.flush_task.lock();
        if flush_task.is_none() {
            let journal = self.journal.clone();
            *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
        }
    }

    // Returns the number of bytes trimmed.
    async fn do_trim(&self) -> Result<usize, Error> {
        const MAX_EXTENTS_PER_BATCH: usize = 8;
        const MAX_EXTENT_SIZE: usize = 256 * 1024;
        let mut offset = 0;
        let mut bytes_trimmed = 0;
        loop {
            if self.closed.load(Ordering::Relaxed) {
                info!("Filesystem is closed, nothing to trim");
                return Ok(bytes_trimmed);
            }
            let allocator = self.allocator();
            let trimmable_extents =
                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
            for device_range in trimmable_extents.extents() {
                self.device.trim(device_range.clone()).await?;
                bytes_trimmed += device_range.length()? as usize;
            }
            if let Some(device_range) = trimmable_extents.extents().last() {
                offset = device_range.end;
            } else {
                break;
            }
        }
        Ok(bytes_trimmed)
    }

    fn start_trim_task(
        self: &Arc<Self>,
        delay: std::time::Duration,
        interval: std::time::Duration,
    ) {
        if !self.device.supports_trim() {
            info!("Device does not support trim; not scheduling trimming");
            return;
        }
        let this = self.clone();
        let mut next_timer = delay;
        *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
            loop {
                let shutdown_listener = this.shutdown_event.listen();
                // Note that we need to check if the filesystem was closed after we start listening
                // to the shutdown event, but before we start waiting on `timer`, because otherwise
                // we might start listening on `shutdown_event` *after* the event was signaled, and
                // so `shutdown_listener` will never fire, and this task will get stuck until
                // `timer` expires.
                if this.closed.load(Ordering::SeqCst) {
                    return;
                }
                futures::select!(
                    () = fasync::Timer::new(next_timer.clone()).fuse() => {},
                    () = shutdown_listener.fuse() => return,
                );
                let start_time = std::time::Instant::now();
                let res = this.do_trim().await;
                let duration = std::time::Instant::now() - start_time;
                next_timer = interval.clone();
                match res {
                    Ok(bytes_trimmed) => info!(
                        "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
                        {next_timer:?}",
                    ),
                    Err(e) => error!(e:?; "Failed to trim"),
                }
            }
        }));
    }

    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        if self.options.image_builder_mode.is_some() {
            // Image builder mode avoids the journal, so reservation tracking for metadata
            // overheads doesn't make sense; we essentially have 'all or nothing' semantics
            // instead.
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        // We support three options for metadata space reservation:
        //
        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
        //      used on the understanding that eventually, potentially after a full compaction,
        //      there should be no net increase in space used.  For example, unlinking an object
        //      should eventually decrease the amount of space used and setting most attributes
        //      should not result in any change.
        //
        //   2. A reservation is provided in which case we'll place a hold on some of it for
        //      metadata.
        //
        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
        //      and will return NoSpace if that fails.
        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }

    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
        if skip_journal_checks {
            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
        } else {
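            // Optimistically bump the in-flight count with a compare-exchange loop; `inc`
            // returns false once the limit has been reached, in which case we wait for a slot.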
            let inc = || {
                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
                    match self.in_flight_transactions.compare_exchange_weak(
                        in_flights,
                        in_flights + 1,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => in_flights = x,
                    }
                }
                return false;
            };
            while !inc() {
                let listener = self.transaction_limit_event.listen();
                if inc() {
                    break;
                }
                listener.await;
            }
        }
    }

    pub(crate) fn sub_transaction(&self) {
        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
        assert!(old != 0);
        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
            self.transaction_limit_event.notify(usize::MAX);
        }
    }

    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
        let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
        TruncateGuard(self.lock_manager().write_lock(keys).await)
    }

    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
        let inspector = fuchsia_inspect::Inspector::default();
        let root = inspector.root();
        root.record_child("__root", |n| self.root_store().record_data(n));
        let object_manager = self.object_manager();
        let volume_directory = object_manager.volume_directory();
        let layer_set = volume_directory.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = volume_directory.iter(&mut merger).await?;
        while let Some((name, id, _)) = iter.get() {
            if let Some(store) = object_manager.store(id) {
                root.record_child(name.to_string(), |n| store.record_data(n));
            }
            iter.advance().await?;
        }
        Ok(inspector)
    }
}

pub enum TxnGuard<'a> {
    Borrowed(&'a TxnGuard<'a>),
    Owned(ReadGuard<'static>),
}

impl TxnGuard<'_> {
    pub fn fs(&self) -> &Arc<FxFilesystem> {
        match self {
            TxnGuard::Borrowed(b) => b.fs(),
            TxnGuard::Owned(o) => o.fs().unwrap(),
        }
    }
}

/// A wrapper around a guard that needs to be taken when truncating an object.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);

/// Helper method for making a new filesystem.
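///
/// A sketch of typical usage; the `device` value is illustrative:
///
/// ```ignore
/// let device = mkfs(device).await?;
/// // `device` now holds a freshly formatted, empty Fxfs image.
/// ```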
pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    fs.close().await?;
    Ok(fs.take_device().await)
}

/// Helper method for making a new filesystem with a single named volume.
/// This shouldn't be used in production; instead volumes should be created with the Volumes
/// protocol.
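///
/// A sketch of formatting an image containing one volume; the crypt instance is illustrative
/// (the tests here use `InsecureCrypt`):
///
/// ```ignore
/// let crypt: Arc<dyn Crypt> = Arc::new(InsecureCrypt::new());
/// let device = mkfs_with_volume(device, "data", Some(crypt)).await?;
/// ```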
pub async fn mkfs_with_volume(
    device: DeviceHolder,
    volume_name: &str,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<DeviceHolder, Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    {
        // expect instead of propagating errors here, since otherwise we could drop |fs| before
        // close is called, which leads to confusing and unrelated error messages.
        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
        root_volume
            .new_volume(
                volume_name,
                NewChildStoreOptions {
                    options: StoreOptions { crypt, ..StoreOptions::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("Create volume failed");
    }
    fs.close().await?;
    Ok(fs.take_device().await)
}

struct FsckAfterEveryTransaction {
    fs: OnceLock<Weak<FxFilesystem>>,
    old_hook: PostCommitHook,
}

impl FsckAfterEveryTransaction {
    fn new(old_hook: PostCommitHook) -> Arc<Self> {
        Arc::new(Self { fs: OnceLock::new(), old_hook })
    }

    async fn run(self: Arc<Self>) {
        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
            let options = FsckOptions {
                fail_on_warning: true,
                no_lock: true,
                quiet: true,
                ..Default::default()
            };
            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
            let object_manager = fs.object_manager();
            for store in object_manager.unlocked_stores() {
                let store_id = store.store_object_id();
                if !object_manager.is_system_store(store_id) {
                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
                        .await
                        .expect("fsck_volume_with_options failed");
                }
            }
        }
        if let Some(old_hook) = self.old_hook.as_ref() {
            old_hook().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
    use crate::fsck::{fsck, fsck_volume};
    use crate::log::*;
    use crate::lsm_tree::Operation;
    use crate::lsm_tree::types::Item;
    use crate::object_handle::{
        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
    };
    use crate::object_store::directory::{Directory, replace_child};
    use crate::object_store::journal::JournalOptions;
    use crate::object_store::journal::super_block::SuperBlockInstance;
    use crate::object_store::transaction::{LockKey, Options, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
    };
    use crate::range::RangeExt;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use futures::stream::{FuturesUnordered, TryStreamExt};
    use fxfs_insecure_crypto::InsecureCrypt;
    use rustc_hash::FxHashMap as HashMap;
    use std::ops::Range;
    use std::sync::Arc;
    use std::sync::atomic::{self, AtomicU32};
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::{self, FakeDevice};
    use test_case::test_case;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the filesystem, but set the reclaim size to a very large value, which effectively
        // stops the journal from flushing and allows us to track all the mutations to the store.
1122        fs.close().await.expect("close failed");
1123        let device = fs.take_device().await;
1124        device.reopen(false);
1125
1126        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1127
1128        impl<K: Clone, V: Clone> Mutations<K, V> {
1129            fn new() -> Self {
1130                Mutations(Mutex::new(Vec::new()))
1131            }
1132
1133            fn push(&self, operation: Operation, item: &Item<K, V>) {
1134                self.0.lock().push((operation, item.clone()));
1135            }
1136        }
1137
1138        let open_fs = |device,
1139                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
1140                       allocator_mutations: Arc<Mutations<_, _>>| async {
1141            FxFilesystemBuilder::new()
1142                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1143                .on_new_allocator(move |allocator| {
1144                    let allocator_mutations = allocator_mutations.clone();
1145                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1146                        allocator_mutations.push(op, item)
1147                    })));
1148                })
1149                .on_new_store(move |store| {
1150                    let mutations = Arc::new(Mutations::new());
1151                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1152                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1153                        mutations.push(op, item)
1154                    })));
1155                })
1156                .open(device)
1157                .await
1158                .expect("open failed")
1159        };
1160
1161        let allocator_mutations = Arc::new(Mutations::new());
1162        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1163        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1164
1165        let root_store = fs.root_store();
1166        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1167            .await
1168            .expect("open failed");
1169
1170        let mut transaction = fs
1171            .clone()
1172            .new_transaction(
1173                lock_keys![LockKey::object(
1174                    root_store.store_object_id(),
1175                    root_directory.object_id()
1176                )],
1177                Options::default(),
1178            )
1179            .await
1180            .expect("new_transaction failed");
1181        let object = root_directory
1182            .create_child_file(&mut transaction, "test")
1183            .await
1184            .expect("create_child_file failed");
1185        transaction.commit().await.expect("commit failed");
1186
1187        // Append some data.
1188        let buf = object.allocate_buffer(10000).await;
1189        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1190
1191        // Overwrite some data.
1192        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1193
1194        // Truncate.
1195        object.truncate(3000).await.expect("truncate failed");
1196
1197        // Delete the object.
1198        let mut transaction = fs
1199            .clone()
1200            .new_transaction(
1201                lock_keys![
1202                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1203                    LockKey::object(root_store.store_object_id(), object.object_id()),
1204                ],
1205                Options::default(),
1206            )
1207            .await
1208            .expect("new_transaction failed");
1209
1210        replace_child(&mut transaction, None, (&root_directory, "test"))
1211            .await
1212            .expect("replace_child failed");
1213
1214        transaction.commit().await.expect("commit failed");
1215
1216        // Finally tombstone the object.
1217        root_store
1218            .tombstone_object(object.object_id(), Options::default())
1219            .await
1220            .expect("tombstone failed");
1221
1222        // Now reopen and check that replay produces the same set of mutations.
1223        fs.close().await.expect("close failed");
1224
1225        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1226
1227        let device = fs.take_device().await;
1228        device.reopen(false);
1229
1230        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1231        let replayed_allocator_mutations = Arc::new(Mutations::new());
1232        let fs = open_fs(
1233            device,
1234            replayed_object_mutations.clone(),
1235            replayed_allocator_mutations.clone(),
1236        )
1237        .await;
1238
1239        let m1 = object_mutations.lock();
1240        let m2 = replayed_object_mutations.lock();
1241        assert_eq!(m1.len(), m2.len());
1242        for (store_id, mutations) in &*m1 {
1243            let mutations = mutations.0.lock();
1244            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1245            assert_eq!(mutations.len(), replayed.len());
1246            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1247                assert_eq!(op1, op2);
1248                assert_eq!(i1.key, i2.key);
1249                assert_eq!(i1.value, i2.value);
1250                assert_eq!(i1.sequence, i2.sequence);
1251            }
1252        }
1253
1254        let a1 = allocator_mutations.0.lock();
1255        let a2 = replayed_allocator_mutations.0.lock();
1256        assert_eq!(a1.len(), a2.len());
1257        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1258            assert_eq!(op1, op2);
1259            assert_eq!(i1.key, i2.key);
1260            assert_eq!(i1.value, i2.value);
1261            assert_eq!(i1.sequence, i2.sequence);
1262        }
1263
1264        assert_eq!(
1265            fs.object_manager().metadata_reservation().amount(),
1266            metadata_reservation_amount
1267        );
1268    }
1269
1270    #[fuchsia::test]
1271    async fn test_max_in_flight_transactions() {
1272        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1273        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1274
1275        let transactions = FuturesUnordered::new();
1276        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1277            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1278        }
1279        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1280
1281        // Trying to create another one should be blocked.
1282        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1283        assert!(futures::poll!(&mut fut).is_pending());
1284
1285        // Dropping one should allow it to proceed.
1286        transactions.pop();
1287
1288        assert!(futures::poll!(&mut fut).is_ready());
1289    }
1290
1291    // If run on a single thread, the trim tasks starve out other work.
1292    #[fuchsia::test(threads = 10)]
1293    async fn test_continuously_trim() {
1294        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1295        let fs = FxFilesystemBuilder::new()
1296            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1297            .format(true)
1298            .open(device)
1299            .await
1300            .expect("open failed");
1301        // Do a small sleep so trim has time to get going.
1302        fasync::Timer::new(Duration::from_millis(10)).await;
1303
1304        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
1305        // regular usage isn't affected by trim.
1306        let root_store = fs.root_store();
1307        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1308            .await
1309            .expect("open failed");
1310        for _ in 0..100 {
1311            let mut transaction = fs
1312                .clone()
1313                .new_transaction(
1314                    lock_keys![LockKey::object(
1315                        root_store.store_object_id(),
1316                        root_directory.object_id()
1317                    )],
1318                    Options::default(),
1319                )
1320                .await
1321                .expect("new_transaction failed");
1322            let object = root_directory
1323                .create_child_file(&mut transaction, "test")
1324                .await
1325                .expect("create_child_file failed");
1326            transaction.commit().await.expect("commit failed");
1327
            {
                let buf = object.allocate_buffer(1024).await;
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            }
            std::mem::drop(object);

            let mut transaction = root_directory
                .acquire_context_for_replace(None, "test", true)
                .await
                .expect("acquire_context_for_replace failed")
                .transaction;
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("replace_child failed");
            transaction.commit().await.expect("commit failed");
        }
        fs.close().await.expect("close failed");
    }

    #[test_case(true; "test power fail with barriers")]
    #[test_case(false; "test power fail with checksums")]
    #[fuchsia::test]
    async fn test_power_fail(barriers_enabled: bool) {
        // This test randomly discards blocks, so we run it a few times to increase the chances
        // of catching an issue in a single run.
        for _ in 0..10 {
            let (store_id, device, test_file_object_id) = {
                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
                let fs = if barriers_enabled {
                    FxFilesystemBuilder::new()
                        .barriers_enabled(true)
                        .format(true)
                        .open(device)
                        .await
                        .expect("new filesystem failed")
                } else {
                    FxFilesystem::new_empty(device).await.expect("new_empty failed")
                };
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
                    .await
                    .expect("sync failed");

                let store = root_volume
                    .new_volume(
                        "test",
                        NewChildStoreOptions {
                            options: StoreOptions {
                                crypt: Some(Arc::new(InsecureCrypt::new())),
                                ..StoreOptions::default()
                            },
                            ..Default::default()
                        },
                    )
                    .await
                    .expect("new_volume failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                // Create a number of files with the goal of using up more than one journal block.
                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
                    let fs = store.filesystem();
                    let root_directory = Directory::open(store, store.root_directory_object_id())
                        .await
                        .expect("open failed");
                    for i in 0..100 {
                        let mut transaction = fs
                            .clone()
                            .new_transaction(
                                lock_keys![LockKey::object(
                                    store.store_object_id(),
                                    store.root_directory_object_id()
                                )],
                                Options::default(),
                            )
                            .await
                            .expect("new_transaction failed");
                        root_directory
                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
                            .await
                            .expect("create_child_file failed");
                        transaction.commit().await.expect("commit failed");
                    }
                }

                // Create one batch of files.
                create_files(&store, "A").await;

                // Create a file and write something to it.  This will make sure there's a
                // transaction present that includes a checksum.
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut transaction =
                    object.new_transaction().await.expect("new_transaction failed");
                let mut buffer = object.allocate_buffer(4096).await;
                buffer.as_mut_slice().fill(0xed);
                object
                    .txn_write(&mut transaction, 0, buffer.as_ref())
                    .await
                    .expect("txn_write failed");
                transaction.commit().await.expect("commit failed");

                // Create another batch of files.
                create_files(&store, "B").await;

                // Sync the filesystem, but don't flush the device.  We want to do this so we
                // can randomly discard blocks below.
                fs.sync(SyncOptions::default()).await.expect("sync failed");

                // When we call `sync` above, the filesystem pads the journal so that it will
                // get written, but it doesn't wait for the write to occur.  We wait a short
                // time here to allow the journal write to land.  Adding timers isn't great,
                // but this test already isn't deterministic since we randomly discard blocks.
                fasync::Timer::new(Duration::from_millis(10)).await;

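                // Return the store id, a snapshot of the device in its current state, and the
                // test file's object id for the power-loss simulation below.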
                (
                    store.store_object_id(),
                    fs.device().snapshot().expect("snapshot failed"),
                    object.object_id(),
                )
            };

            // Randomly discard blocks since the last flush.  This simulates what might happen in
            // the case of power-loss.  This will be an uncontrolled unmount.
            device
                .discard_random_since_last_flush()
                .expect("discard_random_since_last_flush failed");

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

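            // Tracks whether we rewrite the test file below, so the final pass after the
            // clean remount knows whether to verify the new contents.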
            let mut check_test_file = false;

            // If we replayed and the store exists (i.e. the transaction that created the store
            // made it out), start by running fsck on it.
            let object_id = if fs.object_manager().store(store_id).is_some() {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                // Now we want to create another file, unmount cleanly, and then finally check that
                // the new file exists.  This checks that we can continue to use the filesystem
                // after an unclean unmount.
                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume(
                        "test",
                        StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                    )
                    .await
                    .expect("volume failed");

                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "C")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Write again to the test file if it exists.
                if let Ok(test_file) = ObjectStore::open_object(
                    &store,
                    test_file_object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                {
                    // Check it has the contents we expect: either the whole 4 KiB write made
                    // it out, or none of it did.
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
                    if bytes == 4096 {
                        let expected = [0xed; 4096];
                        assert_eq!(buffer.as_slice(), &expected);
                    } else {
                        // If the write didn't make it, the file should have zero bytes.
                        assert_eq!(bytes, 0);
                    }

                    // Modify the test file.
                    let mut transaction =
                        test_file.new_transaction().await.expect("new_transaction failed");
                    buffer.as_mut_slice().fill(0x37);
                    test_file
                        .txn_write(&mut transaction, 0, buffer.as_ref())
                        .await
                        .expect("txn_write failed");
                    transaction.commit().await.expect("commit failed");
                    check_test_file = true;
                }

                object.object_id()
            } else {
                INVALID_OBJECT_ID
            };

            // This will do a controlled unmount.
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

            // As mentioned above, make sure that the object we created before the clean unmount
            // exists.
            if object_id != INVALID_OBJECT_ID {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume(
                        "test",
                        StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                    )
                    .await
                    .expect("volume failed");
                // We should be able to open the C object.
                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");

                // If we made the modification to the test file, check it.
                if check_test_file {
                    info!("Checking test file for modification");
                    let test_file = ObjectStore::open_object(
                        &store,
                        test_file_object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    assert_eq!(
                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
                        4096
                    );
                    let expected = [0x37; 4096];
                    assert_eq!(buffer.as_slice(), &expected);
                }
            }

            fs.close().await.expect("close failed");
        }
    }

    #[fuchsia::test]
    async fn test_barrier_not_emitted_when_transaction_has_no_data() {
        let barrier_count = Arc::new(AtomicU32::new(0));

        struct Observer(Arc<AtomicU32>);

        impl fake_device::Observer for Observer {
            fn barrier(&self) {
                self.0.fetch_add(1, atomic::Ordering::Relaxed);
            }
        }

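        // Install the observer so that every barrier the fake device receives bumps the
        // counter.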
        let mut fake_device = FakeDevice::new(8192, 4096);
        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
        let device = DeviceHolder::new(fake_device);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .format(true)
            .open(device)
            .await
            .expect("new filesystem failed");

        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            root_vol
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed");
            fs.close().await.expect("close failed");
        }
        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
        // measure of the number of barriers issued during setup.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .open(device)
            .await
            .expect("new filesystem failed");
        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .volume(
                "test",
                StoreOptions {
                    crypt: Some(Arc::new(InsecureCrypt::new())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("there is no test volume");

        // Create a number of files with the goal of using up more than one journal block.
        let fs = store.filesystem();
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        for i in 0..100 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("A {i}"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        // Unmount the filesystem to ensure that the journal flushes.
        fs.close().await.expect("close failed");
        // Ensure that no barriers were emitted while creating files, as no data was written.
        assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_barrier_emitted_when_transaction_includes_data() {
        let barrier_count = Arc::new(AtomicU32::new(0));

        struct Observer(Arc<AtomicU32>);

        impl fake_device::Observer for Observer {
            fn barrier(&self) {
                self.0.fetch_add(1, atomic::Ordering::Relaxed);
            }
        }

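        // As above, count the barriers the fake device receives.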
        let mut fake_device = FakeDevice::new(8192, 4096);
        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
        let device = DeviceHolder::new(fake_device);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .format(true)
            .open(device)
            .await
            .expect("new filesystem failed");

        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            root_vol
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed");
            fs.close().await.expect("close failed");
        }
        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
        // measure of the number of barriers issued during setup.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .open(device)
            .await
            .expect("new filesystem failed");
        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .volume(
                "test",
                StoreOptions {
                    crypt: Some(Arc::new(InsecureCrypt::new())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("there is no test volume");

        // Create a file and write something to it. This should cause a barrier to be emitted.
        let fs: Arc<FxFilesystem> = store.filesystem();
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        let mut buffer = object.allocate_buffer(4096).await;
        buffer.as_mut_slice().fill(0xed);
        object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");

        // Unmount the filesystem to ensure that the journal flushes.
        fs.close().await.expect("close failed");
        // Ensure that a barrier was emitted while writing to the file.
        assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_image_builder_mode_no_early_writes() {
        const BLOCK_SIZE: u32 = 4096;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
        device.reopen(true);
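        // The device is reopened read-only here, so if image builder mode issued any writes
        // before finalize(), they would surface as errors below.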
        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        // Image builder mode only writes when data is written or fs.finalize() is called, so
        // we shouldn't see any errors here.
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_image_builder_mode() {
        const BLOCK_SIZE: u32 = 4096;
        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
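        // A single block-sized (4 KiB) range starting 4 MiB into the image.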
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));

        // Write some fake file data at an offset in the image; below we confirm that it can be
        // read back through an fxfs file.
        {
            let mut write_buf =
                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
            write_buf.as_mut_slice().fill(0xf0);
            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
        }

        device.reopen(true);

        let device = {
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::B))
                .open(device)
                .await
                .expect("open failed");
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                // Create a file referencing existing data on device.
                let handle;
                {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_directory.store().store_object_id(),
                                root_directory.object_id()
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new transaction");
                    handle = root_directory
                        .create_child_file(&mut transaction, "test")
                        .await
                        .expect("create file");
                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
                    transaction.commit().await.expect("commit");
                }
            }
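            // Reopen the device writable so that finalize() can write out the metadata and
            // superblock.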
            fs.device().reopen(false);
            fs.finalize().await.expect("finalize");
            fs.close().await.expect("close");
            fs.take_device().await
        };
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");

        // Confirm that the test file points at the correct data.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, descriptor, _) =
            root_directory.lookup("test").await.expect("lookup failed").unwrap();
        assert_eq!(descriptor, ObjectDescriptor::File);
        let test_file =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open failed");
        let mut read_buf =
            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
        fs.close().await.expect("close failed");
    }
}