Skip to main content

fxfs/
filesystem.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16    self, AssocObj, LockKey, LockManager, MetadataReservation, Mutation,
17    TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_async::condition::Condition;
28use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
29use fuchsia_sync::Mutex;
30use futures::{FutureExt, Stream};
31use fxfs_crypto::Crypt;
32use fxfs_trace::{TraceFutureExt, trace_future_args};
33use static_assertions::const_assert;
34use std::pin::pin;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::sync::{Arc, OnceLock, Weak};
37use std::task::Poll;
38use std::time::{Duration, Instant};
39use storage_device::{Device, DeviceHolder};
40
/// Smallest block size Fxfs works with.  `FxFilesystemBuilder::open` rounds any smaller device
/// block size up to this value.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size: 64 KiB (equal to `u16::MAX as u64 + 1`).
pub const MAX_BLOCK_SIZE: u64 = 1 << 16;
43
44// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
45// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
46// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
47// needs to be excluded.
48pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
49const_assert!(9223372036854771712 == MAX_FILE_SIZE);
50
51use futures::stream::StreamExt;
52
// Cap on how many transactions may be in flight concurrently.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Delay before the first trim: one hour after boot, so that trimming does not compete with the
// initial burst of activity during boot.  This is a rough heuristic that may need revisiting if
// performance suffers.
const TRIM_AFTER_BOOT_TIMER: Duration = Duration::from_secs(3600);

// Once the initial trim has run, trim again every 24 hours.
const TRIM_INTERVAL_TIMER: Duration = Duration::from_secs(24 * 3600);

/// Period between cleanings of the transfer buffer.
// TODO(https://fxbug.dev/489725256) Configure the task to run when fxfs is idle.
const CLEAN_TRANSFER_BUFFER_INTERVAL: Duration = Duration::from_secs(60);
67
68#[cfg(target_os = "fuchsia")]
69pub type WakeLease = zx::NullableHandle;
70
71#[cfg(not(target_os = "fuchsia"))]
72pub type WakeLease = fasync::emulated_handle::Handle;
73
74pub trait PowerManager: Send + Sync {
75    /// Returns a stream of battery status changes (true if using battery).
76    fn watch_battery(self: Arc<Self>) -> futures::stream::BoxStream<'static, (bool, WakeLease)>;
77}
78
/// Capacity and usage summary for an Fxfs filesystem, as produced by `FxFilesystem::get_info`.
///
/// This is a plain value type, so it derives the usual traits (`Debug` for diagnostics, `Copy`
/// and `PartialEq` so callers and tests can compare snapshots cheaply).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Info {
    /// Size of the underlying device, in bytes.
    pub total_bytes: u64,
    /// Bytes the allocator currently reports as used.
    pub used_bytes: u64,
}
84
85pub type PostCommitHook =
86    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
87
88pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
89
90pub struct Options {
91    /// True if the filesystem is read-only.
92    pub read_only: bool,
93
94    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
95    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
96    /// size of unflushed journal contents).  This is exposed for testing purposes.
97    pub roll_metadata_key_byte_count: u64,
98
99    /// A callback that runs before every transaction is committed.  If this callback returns an
100    /// error then the transaction is failed with that error.
101    pub pre_commit_hook: PreCommitHook,
102
103    /// A callback that runs after every transaction has been committed.  This will be called whilst
104    /// a lock is held which will block more transactions from being committed.
105    pub post_commit_hook: PostCommitHook,
106
107    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
108    /// testing.
109    pub skip_initial_reap: bool,
110
111    // The first duration is how long after the filesystem has been mounted to perform an initial
112    // trim.  The second is the interval to repeat trimming thereafter.  If set to None, no trimming
113    // is done.
114    // Default values are (5 minutes, 24 hours).
115    pub trim_config: Option<(Duration, Duration)>,
116
117    // If set, journal will not be used for writes. The user must call 'close' when finished.
118    // The provided superblock instance will be written upon close().
119    pub image_builder_mode: Option<SuperBlockInstance>,
120
121    // If true, the filesystem will use the hardware's inline crypto engine to write encrypted
122    // data. Requires the block device to support inline encryption and for `barriers_enabled` to
123    // be true.
124    // TODO(https://fxbug.dev/393196849): For now, this flag only prevents the filesystem from
125    // computing checksums. Update this comment when the filesystem actually uses inline
126    // encryption.
127    pub inline_crypto_enabled: bool,
128
129    // Configures the filesystem to use barriers instead of checksums to ensure consistency.
130    // Checksums may be computed and stored in extent records but will no longer be stored in the
131    // journal. The journal will use barriers to enforce proper ordering between data and metadata
132    // writes. Must be true if `inline_crypto_enabled` is true.
133    pub barriers_enabled: bool,
134
135    /// If set, this will be used to check for charger status before trimming.
136    pub power_manager: Option<Arc<dyn PowerManager>>,
137
138    /// How long to wait after being placed on a charger before starting a trim.
139    pub trim_charger_wait: Duration,
140}
141
142impl Default for Options {
143    fn default() -> Self {
144        Options {
145            roll_metadata_key_byte_count: 128 * 1024 * 1024,
146            read_only: false,
147            pre_commit_hook: None,
148            post_commit_hook: None,
149            skip_initial_reap: false,
150            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
151            image_builder_mode: None,
152            inline_crypto_enabled: false,
153            barriers_enabled: false,
154            power_manager: None,
155            trim_charger_wait: Duration::from_secs(10),
156        }
157    }
158}
159
160/// The context in which a transaction is being applied.
161pub struct ApplyContext<'a, 'b> {
162    /// The mode indicates whether the transaction is being replayed.
163    pub mode: ApplyMode<'a, 'b>,
164
165    /// The transaction checkpoint for this mutation.
166    pub checkpoint: JournalCheckpoint,
167}
168
169/// A transaction can be applied during replay or on a live running system (in which case a
170/// transaction object will be available).
171pub enum ApplyMode<'a, 'b> {
172    Replay,
173    Live(&'a Transaction<'b>),
174}
175
176impl ApplyMode<'_, '_> {
177    pub fn is_replay(&self) -> bool {
178        matches!(self, ApplyMode::Replay)
179    }
180
181    pub fn is_live(&self) -> bool {
182        matches!(self, ApplyMode::Live(_))
183    }
184}
185
186/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
187/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
188#[async_trait]
189pub trait JournalingObject: Send + Sync {
190    /// This method get called when the transaction commits, which can either be during live
191    /// operation (See `ObjectManager::apply_mutation`) or during journal replay, in which case
192    /// transaction will be None (See `super_block::read`).
193    fn apply_mutation(
194        &self,
195        mutation: Mutation,
196        context: &ApplyContext<'_, '_>,
197        assoc_obj: AssocObj<'_>,
198    ) -> Result<(), Error>;
199
200    /// Called when a transaction fails to commit.
201    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
202
203    /// Called before committing a transaction. Implementations can use this to acquire locks
204    /// or resources (like keys) that must be held until the transaction is committed.
205    async fn prepare_commit<'a>(
206        &self,
207        _filesystem: &'a FxFilesystem,
208        _transaction: &Transaction<'_>,
209    ) -> Result<Option<WriteGuard<'a>>, Error> {
210        Ok(None)
211    }
212
213    /// Flushes in-memory changes to the device (to allow journal space to be freed).
214    ///
215    /// Also returns the earliest version of a struct in the filesystem.
216    async fn flush(&self) -> Result<Version, Error>;
217
218    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
219    /// gets written to the journal.
220    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
221        writer.write(mutation.clone());
222    }
223}
224
/// Options for a sync of the filesystem's journal.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// When true, both the journal and the underlying block device are flushed.  This costs much
    /// more, but guarantees the journal's contents are persisted.  It therefore also acts as a
    /// barrier: every earlier journal write is observable by later operations.
    ///
    /// When false, the sync does *not* flush the journal synchronously and may return before that
    /// flush completes, so some journal mutations can still be buffered in memory afterwards.
    pub flush_device: bool,

    /// Optional check, evaluated while a lock is held, that decides whether the sync needs to
    /// go ahead at all.
    pub precondition: Option<Box<dyn FnOnce() -> bool + Send + 'a>>,
}
239
240pub struct OpenFxFilesystem(Arc<FxFilesystem>);
241
242impl OpenFxFilesystem {
243    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
244    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
245    pub async fn take_device(self) -> DeviceHolder {
246        let fut = self.device.take_when_dropped();
247        std::mem::drop(self);
248        debug_assert_not_too_long!(fut)
249    }
250}
251
252impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
253    fn from(fs: Arc<FxFilesystem>) -> Self {
254        Self(fs)
255    }
256}
257
258impl Drop for OpenFxFilesystem {
259    fn drop(&mut self) {
260        if self.options.image_builder_mode.is_some()
261            && self.journal().image_builder_mode().is_some()
262        {
263            error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
264        }
265        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
266            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
267        }
268    }
269}
270
271impl std::ops::Deref for OpenFxFilesystem {
272    type Target = Arc<FxFilesystem>;
273
274    fn deref(&self) -> &Self::Target {
275        &self.0
276    }
277}
278
279pub struct FxFilesystemBuilder {
280    format: bool,
281    trace: bool,
282    options: Options,
283    journal_options: JournalOptions,
284    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
285    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
286    fsck_after_every_transaction: bool,
287}
288
289impl FxFilesystemBuilder {
290    pub fn new() -> Self {
291        Self {
292            format: false,
293            trace: false,
294            options: Options::default(),
295            journal_options: JournalOptions::default(),
296            on_new_allocator: None,
297            on_new_store: None,
298            fsck_after_every_transaction: false,
299        }
300    }
301
302    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
303    pub fn format(mut self, format: bool) -> Self {
304        self.format = format;
305        self
306    }
307
308    /// Enables or disables trace level logging. Defaults to `false`.
309    pub fn trace(mut self, trace: bool) -> Self {
310        self.trace = trace;
311        self
312    }
313
314    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
315    /// Incompatible with `format`.
316    pub fn read_only(mut self, read_only: bool) -> Self {
317        self.options.read_only = read_only;
318        self
319    }
320
321    /// For image building and in-place migration.
322    ///
323    /// This mode avoids the initial write of super blocks and skips the journal for all
324    /// transactions. The user *must* call `close()` before dropping the filesystem to trigger
325    /// a compaction of in-memory data structures, a minimal journal and a write to one
326    /// superblock (as specified).
327    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
328        self.options.image_builder_mode = mode;
329        self
330    }
331
332    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
333    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
334        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
335        self
336    }
337
338    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
339    pub fn pre_commit_hook(
340        mut self,
341        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
342    ) -> Self {
343        self.options.pre_commit_hook = Some(Box::new(hook));
344        self
345    }
346
347    /// Sets a callback that runs after every transaction has been committed. See
348    /// `Options::post_commit_hook`.
349    pub fn post_commit_hook(
350        mut self,
351        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
352    ) -> Self {
353        self.options.post_commit_hook = Some(Box::new(hook));
354        self
355    }
356
357    /// Sets whether to do an initial reap of the graveyard at mount time. See
358    /// `Options::skip_initial_reap`. Defaults to `false`.
359    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
360        self.options.skip_initial_reap = skip_initial_reap;
361        self
362    }
363
364    /// Sets the options for the journal.
365    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
366        self.journal_options = journal_options;
367        self
368    }
369
370    /// Sets a method to be called immediately after creating the allocator.
371    pub fn on_new_allocator(
372        mut self,
373        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
374    ) -> Self {
375        self.on_new_allocator = Some(Box::new(on_new_allocator));
376        self
377    }
378
379    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
380    pub fn on_new_store(
381        mut self,
382        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
383    ) -> Self {
384        self.on_new_store = Some(Box::new(on_new_store));
385        self
386    }
387
388    /// Enables or disables running fsck after every transaction. Defaults to `false`.
389    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
390        self.fsck_after_every_transaction = fsck_after_every_transaction;
391        self
392    }
393
394    pub fn trim_config(mut self, delay_and_interval: Option<(Duration, Duration)>) -> Self {
395        self.options.trim_config = delay_and_interval;
396        self
397    }
398
399    pub fn power_manager(mut self, power_manager: Arc<dyn PowerManager>) -> Self {
400        self.options.power_manager = Some(power_manager);
401        self
402    }
403
404    pub fn trim_charger_wait(mut self, wait: Duration) -> Self {
405        self.options.trim_charger_wait = wait;
406        self
407    }
408
409    /// Enables or disables inline encryption. Defaults to `false`.
410    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
411        self.options.inline_crypto_enabled = inline_crypto_enabled;
412        self
413    }
414
415    /// Enables or disables barriers in both the filesystem and journal options.
416    /// Defaults to `false`.
417    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
418        self.options.barriers_enabled = barriers_enabled;
419        self.journal_options.barriers_enabled = barriers_enabled;
420        self
421    }
422
423    /// Constructs an `FxFilesystem` object with the specified settings.
424    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
425        let read_only = self.options.read_only;
426        if self.format && read_only {
427            bail!("Cannot initialize a filesystem as read-only");
428        }
429
430        // Inline encryption requires barriers to be enabled.
431        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
432            bail!("A filesystem using inline encryption requires barriers");
433        }
434
435        let objects = Arc::new(ObjectManager::new(self.on_new_store));
436        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
437
438        let image_builder_mode = self.options.image_builder_mode;
439
440        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
441        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
442        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
443
444        let mut fsck_after_every_transaction = None;
445        let mut filesystem_options = self.options;
446        if self.fsck_after_every_transaction {
447            let instance =
448                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
449            fsck_after_every_transaction = Some(instance.clone());
450            filesystem_options.post_commit_hook =
451                Some(Box::new(move || Box::pin(instance.clone().run())));
452        }
453
454        if !read_only && !self.format {
455            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
456            // before replay.
457            device.flush().await.context("Device flush failed")?;
458        }
459
460        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
461            let weak = weak.clone();
462            FxFilesystem {
463                device,
464                block_size,
465                objects: objects.clone(),
466                journal,
467                commit_mutex: futures::lock::Mutex::new(()),
468                lock_manager: LockManager::new(),
469                flush_task: Mutex::new(None),
470                background_tasks: fasync::Scope::new(),
471                closed: AtomicBool::new(true),
472                trace: self.trace,
473                graveyard: Graveyard::new(objects.clone()),
474                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
475                options: filesystem_options,
476                in_flight_transactions: AtomicU64::new(0),
477                transaction_limit_event: Event::new(),
478                _stores_node: metrics::register_fs(move || {
479                    let weak = weak.clone();
480                    Box::pin(async move {
481                        if let Some(fs) = weak.upgrade() {
482                            fs.populate_stores_node().await
483                        } else {
484                            Err(anyhow!("Filesystem has been dropped"))
485                        }
486                    })
487                }),
488            }
489        });
490
491        filesystem.journal().set_image_builder_mode(image_builder_mode);
492
493        filesystem.journal.set_trace(self.trace);
494        if self.format {
495            filesystem.journal.init_empty(filesystem.clone()).await?;
496            if image_builder_mode.is_none() {
497                // The filesystem isn't valid until superblocks are written but we want to defer
498                // that until last when migrating filesystems or building system images.
499                filesystem.journal.init_superblocks().await?;
500
501                // Start the graveyard's background reaping task.
502                filesystem.graveyard.clone().reap_async();
503            }
504
505            // Create the root volume directory.
506            let root_store = filesystem.root_store();
507            root_store.set_trace(self.trace);
508            let root_directory =
509                Directory::open(&root_store, root_store.root_directory_object_id())
510                    .await
511                    .context("Unable to open root volume directory")?;
512            let mut transaction = root_store
513                .new_transaction(
514                    lock_keys![LockKey::object(
515                        root_store.store_object_id(),
516                        root_directory.object_id()
517                    )],
518                    transaction::Options::default(),
519                )
520                .await?;
521            let volume_directory =
522                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
523            transaction.commit().await?;
524            objects.set_volume_directory(volume_directory);
525        } else {
526            filesystem
527                .journal
528                .replay(filesystem.clone(), self.on_new_allocator)
529                .await
530                .context("Journal replay failed")?;
531            filesystem.root_store().set_trace(self.trace);
532
533            if !read_only {
534                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
535                // that can trigger a flush which can add more entries to the graveyard which might
536                // get caught in the initial reap and cause objects to be prematurely tombstoned.
537                for store in objects.unlocked_stores() {
538                    filesystem.graveyard.initial_reap(&store).await?;
539                }
540            }
541        }
542
543        // This must be after we've formatted the filesystem; it will fail during format otherwise.
544        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
545            fsck_after_every_transaction
546                .fs
547                .set(Arc::downgrade(&filesystem))
548                .unwrap_or_else(|_| unreachable!());
549        }
550
551        filesystem.closed.store(false, Ordering::SeqCst);
552
553        if !read_only && image_builder_mode.is_none() {
554            // Start the background tasks.
555            filesystem.graveyard.clone().reap_async();
556
557            if filesystem.options.trim_config.is_some() {
558                filesystem.start_trim_task();
559            }
560            filesystem.start_clean_transfer_buffer_task();
561        }
562
563        Ok(filesystem.into())
564    }
565}
566
567pub struct FxFilesystem {
568    block_size: u64,
569    objects: Arc<ObjectManager>,
570    journal: Arc<Journal>,
571    commit_mutex: futures::lock::Mutex<()>,
572    lock_manager: LockManager,
573    flush_task: Mutex<Option<fasync::Task<()>>>,
574    background_tasks: fasync::Scope,
575    closed: AtomicBool,
576    // An event that is signalled when the filesystem starts to shut down.
577    trace: bool,
578    graveyard: Arc<Graveyard>,
579    completed_transactions: UintProperty,
580    options: Options,
581
582    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
583    in_flight_transactions: AtomicU64,
584
585    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
586    // limit.
587    transaction_limit_event: Event,
588
589    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
590    // filesystem has dropped all other members first (Rust drops members in declaration order).
591    device: DeviceHolder,
592
593    // The "stores" node in the Inspect tree.
594    _stores_node: LazyNode,
595}
596
597#[fxfs_trace::trace]
598impl FxFilesystem {
599    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
600        FxFilesystemBuilder::new().format(true).open(device).await
601    }
602
603    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
604        FxFilesystemBuilder::new().open(device).await
605    }
606
607    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
608        self.objects.root_parent_store()
609    }
610
611    pub async fn close(&self) -> Result<(), Error> {
612        if self.journal().image_builder_mode().is_some() {
613            self.journal().allocate_journal().await?;
614            self.journal().set_image_builder_mode(None);
615            self.journal().force_compact().await?;
616        }
617        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
618        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
619        debug_assert_not_too_long!(self.background_tasks.clone().cancel());
620        self.journal.stop_compactions().await;
621        let sync_status =
622            if self.journal().image_builder_mode().is_some() || self.options().read_only {
623                Ok(None)
624            } else {
625                self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
626            };
627        match &sync_status {
628            Ok(None) => {}
629            Ok(checkpoint) => info!(
630                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
631                 reservation_required={}, borrowed={})",
632                checkpoint.as_ref().unwrap().0.file_offset,
633                self.object_manager().metadata_reservation(),
634                self.object_manager().required_reservation(),
635                self.object_manager().borrowed_metadata_space(),
636            ),
637            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
638        }
639        self.journal.terminate();
640        let flush_task = self.flush_task.lock().take();
641        if let Some(task) = flush_task {
642            debug_assert_not_too_long!(task);
643        }
644        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
645        // crash instead of exiting gracefully.
646        self.device().close().await.context("Failed to close device")?;
647        sync_status.map(|_| ())
648    }
649
650    pub fn device(&self) -> Arc<dyn Device> {
651        Arc::clone(&self.device)
652    }
653
654    pub fn root_store(&self) -> Arc<ObjectStore> {
655        self.objects.root_store()
656    }
657
658    pub fn allocator(&self) -> Arc<Allocator> {
659        self.objects.allocator()
660    }
661
662    /// Enables allocations for the allocator.
663    /// This is only used in image_builder_mode where it *must*
664    /// be called before any allocations can take place.
665    pub fn enable_allocations(&self) {
666        self.allocator().enable_allocations();
667    }
668
669    pub fn object_manager(&self) -> &Arc<ObjectManager> {
670        &self.objects
671    }
672
673    pub fn journal(&self) -> &Arc<Journal> {
674        &self.journal
675    }
676
677    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
678        self.journal.sync(options).await.map(|_| ())
679    }
680
681    pub fn block_size(&self) -> u64 {
682        self.block_size
683    }
684
685    pub fn get_info(&self) -> Info {
686        Info {
687            total_bytes: self.device.size(),
688            used_bytes: self.object_manager().allocator().get_used_bytes().0,
689        }
690    }
691
692    pub fn super_block_header(&self) -> SuperBlockHeader {
693        self.journal.super_block_header()
694    }
695
696    pub fn graveyard(&self) -> &Arc<Graveyard> {
697        &self.graveyard
698    }
699
700    pub fn trace(&self) -> bool {
701        self.trace
702    }
703
704    pub fn options(&self) -> &Options {
705        &self.options
706    }
707
708    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
709    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
710    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
711    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
712    /// by the same task since that can lead to deadlocks if another task tries to take a write
713    /// lock.
714    pub async fn lock_commits(&self) -> futures::lock::MutexGuard<'_, ()> {
715        self.commit_mutex.lock().await
716    }
717
    /// Commits `transaction`, then invokes `callback` with the resulting journal offset and
    /// returns its result.
    ///
    /// In order, this:
    /// 1. runs the pre-commit hook, if any (an error from it fails the commit);
    /// 2. calls `LockManager::commit_prepare` for the transaction;
    /// 3. calls `JournalingObject::prepare_commit` once for each distinct object touched by the
    ///    transaction's mutations (flush mutations excluded), keeping any returned write guards
    ///    alive until the commit has been applied;
    /// 4. takes the commit mutex (the one `lock_commits` hands out) and either applies the
    ///    transaction directly through the object manager, bypassing the journal (image-builder
    ///    mode, journal offset 0), or commits it through the journal;
    /// 5. calls `callback` and then the post-commit hook, if any, with the commit mutex still
    ///    held.
    ///
    /// # Errors
    ///
    /// Propagates errors from the pre-commit hook, `prepare_commit` and `Journal::commit`.
    ///
    /// # Panics
    ///
    /// Panics if the (non-flush) mutations are not ordered by ascending object id.  In
    /// image-builder mode it also panics if applying the transaction fails or yields a mutation
    /// other than `Mutation::UpdateBorrowed`.
    #[trace]
    pub async fn commit_transaction<R: Send>(
        &self,
        transaction: &mut Transaction<'_>,
        callback: impl FnOnce(u64) -> R + Send,
    ) -> Result<R, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));

        // Call prepare_commit on all unique objects involved in the transaction.
        // We must hold the returned guards until the transaction is committed.
        // Since transaction.mutations() is sorted by object_id, we can deduplicate
        // on-the-fly.
        let mut guards = Vec::new();
        // NOTE(review): starting at 0 means mutations for object id 0 (if any) never reach
        // prepare_commit; presumably 0 is never a real journaling-object id — confirm.
        let mut last_object_id = 0;
        for mutation in transaction.mutations() {
            let object_id = mutation.object_id;

            // We don't need to prepare commits (which reserves keys) for flush mutations.
            if matches!(mutation.mutation, Mutation::BeginFlush | Mutation::EndFlush) {
                continue;
            }

            // Consecutive mutations for the same object only need one prepare_commit call.
            if object_id == last_object_id {
                continue;
            }
            assert!(object_id > last_object_id);
            last_object_id = object_id;

            if let Some(obj) = self.object_manager().journaling_object(object_id) {
                if let Some(guard) = obj.prepare_commit(self, transaction).await? {
                    guards.push(guard);
                }
            }
        }

        // Presumably starts a background flush if one is due (decided by the helper) before we
        // wait on the commit mutex.
        self.maybe_start_flush_task();
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            // Image-builder mode bypasses the journal: apply directly with a zero checkpoint.
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };

        // The commit has been applied, so the prepare_commit guards can be released now.
        std::mem::drop(guards);
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        let result = callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(result)
    }
792
    /// Returns the filesystem's `LockManager`, which transactions use to take and release their
    /// locks.
    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }
796
    /// Releases the filesystem-level state tied to `transaction`.
    ///
    /// If the transaction has any metadata reservation, the in-flight transaction count is
    /// decremented via `sub_transaction`. If that reservation is a `Hold` against the caller's
    /// allocator reservation, the held amount is handed back to that reservation. The metadata
    /// reservation is reset to `None`, and the object manager and lock manager then clean up
    /// their own per-transaction state.
    ///
    /// Panics if the metadata reservation is a `Hold` but `allocator_reservation` is `None`.
    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        // Only transactions with a metadata reservation were counted as in flight (presumably
        // via `add_transaction` — confirm), so only those give their slot back.
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // If we placed a hold for metadata space, return it now.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            // Re-create a Hold of `hold_amount` on the reservation and let it drop, which
            // presumably releases the amount back into the reservation.
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }
815
816    fn maybe_start_flush_task(&self) {
817        if self.journal.image_builder_mode().is_some() {
818            return;
819        }
820        let mut flush_task = self.flush_task.lock();
821        if flush_task.is_none() {
822            let journal = self.journal.clone();
823            *flush_task = Some(fasync::Task::spawn(
824                journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
825            ));
826        }
827    }
828
829    fn start_trim_task(self: &Arc<Self>) {
830        if !self.device.supports_trim() {
831            info!("Device does not support trim; not scheduling trimming");
832            return;
833        }
834        let this = self.clone();
835        self.background_tasks
836            .spawn(this.trim_task().trace(trace_future_args!("Filesystem::trim_task")));
837    }
838
839    async fn trim_task(self: Arc<Self>) {
840        // This task will be cancelled when the filesystem is closed.
841        let Some((mut next_timer, _)) = self.options.trim_config else { return };
842        loop {
843            fasync::Timer::new(next_timer.clone()).await;
844
845            // The timer has fired indicating a trim is now due.  If we have a power manager, we
846            // now check to see if there's an external power source.
847            let start = Instant::now();
848            let result = if let Some(pm) = &self.options.power_manager {
849                let mut watcher = pm.clone().watch_battery();
850
851                // The pauser starts paused.
852                let pauser = Pauser::new(self.options.trim_charger_wait);
853
854                let mut pause_future = pin!(
855                    async {
856                        let mut wake_lease = WakeLease::invalid();
857                        loop {
858                            let Some((using_battery, new_lease)) = watcher.next_latest().await
859                            else {
860                                // If we lose the connection to the watcher, unpause and do not
861                                // worry about monitoring the power source.
862                                pauser.set_pause(false);
863                                drop(wake_lease); // Silence the compiler warnings.
864                                return;
865                            };
866
867                            // Pause if the device is using battery.
868                            pauser.set_pause(using_battery);
869
870                            // Hold onto a wake lease if we are using an external power source (and
871                            // we are therefore unpaused).
872                            if using_battery {
873                                wake_lease = WakeLease::invalid();
874                            } else if !new_lease.is_invalid() {
875                                wake_lease = new_lease;
876                            }
877                        }
878                    }
879                    .fuse()
880                );
881
882                let mut do_trim = pin!(self.do_trim(Some(&pauser)).fuse());
883
884                loop {
885                    futures::select! {
886                        _ = pause_future => {}
887                        result = do_trim => break result,
888                    }
889                }
890
891                // Now that trim has completed, we don't need to watch the power source any more, so
892                // we just drop the pauser and the future monitoring the power source.
893            } else {
894                self.do_trim(None).await
895            };
896
897            let duration = start.elapsed();
898            match result {
899                Ok(bytes_trimmed) => info!(
900                    "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
901                     {next_timer:?}",
902                ),
903                Err(error) => error!(error:?; "Failed to trim"),
904            }
905
906            let Some((_, interval)) = self.options.trim_config else { return };
907            next_timer = interval;
908            if next_timer.is_zero() {
909                fasync::yield_now().await;
910            }
911        }
912    }
913
914    // Returns the number of bytes trimmed.
915    async fn do_trim(&self, pauser: Option<&Pauser>) -> Result<usize, Error> {
916        const MAX_EXTENTS_PER_BATCH: usize = 8;
917        const MAX_EXTENT_SIZE: usize = 256 * 1024;
918        let mut offset = 0;
919        let mut bytes_trimmed = 0;
920        loop {
921            let allocator = self.allocator();
922            if let Some(pauser) = pauser {
923                pauser.maybe_pause().await;
924            }
925            let trimmable_extents =
926                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
927            for device_range in trimmable_extents.extents() {
928                self.device.trim(device_range.clone()).await?;
929                bytes_trimmed += device_range.length()? as usize;
930            }
931            if let Some(device_range) = trimmable_extents.extents().last() {
932                offset = device_range.end;
933            } else {
934                break;
935            }
936        }
937        Ok(bytes_trimmed)
938    }
939
940    fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
941        let this = self.clone();
942        self.background_tasks.spawn(
943            async move {
944                loop {
945                    fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).await;
946                    this.device().clean_transfer_buffer();
947                }
948            }
949            .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
950        );
951    }
952
953    pub(crate) async fn reservation_for_transaction<'a>(
954        self: &Arc<Self>,
955        options: transaction::Options<'a>,
956    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
957        if self.options.image_builder_mode.is_some() {
958            // Image builder mode avoids the journal so reservation tracking for metadata overheads
959            // doesn't make sense and so we essentially have 'all or nothing' semantics instead.
960            return Ok((MetadataReservation::Borrowed, None, None));
961        }
962        if !options.skip_journal_checks {
963            self.maybe_start_flush_task();
964            self.journal.check_journal_space().await?;
965        }
966
967        // We support three options for metadata space reservation:
968        //
969        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
970        //      be used on the understanding that eventually, potentially after a full compaction,
971        //      there should be no net increase in space used.  For example, unlinking an object
972        //      should eventually decrease the amount of space used and setting most attributes
973        //      should not result in any change.
974        //
975        //   2. A reservation is provided in which case we'll place a hold on some of it for
976        //      metadata.
977        //
978        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
979        //      and will return NoSpace if that fails.
980        let mut hold = None;
981        let metadata_reservation = if options.borrow_metadata_space {
982            MetadataReservation::Borrowed
983        } else {
984            match options.allocator_reservation {
985                Some(reservation) => {
986                    hold = Some(
987                        reservation
988                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
989                            .ok_or(FxfsError::NoSpace)?,
990                    );
991                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
992                }
993                None => {
994                    let reservation = self
995                        .allocator()
996                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
997                        .ok_or(FxfsError::NoSpace)?;
998                    MetadataReservation::Reservation(reservation)
999                }
1000            }
1001        };
1002        Ok((metadata_reservation, options.allocator_reservation, hold))
1003    }
1004
1005    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
1006        if skip_journal_checks {
1007            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
1008        } else {
1009            let inc = || {
1010                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
1011                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
1012                    match self.in_flight_transactions.compare_exchange_weak(
1013                        in_flights,
1014                        in_flights + 1,
1015                        Ordering::Relaxed,
1016                        Ordering::Relaxed,
1017                    ) {
1018                        Ok(_) => return true,
1019                        Err(x) => in_flights = x,
1020                    }
1021                }
1022                return false;
1023            };
1024            while !inc() {
1025                let listener = self.transaction_limit_event.listen();
1026                if inc() {
1027                    break;
1028                }
1029                listener.await;
1030            }
1031        }
1032    }
1033
1034    pub(crate) fn sub_transaction(&self) {
1035        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
1036        assert!(old != 0);
1037        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
1038            self.transaction_limit_event.notify(usize::MAX);
1039        }
1040    }
1041
1042    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
1043        let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
1044        TruncateGuard(self.lock_manager().write_lock(keys).await)
1045    }
1046
1047    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
1048        let inspector = fuchsia_inspect::Inspector::default();
1049        let root = inspector.root();
1050        root.record_child("__root", |n| self.root_store().record_data(n));
1051        root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
1052        let object_manager = self.object_manager();
1053        let volume_directory = object_manager.volume_directory();
1054        let layer_set = volume_directory.store().tree().layer_set();
1055        let mut merger = layer_set.merger();
1056        let mut iter = volume_directory.iter(&mut merger).await?;
1057        while let Some((name, id, _)) = iter.get() {
1058            if let Some(store) = object_manager.store(id) {
1059                root.record_child(name.to_string(), |n| store.record_data(n));
1060            }
1061            iter.advance().await?;
1062        }
1063        Ok(inspector)
1064    }
1065}
1066
/// A wrapper around a guard that needs to be taken when truncating an object.
///
/// Obtained from `FxFilesystem::truncate_guard`, which fills it with a write lock on the
/// object's truncate lock key.
// The inner guard is never read. It is kept only so the lock lives as long as this wrapper
// (presumably `WriteGuard` releases the lock on drop — confirm), hence `dead_code` is allowed.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
1070
1071/// Helper method for making a new filesystem.
1072pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1073    let fs = FxFilesystem::new_empty(device).await?;
1074    fs.close().await?;
1075    Ok(fs.take_device().await)
1076}
1077
1078/// Helper method for making a new filesystem with a single named volume.
1079/// This shouldn't be used in production; instead volumes should be created with the Volumes
1080/// protocol.
1081pub async fn mkfs_with_volume(
1082    device: DeviceHolder,
1083    volume_name: &str,
1084    crypt: Option<Arc<dyn Crypt>>,
1085) -> Result<DeviceHolder, Error> {
1086    let fs = FxFilesystem::new_empty(device).await?;
1087    {
1088        // expect instead of propagating errors here, since otherwise we could drop |fs| before
1089        // close is called, which leads to confusing and unrelated error messages.
1090        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1091        root_volume
1092            .new_volume(
1093                volume_name,
1094                NewChildStoreOptions {
1095                    options: StoreOptions { crypt, ..StoreOptions::default() },
1096                    ..Default::default()
1097                },
1098            )
1099            .await
1100            .expect("Create volume failed");
1101    }
1102    fs.close().await?;
1103    Ok(fs.take_device().await)
1104}
1105
/// State for a hook that fsck's the filesystem and chains to a previously installed hook.
///
/// When run, it checks the filesystem and every unlocked non-system volume, failing on
/// warnings, and then runs the previous hook.
struct FsckAfterEveryTransaction {
    // The filesystem to check. It starts empty and is filled in after construction. It is
    // held weakly (presumably to avoid a reference cycle with the filesystem that owns this
    // hook — confirm), so fsck is skipped once the filesystem has gone away.
    fs: OnceLock<Weak<FxFilesystem>>,
    // The post-commit hook that was in place before this one; it is run after fsck.
    old_hook: PostCommitHook,
}
1110
1111impl FsckAfterEveryTransaction {
1112    fn new(old_hook: PostCommitHook) -> Arc<Self> {
1113        Arc::new(Self { fs: OnceLock::new(), old_hook })
1114    }
1115
1116    async fn run(self: Arc<Self>) {
1117        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1118            let options = FsckOptions {
1119                fail_on_warning: true,
1120                no_lock: true,
1121                quiet: true,
1122                ..Default::default()
1123            };
1124            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1125            let object_manager = fs.object_manager();
1126            for store in object_manager.unlocked_stores() {
1127                let store_id = store.store_object_id();
1128                if !object_manager.is_system_store(store_id) {
1129                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1130                        .await
1131                        .expect("fsck_volume_with_options failed");
1132                }
1133            }
1134        }
1135        if let Some(old_hook) = self.old_hook.as_ref() {
1136            old_hook().await;
1137        }
1138    }
1139}
1140
/// A pause switch that async work can wait on, with a debounce delay applied after each
/// unpause.
struct Pauser {
    // True while paused; waiters in `maybe_pause` are woken when this changes.
    pause: Condition<bool>,
    // How long `maybe_pause` sleeps after observing an unpause before re-checking the state.
    bounce_delay: Duration,
}
1145
1146impl Pauser {
1147    /// Returns a new Pauser which starts paused.
1148    fn new(bounce_delay: Duration) -> Self {
1149        Self { pause: Condition::new(true), bounce_delay }
1150    }
1151
1152    async fn maybe_pause(&self) {
1153        loop {
1154            if !*self.pause.lock() {
1155                return;
1156            }
1157            self.pause.when(|p| if **p { Poll::Pending } else { Poll::Ready(()) }).await;
1158            fasync::Timer::new(self.bounce_delay).await;
1159        }
1160    }
1161
1162    fn set_pause(&self, v: bool) {
1163        let mut guard = self.pause.lock();
1164        if *guard == v {
1165            return;
1166        }
1167        *guard = v;
1168        for waker in guard.drain_wakers() {
1169            waker.wake();
1170        }
1171    }
1172}
1173
1174trait NextLatest: Stream + Unpin {
1175    /// Gets the next item from the stream, but if multiple items are ready, returns the latest.
1176    async fn next_latest(&mut self) -> Option<Self::Item> {
1177        let Some(mut next) = self.next().await else { return None };
1178
1179        // Coalesce with any subsequent items that are ready.
1180        loop {
1181            match self.next().now_or_never() {
1182                None => return Some(next),
1183                Some(None) => return None,
1184                Some(Some(n)) => next = n,
1185            }
1186        }
1187    }
1188}
1189
1190impl<T: ?Sized + Unpin> NextLatest for T where T: Stream {}
1191
1192#[cfg(test)]
1193mod tests {
1194    use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1195    use crate::fsck::{fsck, fsck_volume};
1196    use crate::log::*;
1197    use crate::lsm_tree::Operation;
1198    use crate::lsm_tree::types::Item;
1199    use crate::object_handle::{
1200        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1201    };
1202    use crate::object_store::directory::{Directory, replace_child};
1203    use crate::object_store::journal::JournalOptions;
1204    use crate::object_store::journal::super_block::SuperBlockInstance;
1205    use crate::object_store::transaction::{LockKey, Options, lock_keys};
1206    use crate::object_store::volume::root_volume;
1207    use crate::object_store::{
1208        HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1209    };
1210    use crate::range::RangeExt;
1211    use fuchsia_async as fasync;
1212    use fuchsia_sync::Mutex;
1213    use futures::future::join_all;
1214    use futures::stream::{FuturesUnordered, TryStreamExt};
1215    use fxfs_insecure_crypto::new_insecure_crypt;
1216    use rustc_hash::FxHashMap as HashMap;
1217    use std::ops::Range;
1218    use std::sync::Arc;
1219    use std::sync::atomic::{AtomicU32, Ordering};
1220    use std::time::Duration;
1221    use storage_device::DeviceHolder;
1222    use storage_device::fake_device::{self, FakeDevice};
1223    use test_case::test_case;
1224
1225    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1226
1227    #[fuchsia::test(threads = 10)]
1228    async fn test_compaction() {
1229        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1230
1231        // If compaction is not working correctly, this test will run out of space.
1232        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1233        let root_store = fs.root_store();
1234        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1235            .await
1236            .expect("open failed");
1237
1238        let mut tasks = Vec::new();
1239        for i in 0..2 {
1240            let mut transaction = fs
1241                .root_store()
1242                .new_transaction(
1243                    lock_keys![LockKey::object(
1244                        root_store.store_object_id(),
1245                        root_directory.object_id()
1246                    )],
1247                    Options::default(),
1248                )
1249                .await
1250                .expect("new_transaction failed");
1251            let handle = root_directory
1252                .create_child_file(&mut transaction, &format!("{}", i))
1253                .await
1254                .expect("create_child_file failed");
1255            transaction.commit().await.expect("commit failed");
1256            tasks.push(fasync::Task::spawn(async move {
1257                const TEST_DATA: &[u8] = b"hello";
1258                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1259                buf.as_mut_slice().copy_from_slice(TEST_DATA);
1260                for _ in 0..1500 {
1261                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1262                }
1263            }));
1264        }
1265        join_all(tasks).await;
1266        fs.sync(SyncOptions::default()).await.expect("sync failed");
1267
1268        fsck(fs.clone()).await.expect("fsck failed");
1269        fs.close().await.expect("Close failed");
1270    }
1271
1272    #[fuchsia::test]
1273    async fn test_enable_allocations() {
1274        // 1. enable_allocations() has no impact if image_builder_mode is not used.
1275        {
1276            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1277            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1278            fs.enable_allocations();
1279            let root_store = fs.root_store();
1280            let root_directory =
1281                Directory::open(&root_store, root_store.root_directory_object_id())
1282                    .await
1283                    .expect("open failed");
1284            let mut transaction = fs
1285                .root_store()
1286                .new_transaction(
1287                    lock_keys![LockKey::object(
1288                        root_store.store_object_id(),
1289                        root_directory.object_id()
1290                    )],
1291                    Options::default(),
1292                )
1293                .await
1294                .expect("new_transaction failed");
1295            root_directory
1296                .create_child_file(&mut transaction, "test")
1297                .await
1298                .expect("create_child_file failed");
1299            transaction.commit().await.expect("commit failed");
1300            fs.close().await.expect("close failed");
1301        }
1302
1303        // 2. Allocations blow up if done before this call (in image_builder_mode), but work after
1304        {
1305            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1306            let fs = FxFilesystemBuilder::new()
1307                .format(true)
1308                .image_builder_mode(Some(SuperBlockInstance::A))
1309                .open(device)
1310                .await
1311                .expect("open failed");
1312            let root_store = fs.root_store();
1313            let root_directory =
1314                Directory::open(&root_store, root_store.root_directory_object_id())
1315                    .await
1316                    .expect("open failed");
1317
1318            let mut transaction = fs
1319                .root_store()
1320                .new_transaction(
1321                    lock_keys![LockKey::object(
1322                        root_store.store_object_id(),
1323                        root_directory.object_id()
1324                    )],
1325                    Options::default(),
1326                )
1327                .await
1328                .expect("new_transaction failed");
1329            let handle = root_directory
1330                .create_child_file(&mut transaction, "test_fail")
1331                .await
1332                .expect("create_child_file failed");
1333            transaction.commit().await.expect("commit failed");
1334
1335            // Allocations should fail before enable_allocations()
1336            assert!(
1337                FxfsError::Unavailable
1338                    .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1339            );
1340
1341            // Allocations should work after enable_allocations()
1342            fs.enable_allocations();
1343            handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1344
1345            // 3. finalize() works regardless of whether enable_allocations() is called.
1346            // (We already called it above, so this verifies it works after it was called).
1347
1348            fs.close().await.expect("close failed");
1349        }
1350        // TODO(https://fxbug.dev/467401079): Add a failure test where we close without
1351        // enabling allocations. (Trivial to do, but causes error logs, which are interpreted as
1352        // test failures and only seem controllable at the BUILD target level).
1353    }
1354
1355    #[fuchsia::test(threads = 10)]
1356    async fn test_replay_is_identical() {
1357        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1358        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1359
1360        // Reopen the store, but set reclaim size to a very large value which will effectively
1361        // stop the journal from flushing and allows us to track all the mutations to the store.
1362        fs.close().await.expect("close failed");
1363        let device = fs.take_device().await;
1364        device.reopen(false);
1365
1366        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1367
1368        impl<K: Clone, V: Clone> Mutations<K, V> {
1369            fn new() -> Self {
1370                Mutations(Mutex::new(Vec::new()))
1371            }
1372
1373            fn push(&self, operation: Operation, item: &Item<K, V>) {
1374                self.0.lock().push((operation, item.clone()));
1375            }
1376        }
1377
1378        let open_fs = |device,
1379                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
1380                       allocator_mutations: Arc<Mutations<_, _>>| async {
1381            FxFilesystemBuilder::new()
1382                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1383                .on_new_allocator(move |allocator| {
1384                    let allocator_mutations = allocator_mutations.clone();
1385                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1386                        allocator_mutations.push(op, item)
1387                    })));
1388                })
1389                .on_new_store(move |store| {
1390                    let mutations = Arc::new(Mutations::new());
1391                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1392                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1393                        mutations.push(op, item)
1394                    })));
1395                })
1396                .open(device)
1397                .await
1398                .expect("open failed")
1399        };
1400
1401        let allocator_mutations = Arc::new(Mutations::new());
1402        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1403        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1404
1405        let root_store = fs.root_store();
1406        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1407            .await
1408            .expect("open failed");
1409
1410        let mut transaction = fs
1411            .root_store()
1412            .new_transaction(
1413                lock_keys![LockKey::object(
1414                    root_store.store_object_id(),
1415                    root_directory.object_id()
1416                )],
1417                Options::default(),
1418            )
1419            .await
1420            .expect("new_transaction failed");
1421        let object = root_directory
1422            .create_child_file(&mut transaction, "test")
1423            .await
1424            .expect("create_child_file failed");
1425        transaction.commit().await.expect("commit failed");
1426
1427        // Append some data.
1428        let buf = object.allocate_buffer(10000).await;
1429        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1430
1431        // Overwrite some data.
1432        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1433
1434        // Truncate.
1435        object.truncate(3000).await.expect("truncate failed");
1436
1437        // Delete the object.
1438        let mut transaction = fs
1439            .root_store()
1440            .new_transaction(
1441                lock_keys![
1442                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1443                    LockKey::object(root_store.store_object_id(), object.object_id()),
1444                ],
1445                Options::default(),
1446            )
1447            .await
1448            .expect("new_transaction failed");
1449
1450        replace_child(&mut transaction, None, (&root_directory, "test"))
1451            .await
1452            .expect("replace_child failed");
1453
1454        transaction.commit().await.expect("commit failed");
1455
1456        // Finally tombstone the object.
1457        root_store
1458            .tombstone_object(object.object_id(), Options::default())
1459            .await
1460            .expect("tombstone failed");
1461
1462        // Now reopen and check that replay produces the same set of mutations.
1463        fs.close().await.expect("close failed");
1464
1465        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1466
1467        let device = fs.take_device().await;
1468        device.reopen(false);
1469
1470        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1471        let replayed_allocator_mutations = Arc::new(Mutations::new());
1472        let fs = open_fs(
1473            device,
1474            replayed_object_mutations.clone(),
1475            replayed_allocator_mutations.clone(),
1476        )
1477        .await;
1478
1479        let m1 = object_mutations.lock();
1480        let m2 = replayed_object_mutations.lock();
1481        assert_eq!(m1.len(), m2.len());
1482        for (store_id, mutations) in &*m1 {
1483            let mutations = mutations.0.lock();
1484            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1485            assert_eq!(mutations.len(), replayed.len());
1486            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1487                assert_eq!(op1, op2);
1488                assert_eq!(i1.key, i2.key);
1489                assert_eq!(i1.value, i2.value);
1490            }
1491        }
1492
1493        let a1 = allocator_mutations.0.lock();
1494        let a2 = replayed_allocator_mutations.0.lock();
1495        assert_eq!(a1.len(), a2.len());
1496        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1497            assert_eq!(op1, op2);
1498            assert_eq!(i1.key, i2.key);
1499            assert_eq!(i1.value, i2.value);
1500        }
1501
1502        assert_eq!(
1503            fs.object_manager().metadata_reservation().amount(),
1504            metadata_reservation_amount
1505        );
1506    }
1507
1508    #[fuchsia::test]
1509    async fn test_max_in_flight_transactions() {
1510        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1511        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1512
1513        let store = fs.root_store();
1514        let transactions = FuturesUnordered::new();
1515        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1516            transactions.push(store.new_transaction(lock_keys![], Options::default()));
1517        }
1518        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1519
1520        // Trying to create another one should be blocked.
1521        let mut fut = std::pin::pin!(store.new_transaction(lock_keys![], Options::default()));
1522        assert!(futures::poll!(&mut fut).is_pending());
1523
1524        // Dropping one should allow it to proceed.
1525        transactions.pop();
1526
1527        assert!(futures::poll!(&mut fut).is_ready());
1528    }
1529
1530    // If run on a single thread, the trim tasks starve out other work.
1531    #[fuchsia::test(threads = 10)]
1532    async fn test_continuously_trim() {
1533        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1534        let fs = FxFilesystemBuilder::new()
1535            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1536            .format(true)
1537            .open(device)
1538            .await
1539            .expect("open failed");
1540        // Do a small sleep so trim has time to get going.
1541        fasync::Timer::new(Duration::from_millis(10)).await;
1542
1543        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
1544        // regular usage isn't affected by trim.
1545        let root_store = fs.root_store();
1546        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1547            .await
1548            .expect("open failed");
1549        for _ in 0..100 {
1550            let mut transaction = fs
1551                .root_store()
1552                .new_transaction(
1553                    lock_keys![LockKey::object(
1554                        root_store.store_object_id(),
1555                        root_directory.object_id()
1556                    )],
1557                    Options::default(),
1558                )
1559                .await
1560                .expect("new_transaction failed");
1561            let object = root_directory
1562                .create_child_file(&mut transaction, "test")
1563                .await
1564                .expect("create_child_file failed");
1565            transaction.commit().await.expect("commit failed");
1566
1567            {
1568                let buf = object.allocate_buffer(1024).await;
1569                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1570            }
1571            std::mem::drop(object);
1572
1573            let mut transaction = root_directory
1574                .acquire_context_for_replace(None, "test", true)
1575                .await
1576                .expect("acquire_context_for_replace failed")
1577                .transaction;
1578            replace_child(&mut transaction, None, (&root_directory, "test"))
1579                .await
1580                .expect("replace_child failed");
1581            transaction.commit().await.expect("commit failed");
1582        }
1583        fs.close().await.expect("close failed");
1584    }
1585
1586    #[test_case(true; "test power fail with barriers")]
1587    #[test_case(false; "test power fail with checksums")]
1588    #[fuchsia::test]
1589    async fn test_power_fail(barriers_enabled: bool) {
1590        // This test randomly discards blocks, so we run it a few times to increase the chances
1591        // of catching an issue in a single run.
1592        for _ in 0..10 {
1593            let (store_id, device, test_file_object_id) = {
1594                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1595                let fs = if barriers_enabled {
1596                    FxFilesystemBuilder::new()
1597                        .barriers_enabled(true)
1598                        .format(true)
1599                        .open(device)
1600                        .await
1601                        .expect("new filesystem failed")
1602                } else {
1603                    FxFilesystem::new_empty(device).await.expect("new_empty failed")
1604                };
1605                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1606
1607                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1608                    .await
1609                    .expect("sync failed");
1610
1611                let store = root_volume
1612                    .new_volume(
1613                        "test",
1614                        NewChildStoreOptions {
1615                            options: StoreOptions {
1616                                crypt: Some(Arc::new(new_insecure_crypt())),
1617                                ..StoreOptions::default()
1618                            },
1619                            ..Default::default()
1620                        },
1621                    )
1622                    .await
1623                    .expect("new_volume failed");
1624                let root_directory = Directory::open(&store, store.root_directory_object_id())
1625                    .await
1626                    .expect("open failed");
1627
1628                // Create a number of files with the goal of using up more than one journal block.
1629                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1630                    let fs = store.filesystem();
1631                    let root_directory = Directory::open(store, store.root_directory_object_id())
1632                        .await
1633                        .expect("open failed");
1634                    for i in 0..100 {
1635                        let mut transaction = fs
1636                            .root_store()
1637                            .new_transaction(
1638                                lock_keys![LockKey::object(
1639                                    store.store_object_id(),
1640                                    store.root_directory_object_id()
1641                                )],
1642                                Options::default(),
1643                            )
1644                            .await
1645                            .expect("new_transaction failed");
1646                        root_directory
1647                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1648                            .await
1649                            .expect("create_child_file failed");
1650                        transaction.commit().await.expect("commit failed");
1651                    }
1652                }
1653
1654                // Create one batch of files.
1655                create_files(&store, "A").await;
1656
1657                // Create a file and write something to it.  This will make sure there's a
1658                // transaction present that includes a checksum.
1659                let mut transaction = fs
1660                    .root_store()
1661                    .new_transaction(
1662                        lock_keys![LockKey::object(
1663                            store.store_object_id(),
1664                            store.root_directory_object_id()
1665                        )],
1666                        Options::default(),
1667                    )
1668                    .await
1669                    .expect("new_transaction failed");
1670                let object = root_directory
1671                    .create_child_file(&mut transaction, "test")
1672                    .await
1673                    .expect("create_child_file failed");
1674                transaction.commit().await.expect("commit failed");
1675
1676                let mut transaction =
1677                    object.new_transaction().await.expect("new_transaction failed");
1678                let mut buffer = object.allocate_buffer(4096).await;
1679                buffer.as_mut_slice().fill(0xed);
1680                object
1681                    .txn_write(&mut transaction, 0, buffer.as_ref())
1682                    .await
1683                    .expect("txn_write failed");
1684                transaction.commit().await.expect("commit failed");
1685
1686                // Create another batch of files.
1687                create_files(&store, "B").await;
1688
1689                // Sync the device, but don't flush the device. We want to do this so we can
1690                // randomly discard blocks below.
1691                fs.sync(SyncOptions::default()).await.expect("sync failed");
1692
1693                // When we call `sync` above on the filesystem, it will pad the journal so that it
1694                // will get written, but it doesn't wait for the write to occur.  We wait for a
1695                // short time here to give allow time for the journal to be written.  Adding timers
1696                // isn't great, but this test already isn't deterministic since we randomly discard
1697                // blocks.
1698                fasync::Timer::new(Duration::from_millis(10)).await;
1699
1700                (
1701                    store.store_object_id(),
1702                    fs.device().snapshot().expect("snapshot failed"),
1703                    object.object_id(),
1704                )
1705            };
1706
1707            // Randomly discard blocks since the last flush.  This simulates what might happen in
1708            // the case of power-loss.  This will be an uncontrolled unmount.
1709            device
1710                .discard_random_since_last_flush()
1711                .expect("discard_random_since_last_flush failed");
1712
1713            let fs = FxFilesystem::open(device).await.expect("open failed");
1714            fsck(fs.clone()).await.expect("fsck failed");
1715
1716            let mut check_test_file = false;
1717
1718            // If we replayed and the store exists (i.e. the transaction that created the store
1719            // made it out), start by running fsck on it.
1720            let object_id = if fs.object_manager().store(store_id).is_some() {
1721                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1722                    .await
1723                    .expect("fsck_volume failed");
1724
1725                // Now we want to create another file, unmount cleanly, and then finally check that
1726                // the new file exists.  This checks that we can continue to use the filesystem
1727                // after an unclean unmount.
1728                let store = root_volume(fs.clone())
1729                    .await
1730                    .expect("root_volume failed")
1731                    .volume(
1732                        "test",
1733                        StoreOptions {
1734                            crypt: Some(Arc::new(new_insecure_crypt())),
1735                            ..StoreOptions::default()
1736                        },
1737                    )
1738                    .await
1739                    .expect("volume failed");
1740
1741                let root_directory = Directory::open(&store, store.root_directory_object_id())
1742                    .await
1743                    .expect("open failed");
1744
1745                let mut transaction = fs
1746                    .root_store()
1747                    .new_transaction(
1748                        lock_keys![LockKey::object(
1749                            store.store_object_id(),
1750                            store.root_directory_object_id()
1751                        )],
1752                        Options::default(),
1753                    )
1754                    .await
1755                    .expect("new_transaction failed");
1756                let object = root_directory
1757                    .create_child_file(&mut transaction, &format!("C"))
1758                    .await
1759                    .expect("create_child_file failed");
1760                transaction.commit().await.expect("commit failed");
1761
1762                // Write again to the test file if it exists.
1763                if let Ok(test_file) = ObjectStore::open_object(
1764                    &store,
1765                    test_file_object_id,
1766                    HandleOptions::default(),
1767                    None,
1768                )
1769                .await
1770                {
1771                    // Check it has the contents we expect.
1772                    let mut buffer = test_file.allocate_buffer(4096).await;
1773                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1774                    if bytes == 4096 {
1775                        let expected = [0xed; 4096];
1776                        assert_eq!(buffer.as_slice(), &expected);
1777                    } else {
1778                        // If the write didn't make it, the file should have zero bytes.
1779                        assert_eq!(bytes, 0);
1780                    }
1781
1782                    // Modify the test file.
1783                    let mut transaction =
1784                        test_file.new_transaction().await.expect("new_transaction failed");
1785                    buffer.as_mut_slice().fill(0x37);
1786                    test_file
1787                        .txn_write(&mut transaction, 0, buffer.as_ref())
1788                        .await
1789                        .expect("txn_write failed");
1790                    transaction.commit().await.expect("commit failed");
1791                    check_test_file = true;
1792                }
1793
1794                object.object_id()
1795            } else {
1796                INVALID_OBJECT_ID
1797            };
1798
1799            // This will do a controlled unmount.
1800            fs.close().await.expect("close failed");
1801            let device = fs.take_device().await;
1802            device.reopen(false);
1803
1804            let fs = FxFilesystem::open(device).await.expect("open failed");
1805            fsck(fs.clone()).await.expect("fsck failed");
1806
1807            // As mentioned above, make sure that the object we created before the clean unmount
1808            // exists.
1809            if object_id != INVALID_OBJECT_ID {
1810                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1811                    .await
1812                    .expect("fsck_volume failed");
1813
1814                let store = root_volume(fs.clone())
1815                    .await
1816                    .expect("root_volume failed")
1817                    .volume(
1818                        "test",
1819                        StoreOptions {
1820                            crypt: Some(Arc::new(new_insecure_crypt())),
1821                            ..StoreOptions::default()
1822                        },
1823                    )
1824                    .await
1825                    .expect("volume failed");
1826                // We should be able to open the C object.
1827                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1828                    .await
1829                    .expect("open_object failed");
1830
1831                // If we made the modification to the test file, check it.
1832                if check_test_file {
1833                    info!("Checking test file for modification");
1834                    let test_file = ObjectStore::open_object(
1835                        &store,
1836                        test_file_object_id,
1837                        HandleOptions::default(),
1838                        None,
1839                    )
1840                    .await
1841                    .expect("open_object failed");
1842                    let mut buffer = test_file.allocate_buffer(4096).await;
1843                    assert_eq!(
1844                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1845                        4096
1846                    );
1847                    let expected = [0x37; 4096];
1848                    assert_eq!(buffer.as_slice(), &expected);
1849                }
1850            }
1851
1852            fs.close().await.expect("close failed");
1853        }
1854    }
1855
1856    #[fuchsia::test]
1857    async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1858        let barrier_count = Arc::new(AtomicU32::new(0));
1859
1860        struct Observer(Arc<AtomicU32>);
1861
1862        impl fake_device::Observer for Observer {
1863            fn barrier(&self) {
1864                self.0.fetch_add(1, Ordering::Relaxed);
1865            }
1866        }
1867
1868        let mut fake_device = FakeDevice::new(8192, 4096);
1869        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1870        let device = DeviceHolder::new(fake_device);
1871        let fs = FxFilesystemBuilder::new()
1872            .barriers_enabled(true)
1873            .format(true)
1874            .open(device)
1875            .await
1876            .expect("new filesystem failed");
1877
1878        {
1879            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1880            root_vol
1881                .new_volume(
1882                    "test",
1883                    NewChildStoreOptions {
1884                        options: StoreOptions {
1885                            crypt: Some(Arc::new(new_insecure_crypt())),
1886                            ..StoreOptions::default()
1887                        },
1888                        ..NewChildStoreOptions::default()
1889                    },
1890                )
1891                .await
1892                .expect("there is no test volume");
1893            fs.close().await.expect("close failed");
1894        }
1895        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1896        // measure of the number of barriers issued during setup.
1897        let device = fs.take_device().await;
1898        device.reopen(false);
1899        let fs = FxFilesystemBuilder::new()
1900            .barriers_enabled(true)
1901            .open(device)
1902            .await
1903            .expect("new filesystem failed");
1904        let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1905
1906        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1907        let store = root_vol
1908            .volume(
1909                "test",
1910                StoreOptions {
1911                    crypt: Some(Arc::new(new_insecure_crypt())),
1912                    ..StoreOptions::default()
1913                },
1914            )
1915            .await
1916            .expect("there is no test volume");
1917
1918        // Create a number of files with the goal of using up more than one journal block.
1919        let fs = store.filesystem();
1920        let root_directory =
1921            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1922        for i in 0..100 {
1923            let mut transaction = fs
1924                .root_store()
1925                .new_transaction(
1926                    lock_keys![LockKey::object(
1927                        store.store_object_id(),
1928                        store.root_directory_object_id()
1929                    )],
1930                    Options::default(),
1931                )
1932                .await
1933                .expect("new_transaction failed");
1934            root_directory
1935                .create_child_file(&mut transaction, &format!("A {i}"))
1936                .await
1937                .expect("create_child_file failed");
1938            transaction.commit().await.expect("commit failed");
1939        }
1940
1941        // Unmount the filesystem to ensure that the journal flushes.
1942        fs.close().await.expect("close failed");
1943        // Ensure that no barriers were emitted while creating files, as no data was written.
1944        assert_eq!(expected_barrier_count, barrier_count.load(Ordering::Relaxed));
1945    }
1946
1947    #[fuchsia::test]
1948    async fn test_barrier_emitted_when_transaction_includes_data() {
1949        let barrier_count = Arc::new(AtomicU32::new(0));
1950
1951        struct Observer(Arc<AtomicU32>);
1952
1953        impl fake_device::Observer for Observer {
1954            fn barrier(&self) {
1955                self.0.fetch_add(1, Ordering::Relaxed);
1956            }
1957        }
1958
1959        let mut fake_device = FakeDevice::new(8192, 4096);
1960        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1961        let device = DeviceHolder::new(fake_device);
1962        let fs = FxFilesystemBuilder::new()
1963            .barriers_enabled(true)
1964            .format(true)
1965            .open(device)
1966            .await
1967            .expect("new filesystem failed");
1968
1969        {
1970            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1971            root_vol
1972                .new_volume(
1973                    "test",
1974                    NewChildStoreOptions {
1975                        options: StoreOptions {
1976                            crypt: Some(Arc::new(new_insecure_crypt())),
1977                            ..StoreOptions::default()
1978                        },
1979                        ..NewChildStoreOptions::default()
1980                    },
1981                )
1982                .await
1983                .expect("there is no test volume");
1984            fs.close().await.expect("close failed");
1985        }
1986        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1987        // measure of the number of barriers issued during setup.
1988        let device = fs.take_device().await;
1989        device.reopen(false);
1990        let fs = FxFilesystemBuilder::new()
1991            .barriers_enabled(true)
1992            .open(device)
1993            .await
1994            .expect("new filesystem failed");
1995        let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1996
1997        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1998        let store = root_vol
1999            .volume(
2000                "test",
2001                StoreOptions {
2002                    crypt: Some(Arc::new(new_insecure_crypt())),
2003                    ..StoreOptions::default()
2004                },
2005            )
2006            .await
2007            .expect("there is no test volume");
2008
2009        // Create a file and write something to it. This should cause a barrier to be emitted.
2010        let fs: Arc<FxFilesystem> = store.filesystem();
2011        let root_directory =
2012            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2013
2014        let mut transaction = fs
2015            .root_store()
2016            .new_transaction(
2017                lock_keys![LockKey::object(
2018                    store.store_object_id(),
2019                    store.root_directory_object_id()
2020                )],
2021                Options::default(),
2022            )
2023            .await
2024            .expect("new_transaction failed");
2025        let object = root_directory
2026            .create_child_file(&mut transaction, "test")
2027            .await
2028            .expect("create_child_file failed");
2029        transaction.commit().await.expect("commit failed");
2030
2031        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2032        let mut buffer = object.allocate_buffer(4096).await;
2033        buffer.as_mut_slice().fill(0xed);
2034        object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2035        transaction.commit().await.expect("commit failed");
2036
2037        // Unmount the filesystem to ensure that the journal flushes.
2038        fs.close().await.expect("close failed");
2039        // Ensure that a barrier was emitted while writing to the file.
2040        assert!(expected_barrier_count < barrier_count.load(Ordering::Relaxed));
2041    }
2042
2043    #[test_case(true; "fail when original filesystem has barriers enabled")]
2044    #[test_case(false; "fail when original filesystem has barriers disabled")]
2045    #[fuchsia::test]
2046    async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
2047        let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
2048        let fake_device = FakeDevice::new(8192, 4096);
2049        let device = DeviceHolder::new(fake_device);
2050        let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
2051            .barriers_enabled(original_barrier_mode)
2052            .format(true)
2053            .open(device)
2054            .await
2055            .expect("new filesystem failed");
2056
2057        // Create a volume named test with a file inside it called file.
2058        {
2059            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2060            let store = root_vol
2061                .new_volume(
2062                    "test",
2063                    NewChildStoreOptions {
2064                        options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
2065                        ..Default::default()
2066                    },
2067                )
2068                .await
2069                .expect("creating test volume");
2070            let root_dir = Directory::open(&store, store.root_directory_object_id())
2071                .await
2072                .expect("open failed");
2073            let mut transaction = fs
2074                .root_store()
2075                .new_transaction(
2076                    lock_keys![LockKey::object(
2077                        store.store_object_id(),
2078                        store.root_directory_object_id()
2079                    )],
2080                    Default::default(),
2081                )
2082                .await
2083                .expect("new_transaction failed");
2084            let object = root_dir
2085                .create_child_file(&mut transaction, "file")
2086                .await
2087                .expect("create_child_file failed");
2088            transaction.commit().await.expect("commit failed");
2089            let mut buffer = object.allocate_buffer(4096).await;
2090            buffer.as_mut_slice().fill(0xA7);
2091            let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
2092            assert_eq!(new_size, 4096);
2093        }
2094
2095        // Remount the filesystem with the opposite barrier mode and write more data to our file.
2096        fs.close().await.expect("close failed");
2097        let device = fs.take_device().await;
2098        device.reopen(false);
2099        let fs = FxFilesystemBuilder::new()
2100            .barriers_enabled(!original_barrier_mode)
2101            .open(device)
2102            .await
2103            .expect("new filesystem failed");
2104        {
2105            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2106            let store = root_vol
2107                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2108                .await
2109                .expect("opening test volume");
2110            let root_dir = Directory::open(&store, store.root_directory_object_id())
2111                .await
2112                .expect("open failed");
2113            let (object_id, _, _) =
2114                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2115            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2116                .await
2117                .expect("open failed");
2118            // Write some more data.
2119            let mut buffer = test_file.allocate_buffer(4096).await;
2120            buffer.as_mut_slice().fill(0xA8);
2121            let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2122            assert_eq!(new_size, 8192);
2123        }
2124
2125        // Lastly, remount the filesystems with the original barrier mode and make sure everything
2126        // can be read from the file as expected.
2127        fs.close().await.expect("close failed");
2128        let device = fs.take_device().await;
2129        device.reopen(false);
2130        let fs = FxFilesystemBuilder::new()
2131            .barriers_enabled(original_barrier_mode)
2132            .open(device)
2133            .await
2134            .expect("new filesystem failed");
2135        {
2136            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2137            let store = root_vol
2138                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2139                .await
2140                .expect("opening test volume");
2141            let root_dir = Directory::open(&store, store.root_directory_object_id())
2142                .await
2143                .expect("open failed");
2144            let (object_id, _, _) =
2145                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2146            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2147                .await
2148                .expect("open failed");
2149            let mut buffer = test_file.allocate_buffer(8192).await;
2150            assert_eq!(
2151                test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2152                8192,
2153                "short read"
2154            );
2155            assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2156            assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2157        }
2158        fs.close().await.expect("close failed");
2159    }
2160
2161    #[fuchsia::test]
2162    async fn test_image_builder_mode_no_early_writes() {
2163        const BLOCK_SIZE: u32 = 4096;
2164        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2165        device.reopen(true);
2166        let fs = FxFilesystemBuilder::new()
2167            .format(true)
2168            .image_builder_mode(Some(SuperBlockInstance::A))
2169            .open(device)
2170            .await
2171            .expect("open failed");
2172        fs.enable_allocations();
2173        // fs.close() now performs compaction (writing superblock), so device must be writable.
2174        fs.device().reopen(false);
2175        fs.close().await.expect("closed");
2176    }
2177
2178    #[fuchsia::test]
2179    async fn test_image_builder_mode() {
2180        const BLOCK_SIZE: u32 = 4096;
2181        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2182        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2183
2184        // Write some fake file data at an offset in the image and confirm it as an fxfs file below.
2185        {
2186            let mut write_buf =
2187                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2188            write_buf.as_mut_slice().fill(0xf0);
2189            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2190        }
2191
2192        device.reopen(true);
2193
2194        let device = {
2195            let fs = FxFilesystemBuilder::new()
2196                .format(true)
2197                .image_builder_mode(Some(SuperBlockInstance::B))
2198                .open(device)
2199                .await
2200                .expect("open failed");
2201            fs.enable_allocations();
2202            {
2203                let root_store = fs.root_store();
2204                let root_directory =
2205                    Directory::open(&root_store, root_store.root_directory_object_id())
2206                        .await
2207                        .expect("open failed");
2208                // Create a file referencing existing data on device.
2209                let handle;
2210                {
2211                    let mut transaction = fs
2212                        .root_store()
2213                        .new_transaction(
2214                            lock_keys![LockKey::object(
2215                                root_directory.store().store_object_id(),
2216                                root_directory.object_id()
2217                            )],
2218                            Options::default(),
2219                        )
2220                        .await
2221                        .expect("new transaction");
2222                    handle = root_directory
2223                        .create_child_file(&mut transaction, "test")
2224                        .await
2225                        .expect("create file");
2226                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2227                    transaction.commit().await.expect("commit");
2228                }
2229            }
2230            fs.device().reopen(false);
2231            fs.close().await.expect("close");
2232            fs.take_device().await
2233        };
2234        device.reopen(false);
2235        let fs = FxFilesystem::open(device).await.expect("open failed");
2236        fsck(fs.clone()).await.expect("fsck failed");
2237
2238        // Confirm that the test file points at the correct data.
2239        let root_store = fs.root_store();
2240        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2241            .await
2242            .expect("open failed");
2243        let (object_id, descriptor, _) =
2244            root_directory.lookup("test").await.expect("lookup failed").unwrap();
2245        assert_eq!(descriptor, ObjectDescriptor::File);
2246        let test_file =
2247            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2248                .await
2249                .expect("open failed");
2250        let mut read_buf =
2251            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2252        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2253        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2254        fs.close().await.expect("closed");
2255    }
2256
2257    #[fuchsia::test]
2258    async fn test_read_only_mount_on_full_filesystem() {
2259        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2260        let fs =
2261            FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2262        let root_store = fs.root_store();
2263        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2264            .await
2265            .expect("open failed");
2266
2267        let mut transaction = fs
2268            .root_store()
2269            .new_transaction(
2270                lock_keys![LockKey::object(
2271                    root_store.store_object_id(),
2272                    root_directory.object_id()
2273                )],
2274                Options::default(),
2275            )
2276            .await
2277            .expect("new_transaction failed");
2278        let handle = root_directory
2279            .create_child_file(&mut transaction, "test")
2280            .await
2281            .expect("create_child_file failed");
2282        transaction.commit().await.expect("commit failed");
2283
2284        let mut buf = handle.allocate_buffer(4096).await;
2285        buf.as_mut_slice().fill(0xaa);
2286        loop {
2287            if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2288                break;
2289            }
2290        }
2291
2292        let max_offset = fs.allocator().maximum_offset();
2293        fs.close().await.expect("Close failed");
2294
2295        let device = fs.take_device().await;
2296        device.reopen(false);
2297        let mut buffer = device
2298            .allocate_buffer(
2299                crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2300            )
2301            .await;
2302        device.read(0, buffer.as_mut()).await.expect("read failed");
2303
2304        let device = DeviceHolder::new(
2305            FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2306                .expect("from_image failed"),
2307        );
2308        let fs =
2309            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2310        fs.close().await.expect("Close failed");
2311    }
2312
2313    #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2314    #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2315    #[fuchsia::test]
2316    async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2317        const BLOCK_SIZE: u32 = 4096;
2318        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2319
2320        // 1. Initialize in image_builder_mode
2321        device.reopen(true);
2322        let fs = FxFilesystemBuilder::new()
2323            .format(true)
2324            .image_builder_mode(Some(target_sb))
2325            .open(device)
2326            .await
2327            .expect("open failed");
2328
2329        fs.enable_allocations();
2330
2331        // 2. Finalize logic (via close)
2332        fs.device().reopen(false);
2333
2334        // 3. Write data
2335        {
2336            let root_store = fs.root_store();
2337            let root_directory =
2338                Directory::open(&root_store, root_store.root_directory_object_id())
2339                    .await
2340                    .expect("open failed");
2341
2342            let mut transaction = fs
2343                .root_store()
2344                .new_transaction(
2345                    lock_keys![LockKey::object(
2346                        root_directory.store().store_object_id(),
2347                        root_directory.object_id()
2348                    )],
2349                    Options::default(),
2350                )
2351                .await
2352                .expect("new transaction");
2353            let handle = root_directory
2354                .create_child_file(&mut transaction, "post_finalize_file")
2355                .await
2356                .expect("create file");
2357            transaction.commit().await.expect("commit");
2358
2359            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2360            buf.as_mut_slice().fill(0xaa);
2361            handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2362        }
2363
2364        // 4. Close. Should flush to `target_sb` only.
2365        fs.close().await.expect("close failed");
2366
2367        let other_sb = target_sb.next();
2368
2369        // 5. Verify `target_sb` is valid and `other_sb` is empty.
2370        let device = fs.take_device().await;
2371        device.reopen(true); // Read-only is fine for verifying.
2372        let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2373
2374        device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2375        assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2376
2377        buf.as_mut_slice().fill(0); // Clear buffer
2378        device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2379        // Expecting all zeros for `other_sb`
2380        assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2381    }
2382
    // Checks that background trimming is gated on the power manager's battery state:
    // - no trims happen while on battery;
    // - trims start shortly after switching to external power;
    // - trims stop again once back on battery.
    // Every trim must be issued while the filesystem holds the wake lease handed out by the
    // power manager.
    // Runs under fake time (`allow_stalls = false`), so time only moves when
    // `TestExecutor::advance_to` is called.
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_trim_with_power_manager() {
        use anyhow::Error;
        use async_trait::async_trait;
        use fuchsia_async::TestExecutor;
        use futures::StreamExt;

        // Anchor the fake clock at zero so the later `MonotonicInstant::after(..)` deadlines are
        // relative to a known starting point.
        TestExecutor::advance_to(fasync::MonotonicInstant::ZERO).await;

        // Test double for `super::PowerManager`.
        #[derive(Default)]
        struct MockPowerManager {
            // Current battery state reported by `watch_battery`.
            on_battery: Mutex<bool>,
            // Signalled whenever `on_battery` changes, to wake the `watch_battery` stream.
            event: event_listener::Event,
            // The mock's end of the most recently issued wake-lease event pair. The other end is
            // handed to the filesystem.
            wake_lease: Mutex<Option<zx::EventPair>>,
        }

        impl MockPowerManager {
            // Updates the battery state and wakes every current listener.
            fn set_on_battery(&self, v: bool) {
                *self.on_battery.lock() = v;
                self.event.notify(usize::MAX);
            }

            // Returns true if a lease has been issued and its peer (the end given to the
            // filesystem) has not been closed yet. Waiting with an INFINITE_PAST deadline makes
            // this a non-blocking check: it errors unless PEER_CLOSED is already asserted.
            fn is_lease_held(&self) -> bool {
                self.wake_lease.lock().as_ref().is_some_and(|handle| {
                    handle
                        .wait_one(
                            zx::Signals::EVENTPAIR_PEER_CLOSED,
                            zx::MonotonicInstant::INFINITE_PAST,
                        )
                        .is_err()
                })
            }
        }

        impl super::PowerManager for MockPowerManager {
            // Yields the current state immediately on the first poll. After that it yields again
            // each time `event` fires. When on battery it yields an invalid handle (no lease).
            // Otherwise it yields one end of a fresh event pair and keeps the peer in
            // `wake_lease`.
            // NOTE(review): the listener is only created when the stream is polled again, so a
            // `notify` issued before that re-poll would be missed. The test spaces its
            // `set_on_battery` calls so that this presumably does not happen — confirm.
            fn watch_battery(
                self: Arc<Self>,
            ) -> futures::stream::BoxStream<'static, (bool, super::WakeLease)> {
                futures::stream::unfold(true, move |first| {
                    let this = self.clone();
                    async move {
                        if !first {
                            this.event.listen().await;
                        }
                        let val = *this.on_battery.lock();
                        let handle = if val {
                            zx::NullableHandle::invalid()
                        } else {
                            let (h1, h2) = zx::EventPair::create();
                            *this.wake_lease.lock() = Some(h2);
                            // Hand `h1` out as the wake lease. The mock keeps `h2` so that
                            // `is_lease_held` can tell whether the filesystem still holds `h1`.
                            h1.into_handle()
                        };
                        Some(((val, handle), false))
                    }
                })
                .boxed()
            }
        }

        let trim_count = Arc::new(AtomicU32::new(0));

        // Device wrapper that forwards to `inner`, advertises trim support, and counts trims.
        // Each trim asserts that the wake lease is held at that moment.
        struct TrimTrackingDevice {
            inner: DeviceHolder,
            trim_count: Arc<AtomicU32>,
            power_manager: Arc<MockPowerManager>,
        }

        #[async_trait]
        impl storage_device::Device for TrimTrackingDevice {
            fn allocate_buffer(&self, size: usize) -> storage_device::buffer::BufferFuture<'_> {
                self.inner.allocate_buffer(size)
            }
            fn block_size(&self) -> u32 {
                self.inner.block_size()
            }
            fn block_count(&self) -> u64 {
                self.inner.block_count()
            }
            async fn read_with_opts(
                &self,
                offset: u64,
                buffer: storage_device::buffer::MutableBufferRef<'_>,
                opts: storage_device::ReadOptions,
            ) -> Result<(), Error> {
                self.inner.read_with_opts(offset, buffer, opts).await
            }
            async fn write_with_opts(
                &self,
                offset: u64,
                buffer: storage_device::buffer::BufferRef<'_>,
                opts: storage_device::WriteOptions,
            ) -> Result<(), Error> {
                self.inner.write_with_opts(offset, buffer, opts).await
            }
            async fn trim(&self, range: std::ops::Range<u64>) -> Result<(), Error> {
                // Trimming is only allowed while the filesystem holds the wake lease.
                assert!(self.power_manager.is_lease_held());
                self.trim_count.fetch_add(1, Ordering::SeqCst);
                self.inner.trim(range).await
            }
            async fn flush(&self) -> Result<(), Error> {
                self.inner.flush().await
            }
            async fn close(&self) -> Result<(), Error> {
                self.inner.close().await
            }
            fn barrier(&self) {
                self.inner.barrier()
            }
            // Reports trim support regardless of `inner`, so the filesystem will issue trims.
            fn supports_trim(&self) -> bool {
                true
            }
            fn is_read_only(&self) -> bool {
                self.inner.is_read_only()
            }
            fn snapshot(&self) -> Result<DeviceHolder, Error> {
                Ok(DeviceHolder::new(TrimTrackingDevice {
                    inner: self.inner.snapshot()?,
                    trim_count: self.trim_count.clone(),
                    power_manager: self.power_manager.clone(),
                }))
            }
            fn reopen(&self, read_only: bool) {
                self.inner.reopen(read_only)
            }
        }

        let pm = Arc::new(MockPowerManager::default());

        // Start on battery.
        pm.set_on_battery(true);

        let fake_device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE);
        let device = DeviceHolder::new(TrimTrackingDevice {
            inner: DeviceHolder::new(fake_device),
            trim_count: trim_count.clone(),
            power_manager: pm.clone(),
        });

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .power_manager(pm.clone())
            .trim_config(Some((Duration::ZERO, Duration::from_millis(100))))
            .trim_charger_wait(Duration::from_millis(10))
            .open(device)
            .await
            .expect("open failed");

        // Initially on battery, so no trim should happen.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(
            Duration::from_millis(500).into(),
        ))
        .await;
        // Let every task that can make progress at the current fake time run to a stall.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert_eq!(trim_count.load(Ordering::SeqCst), 0);

        // Make some things to trim: allocate a block for a new file, then unlink and tombstone
        // the file so that its extent is freed.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .root_store()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            handle.allocate(0..4096).await.expect("allocate failed");
            // Now delete it to make it trimmable.
            let mut transaction = fs
                .root_store()
                .new_transaction(
                    lock_keys![
                        LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                        LockKey::object(root_store.store_object_id(), handle.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("delete failed");
            transaction.commit().await.expect("commit failed");
            fs.root_store()
                .tombstone_object(handle.object_id(), Options::default())
                .await
                .expect("tombstone failed");
        }

        // Put on external power source.
        pm.set_on_battery(false);

        // Trim should start after 10ms.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(Duration::from_millis(10).into()))
            .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert!(trim_count.load(Ordering::SeqCst) > 0);

        // Reset trim count and take off charger.
        trim_count.store(0, Ordering::SeqCst);
        pm.set_on_battery(true);

        // Wait and ensure no more trims.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(
            Duration::from_millis(500).into(),
        ))
        .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert_eq!(trim_count.load(Ordering::SeqCst), 0);

        fs.close().await.expect("close failed");
    }
2614}