// fxfs/filesystem.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16    self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17    TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_async::condition::Condition;
28use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
29use fuchsia_sync::Mutex;
30use futures::{FutureExt, Stream};
31use fxfs_crypto::Crypt;
32use fxfs_trace::{TraceFutureExt, trace_future_args};
33use static_assertions::const_assert;
34use std::pin::pin;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::sync::{Arc, OnceLock, Weak};
37use std::task::Poll;
38use std::time::{Duration, Instant};
39use storage_device::{Device, DeviceHolder};
40
/// Minimum supported filesystem block size, in bytes.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Maximum supported filesystem block size (64 KiB), in bytes.
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
// needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
// Compile-time guard against accidental changes to the arithmetic above.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);
50
// NOTE(review): this `use` would conventionally live with the imports at the top of the file.
use futures::stream::StreamExt;

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: Duration = Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: Duration = Duration::from_secs(60 * 60 * 24);

/// How often to clean the transfer buffer.
// TODO(https://fxbug.dev/489725256) Configure the task to run when fxfs is idle.
const CLEAN_TRANSFER_BUFFER_INTERVAL: Duration = Duration::from_secs(60);
67
// Platform handle type yielded alongside battery status updates; presumably used to keep the
// system awake while maintenance work runs — TODO confirm against PowerManager implementors.
#[cfg(target_os = "fuchsia")]
pub type WakeLease = zx::NullableHandle;

#[cfg(not(target_os = "fuchsia"))]
pub type WakeLease = fasync::emulated_handle::Handle;

/// Abstraction over the platform's power/battery state (see `Options::power_manager`, which uses
/// it to check charger status before trimming).
pub trait PowerManager: Send + Sync {
    /// Returns a stream of battery status changes (true if using battery).
    fn watch_battery(self: Arc<Self>) -> futures::stream::BoxStream<'static, (bool, WakeLease)>;
}

/// Holds information on an Fxfs Filesystem
pub struct Info {
    /// Size of the underlying device, in bytes.
    pub total_bytes: u64,
    /// Bytes in use, as reported by the allocator.
    pub used_bytes: u64,
}

/// Hook invoked after each committed transaction (see `Options::post_commit_hook`).
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Hook invoked before each transaction commit (see `Options::pre_commit_hook`).
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
89
/// Mount-time configuration for an `FxFilesystem` (see `FxFilesystemBuilder`).
pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,

    // The first duration is how long after the filesystem has been mounted to perform an initial
    // trim.  The second is the interval to repeat trimming thereafter.  If set to None, no trimming
    // is done.
    // Default values are (TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER), i.e. (1 hour, 24 hours).
    pub trim_config: Option<(Duration, Duration)>,

    // If set, journal will not be used for writes. The user must call 'close' when finished.
    // The provided superblock instance will be written upon close().
    pub image_builder_mode: Option<SuperBlockInstance>,

    // If true, the filesystem will use the hardware's inline crypto engine to write encrypted
    // data. Requires the block device to support inline encryption and for `barriers_enabled` to
    // be true.
    // TODO(https://fxbug.dev/393196849): For now, this flag only prevents the filesystem from
    // computing checksums. Update this comment when the filesystem actually uses inline
    // encryption.
    pub inline_crypto_enabled: bool,

    // Configures the filesystem to use barriers instead of checksums to ensure consistency.
    // Checksums may be computed and stored in extent records but will no longer be stored in the
    // journal. The journal will use barriers to enforce proper ordering between data and metadata
    // writes. Must be true if `inline_crypto_enabled` is true.
    pub barriers_enabled: bool,

    /// If set, this will be used to check for charger status before trimming.
    pub power_manager: Option<Arc<dyn PowerManager>>,

    /// How long to wait after being placed on a charger before starting a trim.
    pub trim_charger_wait: Duration,
}
141
142impl Default for Options {
143    fn default() -> Self {
144        Options {
145            roll_metadata_key_byte_count: 128 * 1024 * 1024,
146            read_only: false,
147            pre_commit_hook: None,
148            post_commit_hook: None,
149            skip_initial_reap: false,
150            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
151            image_builder_mode: None,
152            inline_crypto_enabled: false,
153            barriers_enabled: false,
154            power_manager: None,
155            trim_charger_wait: Duration::from_secs(10),
156        }
157    }
158}
159
/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}

/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    /// The mutation is being re-applied from the journal at mount time.
    Replay,
    /// The mutation belongs to an in-flight transaction on a running system.
    Live(&'a Transaction<'b>),
}
175
176impl ApplyMode<'_, '_> {
177    pub fn is_replay(&self) -> bool {
178        matches!(self, ApplyMode::Replay)
179    }
180
181    pub fn is_live(&self) -> bool {
182        matches!(self, ApplyMode::Live(_))
183    }
184}
185
/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when the transaction commits, which can either be during live
    /// operation (See `ObjectManager::apply_mutation`) or during journal replay, in which case
    /// transaction will be None (See `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.  The default implementation writes the mutation verbatim.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
214
/// Options controlling the strength of the guarantees provided by a sync (see
/// `FxFilesystem::sync`).
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
229
/// Wrapper around an open `FxFilesystem` which warns on drop (see the `Drop` impl below) if the
/// filesystem was never closed.  Derefs to `Arc<FxFilesystem>`.
pub struct OpenFxFilesystem(Arc<FxFilesystem>);

impl OpenFxFilesystem {
    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        // Register interest in the device *before* dropping self, then wait for all remaining
        // references to the filesystem to go away.
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }
}
241
impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
    fn from(fs: Arc<FxFilesystem>) -> Self {
        Self(fs)
    }
}

impl Drop for OpenFxFilesystem {
    fn drop(&mut self) {
        // In image-builder mode, close() is what finalizes the journal and superblock; dropping
        // without it leaves an incomplete image.
        if self.options.image_builder_mode.is_some()
            && self.journal().image_builder_mode().is_some()
        {
            error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
        }
        // `closed` is only cleared by close(); a writable filesystem dropped while still open may
        // have unsynced journal data.
        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
        }
    }
}

impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
268
/// Builder used to format or open an `FxFilesystem` with non-default settings.
pub struct FxFilesystemBuilder {
    // True if the device should be formatted rather than opened.
    format: bool,
    // Enables trace-level logging.
    trace: bool,
    // Filesystem options to mount with.
    options: Options,
    // Options forwarded to the journal.
    journal_options: JournalOptions,
    // Called with the allocator immediately after it is created (replay path only).
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Called each time a new store is registered with `ObjectManager`.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // If true, fsck runs after every committed transaction.
    fsck_after_every_transaction: bool,
}
278
279impl FxFilesystemBuilder {
    /// Creates a builder with default settings: open (not format) an existing filesystem,
    /// read-write, default `Options` and `JournalOptions`, no callbacks.
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }
291
    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `close()` before dropping the filesystem to trigger
    /// a compaction of in-memory data structures, a minimal journal and a write to one
    /// superblock (as specified).
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets the delay before the first trim and the interval between subsequent trims; `None`
    /// disables trimming. See `Options::trim_config`.
    pub fn trim_config(mut self, delay_and_interval: Option<(Duration, Duration)>) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Sets the power manager used to check charger status before trimming. See
    /// `Options::power_manager`.
    pub fn power_manager(mut self, power_manager: Arc<dyn PowerManager>) -> Self {
        self.options.power_manager = Some(power_manager);
        self
    }

    /// Sets how long to wait after being placed on a charger before starting a trim. See
    /// `Options::trim_charger_wait`.
    pub fn trim_charger_wait(mut self, wait: Duration) -> Self {
        self.options.trim_charger_wait = wait;
        self
    }

    /// Enables or disables inline encryption. Defaults to `false`.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Enables or disables barriers in both the filesystem and journal options.
    /// Defaults to `false`.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }
412
    /// Constructs an `FxFilesystem` object with the specified settings.
    ///
    /// When `format` is set the device is initialized from scratch (journal, superblocks and root
    /// volume directory); otherwise the journal is replayed to recover existing filesystem state.
    ///
    /// # Errors
    ///
    /// Fails for incompatible option combinations (`format` + `read_only`, or
    /// `inline_crypto_enabled` without `barriers_enabled`), or if device I/O, journal
    /// initialization/replay, or the initial transactions fail.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        // Inline encryption requires barriers to be enabled.
        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        // If requested, wrap any existing post-commit hook so that fsck also runs after every
        // transaction.  The instance gets its filesystem reference later, once the filesystem
        // exists (see below).
        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || Box::pin(instance.clone().run())));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        // `new_cyclic` lets the inspect lazy-node closure hold a weak reference back to the
        // filesystem being constructed.
        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                background_tasks: fasync::Scope::new(),
                // Marked closed until mounting completes successfully (cleared below).
                closed: AtomicBool::new(true),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if filesystem.options.trim_config.is_some() {
                filesystem.start_trim_task();
            }
            filesystem.start_clean_transfer_buffer_task();
        }

        Ok(filesystem.into())
    }
556}
557
/// An open Fxfs filesystem instance.  Usually handed to callers wrapped in `OpenFxFilesystem`,
/// which detects failure to call `close()`.
pub struct FxFilesystem {
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Serializes transaction commits (see `commit_transaction`).
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // The journal flush task, started lazily on first commit (see `maybe_start_flush_task`).
    flush_task: Mutex<Option<fasync::Task<()>>>,
    background_tasks: fasync::Scope,
    // True before mounting completes and again after `close()`; checked by `OpenFxFilesystem`'s
    // Drop impl to detect an un-closed filesystem.
    closed: AtomicBool,
    // True if trace-level logging is enabled.
    trace: bool,
    graveyard: Arc<Graveyard>,
    completed_transactions: UintProperty,
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    // NOTE(review): `_stores_node` below is declared *after* this field and therefore drops after
    // the device — confirm whether that is acceptable or whether it should be moved above `device`.
    device: DeviceHolder,

    // The "stores" node in the Inspect tree.
    _stores_node: LazyNode,
}
587
588#[fxfs_trace::trace]
589impl FxFilesystem {
    /// Formats `device` and returns the newly-created, open filesystem.
    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().format(true).open(device).await
    }

    /// Opens the existing filesystem on `device` with default options.
    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().open(device).await
    }

    /// Returns the root parent object store.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }
601
602    pub async fn close(&self) -> Result<(), Error> {
603        if self.journal().image_builder_mode().is_some() {
604            self.journal().allocate_journal().await?;
605            self.journal().set_image_builder_mode(None);
606            self.journal().force_compact().await?;
607        }
608        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
609        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
610        debug_assert_not_too_long!(self.background_tasks.clone().cancel());
611        self.journal.stop_compactions().await;
612        let sync_status =
613            if self.journal().image_builder_mode().is_some() || self.options().read_only {
614                Ok(None)
615            } else {
616                self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
617            };
618        match &sync_status {
619            Ok(None) => {}
620            Ok(checkpoint) => info!(
621                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
622                 reservation_required={}, borrowed={})",
623                checkpoint.as_ref().unwrap().0.file_offset,
624                self.object_manager().metadata_reservation(),
625                self.object_manager().required_reservation(),
626                self.object_manager().borrowed_metadata_space(),
627            ),
628            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
629        }
630        self.journal.terminate();
631        let flush_task = self.flush_task.lock().take();
632        if let Some(task) = flush_task {
633            debug_assert_not_too_long!(task);
634        }
635        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
636        // crash instead of exiting gracefully.
637        self.device().close().await.context("Failed to close device")?;
638        sync_status.map(|_| ())
639    }
640
    /// Returns a new reference to the underlying block device.
    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    /// Returns the root object store.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    /// Returns the allocator.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    /// Enables allocations for the allocator.
    /// This is only used in image_builder_mode where it *must*
    /// be called before any allocations can take place.
    pub fn enable_allocations(&self) {
        self.allocator().enable_allocations();
    }

    /// Returns the object manager.
    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    /// Returns the journal.
    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

    /// Syncs the journal; see `SyncOptions` for the guarantees provided.
    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    /// Returns the filesystem block size, in bytes.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    /// Returns the device size and the allocator's used-byte count.
    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    /// Returns the current super-block header.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    /// Returns the graveyard.
    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    /// Returns true if trace-level logging is enabled.
    pub fn trace(&self) -> bool {
        self.trace
    }

    /// Returns the options the filesystem was opened with.
    pub fn options(&self) -> &Options {
        &self.options
    }
698
    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        // The owned guard holds a strong reference to the filesystem, which gives it a
        // 'static lifetime.
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }
713
    /// Creates a new transaction holding `locks`.
    ///
    /// Acquires the filesystem-wide shared lock first (see `txn_guard`), unless the caller
    /// supplied an existing guard via `options.txn_guard`.
    pub async fn new_transaction<'a>(
        self: Arc<Self>,
        locks: LockKeys,
        options: transaction::Options<'a>,
    ) -> Result<Transaction<'a>, Error> {
        // Reuse the caller's guard when provided so the same task never takes the filesystem
        // lock twice (see `txn_guard` for why that can deadlock).
        let guard = if let Some(guard) = options.txn_guard.as_ref() {
            TxnGuard::Borrowed(guard)
        } else {
            self.txn_guard().await
        };
        Transaction::new(guard, options, locks).await
    }
726
    /// Commits `transaction`, applying its mutations and (except in image-builder mode) writing
    /// them to the journal.
    ///
    /// `callback` is invoked with the journal offset of the commit while the commit lock is still
    /// held, i.e. before any later transaction can commit.  Returns that journal offset
    /// (0 in image-builder mode).
    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        // Serialize commits; the callback and post-commit hook both rely on this ordering.
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            // Image-builder mode bypasses the journal: apply the mutations directly using a
            // placeholder checkpoint.
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }
771
    /// Returns a reference to the filesystem's [`LockManager`].
    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }
775
776    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
777        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
778            self.sub_transaction();
779        }
780        // If we placed a hold for metadata space, return it now.
781        if let MetadataReservation::Hold(hold_amount) =
782            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
783        {
784            let hold = transaction
785                .allocator_reservation
786                .unwrap()
787                .reserve(0)
788                .expect("Zero should always succeed.");
789            hold.add(hold_amount);
790        }
791        self.objects.drop_transaction(transaction);
792        self.lock_manager.drop_transaction(transaction);
793    }
794
795    fn maybe_start_flush_task(&self) {
796        if self.journal.image_builder_mode().is_some() {
797            return;
798        }
799        let mut flush_task = self.flush_task.lock();
800        if flush_task.is_none() {
801            let journal = self.journal.clone();
802            *flush_task = Some(fasync::Task::spawn(
803                journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
804            ));
805        }
806    }
807
808    fn start_trim_task(self: &Arc<Self>) {
809        if !self.device.supports_trim() {
810            info!("Device does not support trim; not scheduling trimming");
811            return;
812        }
813        let this = self.clone();
814        self.background_tasks
815            .spawn(this.trim_task().trace(trace_future_args!("Filesystem::trim_task")));
816    }
817
818    async fn trim_task(self: Arc<Self>) {
819        // This task will be cancelled when the filesystem is closed.
820        let Some((mut next_timer, _)) = self.options.trim_config else { return };
821        loop {
822            fasync::Timer::new(next_timer.clone()).await;
823
824            // The timer has fired indicating a trim is now due.  If we have a power manager, we
825            // now check to see if there's an external power source.
826            let start = Instant::now();
827            let result = if let Some(pm) = &self.options.power_manager {
828                let mut watcher = pm.clone().watch_battery();
829
830                // The pauser starts paused.
831                let pauser = Pauser::new(self.options.trim_charger_wait);
832
833                let mut pause_future = pin!(
834                    async {
835                        let mut wake_lease = WakeLease::invalid();
836                        loop {
837                            let Some((using_battery, new_lease)) = watcher.next_latest().await
838                            else {
839                                // If we lose the connection to the watcher, unpause and do not
840                                // worry about monitoring the power source.
841                                pauser.set_pause(false);
842                                drop(wake_lease); // Silence the compiler warnings.
843                                return;
844                            };
845
846                            // Pause if the device is using battery.
847                            pauser.set_pause(using_battery);
848
849                            // Hold onto a wake lease if we are using an external power source (and
850                            // we are therefore unpaused).
851                            if using_battery {
852                                wake_lease = WakeLease::invalid();
853                            } else if !new_lease.is_invalid() {
854                                wake_lease = new_lease;
855                            }
856                        }
857                    }
858                    .fuse()
859                );
860
861                let mut do_trim = pin!(self.do_trim(Some(&pauser)).fuse());
862
863                loop {
864                    futures::select! {
865                        _ = pause_future => {}
866                        result = do_trim => break result,
867                    }
868                }
869
870                // Now that trim has completed, we don't need to watch the power source any more, so
871                // we just drop the pauser and the future monitoring the power source.
872            } else {
873                self.do_trim(None).await
874            };
875
876            let duration = start.elapsed();
877            match result {
878                Ok(bytes_trimmed) => info!(
879                    "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
880                     {next_timer:?}",
881                ),
882                Err(error) => error!(error:?; "Failed to trim"),
883            }
884
885            let Some((_, interval)) = self.options.trim_config else { return };
886            next_timer = interval;
887            if next_timer.is_zero() {
888                fasync::yield_now().await;
889            }
890        }
891    }
892
893    // Returns the number of bytes trimmed.
894    async fn do_trim(&self, pauser: Option<&Pauser>) -> Result<usize, Error> {
895        const MAX_EXTENTS_PER_BATCH: usize = 8;
896        const MAX_EXTENT_SIZE: usize = 256 * 1024;
897        let mut offset = 0;
898        let mut bytes_trimmed = 0;
899        loop {
900            let allocator = self.allocator();
901            if let Some(pauser) = pauser {
902                pauser.maybe_pause().await;
903            }
904            let trimmable_extents =
905                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
906            for device_range in trimmable_extents.extents() {
907                self.device.trim(device_range.clone()).await?;
908                bytes_trimmed += device_range.length()? as usize;
909            }
910            if let Some(device_range) = trimmable_extents.extents().last() {
911                offset = device_range.end;
912            } else {
913                break;
914            }
915        }
916        Ok(bytes_trimmed)
917    }
918
919    fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
920        let this = self.clone();
921        self.background_tasks.spawn(
922            async move {
923                loop {
924                    fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).await;
925                    this.device().clean_transfer_buffer();
926                }
927            }
928            .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
929        );
930    }
931
932    pub(crate) async fn reservation_for_transaction<'a>(
933        self: &Arc<Self>,
934        options: transaction::Options<'a>,
935    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
936        if self.options.image_builder_mode.is_some() {
937            // Image builder mode avoids the journal so reservation tracking for metadata overheads
938            // doesn't make sense and so we essentially have 'all or nothing' semantics instead.
939            return Ok((MetadataReservation::Borrowed, None, None));
940        }
941        if !options.skip_journal_checks {
942            self.maybe_start_flush_task();
943            self.journal.check_journal_space().await?;
944        }
945
946        // We support three options for metadata space reservation:
947        //
948        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
949        //      be used on the understanding that eventually, potentially after a full compaction,
950        //      there should be no net increase in space used.  For example, unlinking an object
951        //      should eventually decrease the amount of space used and setting most attributes
952        //      should not result in any change.
953        //
954        //   2. A reservation is provided in which case we'll place a hold on some of it for
955        //      metadata.
956        //
957        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
958        //      and will return NoSpace if that fails.
959        let mut hold = None;
960        let metadata_reservation = if options.borrow_metadata_space {
961            MetadataReservation::Borrowed
962        } else {
963            match options.allocator_reservation {
964                Some(reservation) => {
965                    hold = Some(
966                        reservation
967                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
968                            .ok_or(FxfsError::NoSpace)?,
969                    );
970                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
971                }
972                None => {
973                    let reservation = self
974                        .allocator()
975                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
976                        .ok_or(FxfsError::NoSpace)?;
977                    MetadataReservation::Reservation(reservation)
978                }
979            }
980        };
981        Ok((metadata_reservation, options.allocator_reservation, hold))
982    }
983
984    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
985        if skip_journal_checks {
986            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
987        } else {
988            let inc = || {
989                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
990                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
991                    match self.in_flight_transactions.compare_exchange_weak(
992                        in_flights,
993                        in_flights + 1,
994                        Ordering::Relaxed,
995                        Ordering::Relaxed,
996                    ) {
997                        Ok(_) => return true,
998                        Err(x) => in_flights = x,
999                    }
1000                }
1001                return false;
1002            };
1003            while !inc() {
1004                let listener = self.transaction_limit_event.listen();
1005                if inc() {
1006                    break;
1007                }
1008                listener.await;
1009            }
1010        }
1011    }
1012
1013    pub(crate) fn sub_transaction(&self) {
1014        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
1015        assert!(old != 0);
1016        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
1017            self.transaction_limit_event.notify(usize::MAX);
1018        }
1019    }
1020
1021    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
1022        let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
1023        TruncateGuard(self.lock_manager().write_lock(keys).await)
1024    }
1025
    /// Builds an `Inspector` containing diagnostic data for every store: the root and
    /// root-parent stores under fixed names, plus one child per volume-directory entry whose
    /// store is currently registered with the object manager.
    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
        let inspector = fuchsia_inspect::Inspector::default();
        let root = inspector.root();
        root.record_child("__root", |n| self.root_store().record_data(n));
        root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
        let object_manager = self.object_manager();
        let volume_directory = object_manager.volume_directory();
        let layer_set = volume_directory.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = volume_directory.iter(&mut merger).await?;
        while let Some((name, id, _)) = iter.get() {
            // Only stores currently known to the object manager are recorded.
            if let Some(store) = object_manager.store(id) {
                root.record_child(name.to_string(), |n| store.record_data(n));
            }
            iter.advance().await?;
        }
        Ok(inspector)
    }
1044}
1045
/// Guard held for the lifetime of a transaction; either borrowed from another transaction's
/// guard (when `Options::txn_guard` is supplied) or an owned read guard.
pub enum TxnGuard<'a> {
    /// Borrows an outer transaction's guard rather than acquiring a fresh one.
    Borrowed(&'a TxnGuard<'a>),
    /// An owned read guard, as returned by `FxFilesystem::txn_guard`.
    Owned(ReadGuard<'static>),
}
1050
impl TxnGuard<'_> {
    /// Returns the filesystem this guard belongs to, following borrowed guards back to their
    /// owning guard.
    pub fn fs(&self) -> &Arc<FxFilesystem> {
        match self {
            TxnGuard::Borrowed(b) => b.fs(),
            TxnGuard::Owned(o) => o.fs().unwrap(),
        }
    }
}
1059
/// A wrapper around a guard that needs to be taken when truncating an object.
/// The field exists solely to keep the underlying write lock held for the guard's lifetime,
/// hence it is never read (`dead_code`).
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
1063
1064/// Helper method for making a new filesystem.
1065pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1066    let fs = FxFilesystem::new_empty(device).await?;
1067    fs.close().await?;
1068    Ok(fs.take_device().await)
1069}
1070
1071/// Helper method for making a new filesystem with a single named volume.
1072/// This shouldn't be used in production; instead volumes should be created with the Volumes
1073/// protocol.
1074pub async fn mkfs_with_volume(
1075    device: DeviceHolder,
1076    volume_name: &str,
1077    crypt: Option<Arc<dyn Crypt>>,
1078) -> Result<DeviceHolder, Error> {
1079    let fs = FxFilesystem::new_empty(device).await?;
1080    {
1081        // expect instead of propagating errors here, since otherwise we could drop |fs| before
1082        // close is called, which leads to confusing and unrelated error messages.
1083        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1084        root_volume
1085            .new_volume(
1086                volume_name,
1087                NewChildStoreOptions {
1088                    options: StoreOptions { crypt, ..StoreOptions::default() },
1089                    ..Default::default()
1090                },
1091            )
1092            .await
1093            .expect("Create volume failed");
1094    }
1095    fs.close().await?;
1096    Ok(fs.take_device().await)
1097}
1098
/// State for a post-commit hook which runs a full fsck (plus per-volume fsck) after every
/// transaction, chaining to any previously-installed hook afterwards.
struct FsckAfterEveryTransaction {
    // Back-reference to the filesystem.  Set after construction (hence OnceLock) and held
    // weakly to avoid keeping the filesystem alive.
    fs: OnceLock<Weak<FxFilesystem>>,
    // The hook that was installed before this one; invoked after the fsck passes complete.
    old_hook: PostCommitHook,
}
1103
1104impl FsckAfterEveryTransaction {
1105    fn new(old_hook: PostCommitHook) -> Arc<Self> {
1106        Arc::new(Self { fs: OnceLock::new(), old_hook })
1107    }
1108
1109    async fn run(self: Arc<Self>) {
1110        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1111            let options = FsckOptions {
1112                fail_on_warning: true,
1113                no_lock: true,
1114                quiet: true,
1115                ..Default::default()
1116            };
1117            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1118            let object_manager = fs.object_manager();
1119            for store in object_manager.unlocked_stores() {
1120                let store_id = store.store_object_id();
1121                if !object_manager.is_system_store(store_id) {
1122                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1123                        .await
1124                        .expect("fsck_volume_with_options failed");
1125                }
1126            }
1127        }
1128        if let Some(old_hook) = self.old_hook.as_ref() {
1129            old_hook().await;
1130        }
1131    }
1132}
1133
/// Pausable gate used to suspend background work (e.g. trimming) and resume it later.
struct Pauser {
    // True while paused; waiters in `maybe_pause` park on this condition until it flips.
    pause: Condition<bool>,
    // Delay slept after each unpause before the flag is re-checked, smoothing rapid toggles.
    bounce_delay: Duration,
}
1138
1139impl Pauser {
1140    /// Returns a new Pauser which starts paused.
1141    fn new(bounce_delay: Duration) -> Self {
1142        Self { pause: Condition::new(true), bounce_delay }
1143    }
1144
1145    async fn maybe_pause(&self) {
1146        loop {
1147            if !*self.pause.lock() {
1148                return;
1149            }
1150            self.pause.when(|p| if *p { Poll::Pending } else { Poll::Ready(()) }).await;
1151            fasync::Timer::new(self.bounce_delay).await;
1152        }
1153    }
1154
1155    fn set_pause(&self, v: bool) {
1156        let mut guard = self.pause.lock();
1157        if *guard == v {
1158            return;
1159        }
1160        *guard = v;
1161        for waker in guard.drain_wakers() {
1162            waker.wake();
1163        }
1164    }
1165}
1166
1167trait NextLatest: Stream + Unpin {
1168    /// Gets the next item from the stream, but if multiple items are ready, returns the latest.
1169    async fn next_latest(&mut self) -> Option<Self::Item> {
1170        let Some(mut next) = self.next().await else { return None };
1171
1172        // Coalesce with any subsequent items that are ready.
1173        loop {
1174            match self.next().now_or_never() {
1175                None => return Some(next),
1176                Some(None) => return None,
1177                Some(Some(n)) => next = n,
1178            }
1179        }
1180    }
1181}
1182
1183impl<T: ?Sized + Unpin> NextLatest for T where T: Stream {}
1184
1185#[cfg(test)]
1186mod tests {
1187    use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1188    use crate::fsck::{fsck, fsck_volume};
1189    use crate::log::*;
1190    use crate::lsm_tree::Operation;
1191    use crate::lsm_tree::types::Item;
1192    use crate::object_handle::{
1193        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1194    };
1195    use crate::object_store::directory::{Directory, replace_child};
1196    use crate::object_store::journal::JournalOptions;
1197    use crate::object_store::journal::super_block::SuperBlockInstance;
1198    use crate::object_store::transaction::{LockKey, Options, lock_keys};
1199    use crate::object_store::volume::root_volume;
1200    use crate::object_store::{
1201        HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1202    };
1203    use crate::range::RangeExt;
1204    use fuchsia_async as fasync;
1205    use fuchsia_sync::Mutex;
1206    use futures::future::join_all;
1207    use futures::stream::{FuturesUnordered, TryStreamExt};
1208    use fxfs_insecure_crypto::new_insecure_crypt;
1209    use rustc_hash::FxHashMap as HashMap;
1210    use std::ops::Range;
1211    use std::sync::Arc;
1212    use std::sync::atomic::{AtomicU32, Ordering};
1213    use std::time::Duration;
1214    use storage_device::DeviceHolder;
1215    use storage_device::fake_device::{self, FakeDevice};
1216    use test_case::test_case;
1217
    // Block size used for the `FakeDevice` instances created by these tests.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1219
    // Creates two files and repeatedly overwrites them from concurrent tasks; if compaction does
    // not reclaim the resulting churn, the small device runs out of space and the test fails.
    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            // Hammer the file with overwrites at offset 0 to generate lots of mutations.
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        // The filesystem should still be consistent after all that churn.
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
1264
    // Verifies the behaviour of `enable_allocations` both outside and inside image-builder mode.
    #[fuchsia::test]
    async fn test_enable_allocations() {
        // 1. enable_allocations() has no impact if image_builder_mode is not used.
        {
            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.enable_allocations();
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            fs.close().await.expect("close failed");
        }

        // 2. Allocations blow up if done before this call (in image_builder_mode), but work after
        {
            let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::A))
                .open(device)
                .await
                .expect("open failed");
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test_fail")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // Allocations should fail before enable_allocations()
            assert!(
                FxfsError::Unavailable
                    .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
            );

            // Allocations should work after enable_allocations()
            fs.enable_allocations();
            handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");

            // 3. finalize() works regardless of whether enable_allocations() is called.
            // (We already called it above, so this verifies it works after it was called).

            fs.close().await.expect("close failed");
        }
        // TODO(https://fxbug.dev/467401079): Add a failure test where we close without
        // enabling allocations. (Trivial to do, but causes error logs, which are interpreted as
        // test failures and only seem controllable at the BUILD target level).
    }
1347
    // Records every LSM-tree mutation produced by a workload, then replays the journal into a
    // fresh filesystem instance and asserts that the exact same mutation sequence is produced.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the store, but set reclaim size to a very large value which will effectively
        // stop the journal from flushing and allows us to track all the mutations to the store.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Accumulates (operation, item) pairs applied to a tree, for later comparison.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with mutation callbacks installed on the allocator and on every
        // object store, recording all mutations into the supplied collections.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Append some data.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Overwrite some data.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Truncate.
        object.truncate(3000).await.expect("truncate failed");

        // Delete the object.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Finally tombstone the object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        // Now reopen and check that replay produces the same set of mutations.
        fs.close().await.expect("close failed");

        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Compare per-store mutations: same stores, same count, same (op, key, value, sequence).
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        // Compare allocator mutations in the same way.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        // The metadata reservation should also come back identical after replay.
        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }
1502
    // Verifies the in-flight transaction cap: once MAX_IN_FLIGHT_TRANSACTIONS are outstanding,
    // creating another blocks until one of the existing transactions is dropped.
    #[fuchsia::test]
    async fn test_max_in_flight_transactions() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Saturate the limit with lockless transactions.
        let transactions = FuturesUnordered::new();
        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
        }
        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();

        // Trying to create another one should be blocked.
        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
        assert!(futures::poll!(&mut fut).is_pending());

        // Dropping one should allow it to proceed.
        transactions.pop();

        assert!(futures::poll!(&mut fut).is_ready());
    }
1523
1524    // If run on a single thread, the trim tasks starve out other work.
1525    #[fuchsia::test(threads = 10)]
1526    async fn test_continuously_trim() {
1527        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1528        let fs = FxFilesystemBuilder::new()
1529            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1530            .format(true)
1531            .open(device)
1532            .await
1533            .expect("open failed");
1534        // Do a small sleep so trim has time to get going.
1535        fasync::Timer::new(Duration::from_millis(10)).await;
1536
1537        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
1538        // regular usage isn't affected by trim.
1539        let root_store = fs.root_store();
1540        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1541            .await
1542            .expect("open failed");
1543        for _ in 0..100 {
1544            let mut transaction = fs
1545                .clone()
1546                .new_transaction(
1547                    lock_keys![LockKey::object(
1548                        root_store.store_object_id(),
1549                        root_directory.object_id()
1550                    )],
1551                    Options::default(),
1552                )
1553                .await
1554                .expect("new_transaction failed");
1555            let object = root_directory
1556                .create_child_file(&mut transaction, "test")
1557                .await
1558                .expect("create_child_file failed");
1559            transaction.commit().await.expect("commit failed");
1560
1561            {
1562                let buf = object.allocate_buffer(1024).await;
1563                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1564            }
1565            std::mem::drop(object);
1566
1567            let mut transaction = root_directory
1568                .acquire_context_for_replace(None, "test", true)
1569                .await
1570                .expect("acquire_context_for_replace failed")
1571                .transaction;
1572            replace_child(&mut transaction, None, (&root_directory, "test"))
1573                .await
1574                .expect("replace_child failed");
1575            transaction.commit().await.expect("commit failed");
1576        }
1577        fs.close().await.expect("close failed");
1578    }
1579
1580    #[test_case(true; "test power fail with barriers")]
1581    #[test_case(false; "test power fail with checksums")]
1582    #[fuchsia::test]
1583    async fn test_power_fail(barriers_enabled: bool) {
1584        // This test randomly discards blocks, so we run it a few times to increase the chances
1585        // of catching an issue in a single run.
1586        for _ in 0..10 {
1587            let (store_id, device, test_file_object_id) = {
1588                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1589                let fs = if barriers_enabled {
1590                    FxFilesystemBuilder::new()
1591                        .barriers_enabled(true)
1592                        .format(true)
1593                        .open(device)
1594                        .await
1595                        .expect("new filesystem failed")
1596                } else {
1597                    FxFilesystem::new_empty(device).await.expect("new_empty failed")
1598                };
1599                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1600
1601                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1602                    .await
1603                    .expect("sync failed");
1604
1605                let store = root_volume
1606                    .new_volume(
1607                        "test",
1608                        NewChildStoreOptions {
1609                            options: StoreOptions {
1610                                crypt: Some(Arc::new(new_insecure_crypt())),
1611                                ..StoreOptions::default()
1612                            },
1613                            ..Default::default()
1614                        },
1615                    )
1616                    .await
1617                    .expect("new_volume failed");
1618                let root_directory = Directory::open(&store, store.root_directory_object_id())
1619                    .await
1620                    .expect("open failed");
1621
1622                // Create a number of files with the goal of using up more than one journal block.
1623                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1624                    let fs = store.filesystem();
1625                    let root_directory = Directory::open(store, store.root_directory_object_id())
1626                        .await
1627                        .expect("open failed");
1628                    for i in 0..100 {
1629                        let mut transaction = fs
1630                            .clone()
1631                            .new_transaction(
1632                                lock_keys![LockKey::object(
1633                                    store.store_object_id(),
1634                                    store.root_directory_object_id()
1635                                )],
1636                                Options::default(),
1637                            )
1638                            .await
1639                            .expect("new_transaction failed");
1640                        root_directory
1641                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1642                            .await
1643                            .expect("create_child_file failed");
1644                        transaction.commit().await.expect("commit failed");
1645                    }
1646                }
1647
1648                // Create one batch of files.
1649                create_files(&store, "A").await;
1650
1651                // Create a file and write something to it.  This will make sure there's a
1652                // transaction present that includes a checksum.
1653                let mut transaction = fs
1654                    .clone()
1655                    .new_transaction(
1656                        lock_keys![LockKey::object(
1657                            store.store_object_id(),
1658                            store.root_directory_object_id()
1659                        )],
1660                        Options::default(),
1661                    )
1662                    .await
1663                    .expect("new_transaction failed");
1664                let object = root_directory
1665                    .create_child_file(&mut transaction, "test")
1666                    .await
1667                    .expect("create_child_file failed");
1668                transaction.commit().await.expect("commit failed");
1669
1670                let mut transaction =
1671                    object.new_transaction().await.expect("new_transaction failed");
1672                let mut buffer = object.allocate_buffer(4096).await;
1673                buffer.as_mut_slice().fill(0xed);
1674                object
1675                    .txn_write(&mut transaction, 0, buffer.as_ref())
1676                    .await
1677                    .expect("txn_write failed");
1678                transaction.commit().await.expect("commit failed");
1679
1680                // Create another batch of files.
1681                create_files(&store, "B").await;
1682
1683                // Sync the device, but don't flush the device. We want to do this so we can
1684                // randomly discard blocks below.
1685                fs.sync(SyncOptions::default()).await.expect("sync failed");
1686
1687                // When we call `sync` above on the filesystem, it will pad the journal so that it
1688                // will get written, but it doesn't wait for the write to occur.  We wait for a
1689                // short time here to give allow time for the journal to be written.  Adding timers
1690                // isn't great, but this test already isn't deterministic since we randomly discard
1691                // blocks.
1692                fasync::Timer::new(Duration::from_millis(10)).await;
1693
1694                (
1695                    store.store_object_id(),
1696                    fs.device().snapshot().expect("snapshot failed"),
1697                    object.object_id(),
1698                )
1699            };
1700
1701            // Randomly discard blocks since the last flush.  This simulates what might happen in
1702            // the case of power-loss.  This will be an uncontrolled unmount.
1703            device
1704                .discard_random_since_last_flush()
1705                .expect("discard_random_since_last_flush failed");
1706
1707            let fs = FxFilesystem::open(device).await.expect("open failed");
1708            fsck(fs.clone()).await.expect("fsck failed");
1709
1710            let mut check_test_file = false;
1711
1712            // If we replayed and the store exists (i.e. the transaction that created the store
1713            // made it out), start by running fsck on it.
1714            let object_id = if fs.object_manager().store(store_id).is_some() {
1715                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1716                    .await
1717                    .expect("fsck_volume failed");
1718
1719                // Now we want to create another file, unmount cleanly, and then finally check that
1720                // the new file exists.  This checks that we can continue to use the filesystem
1721                // after an unclean unmount.
1722                let store = root_volume(fs.clone())
1723                    .await
1724                    .expect("root_volume failed")
1725                    .volume(
1726                        "test",
1727                        StoreOptions {
1728                            crypt: Some(Arc::new(new_insecure_crypt())),
1729                            ..StoreOptions::default()
1730                        },
1731                    )
1732                    .await
1733                    .expect("volume failed");
1734
1735                let root_directory = Directory::open(&store, store.root_directory_object_id())
1736                    .await
1737                    .expect("open failed");
1738
1739                let mut transaction = fs
1740                    .clone()
1741                    .new_transaction(
1742                        lock_keys![LockKey::object(
1743                            store.store_object_id(),
1744                            store.root_directory_object_id()
1745                        )],
1746                        Options::default(),
1747                    )
1748                    .await
1749                    .expect("new_transaction failed");
1750                let object = root_directory
1751                    .create_child_file(&mut transaction, &format!("C"))
1752                    .await
1753                    .expect("create_child_file failed");
1754                transaction.commit().await.expect("commit failed");
1755
1756                // Write again to the test file if it exists.
1757                if let Ok(test_file) = ObjectStore::open_object(
1758                    &store,
1759                    test_file_object_id,
1760                    HandleOptions::default(),
1761                    None,
1762                )
1763                .await
1764                {
1765                    // Check it has the contents we expect.
1766                    let mut buffer = test_file.allocate_buffer(4096).await;
1767                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1768                    if bytes == 4096 {
1769                        let expected = [0xed; 4096];
1770                        assert_eq!(buffer.as_slice(), &expected);
1771                    } else {
1772                        // If the write didn't make it, the file should have zero bytes.
1773                        assert_eq!(bytes, 0);
1774                    }
1775
1776                    // Modify the test file.
1777                    let mut transaction =
1778                        test_file.new_transaction().await.expect("new_transaction failed");
1779                    buffer.as_mut_slice().fill(0x37);
1780                    test_file
1781                        .txn_write(&mut transaction, 0, buffer.as_ref())
1782                        .await
1783                        .expect("txn_write failed");
1784                    transaction.commit().await.expect("commit failed");
1785                    check_test_file = true;
1786                }
1787
1788                object.object_id()
1789            } else {
1790                INVALID_OBJECT_ID
1791            };
1792
1793            // This will do a controlled unmount.
1794            fs.close().await.expect("close failed");
1795            let device = fs.take_device().await;
1796            device.reopen(false);
1797
1798            let fs = FxFilesystem::open(device).await.expect("open failed");
1799            fsck(fs.clone()).await.expect("fsck failed");
1800
1801            // As mentioned above, make sure that the object we created before the clean unmount
1802            // exists.
1803            if object_id != INVALID_OBJECT_ID {
1804                fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1805                    .await
1806                    .expect("fsck_volume failed");
1807
1808                let store = root_volume(fs.clone())
1809                    .await
1810                    .expect("root_volume failed")
1811                    .volume(
1812                        "test",
1813                        StoreOptions {
1814                            crypt: Some(Arc::new(new_insecure_crypt())),
1815                            ..StoreOptions::default()
1816                        },
1817                    )
1818                    .await
1819                    .expect("volume failed");
1820                // We should be able to open the C object.
1821                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1822                    .await
1823                    .expect("open_object failed");
1824
1825                // If we made the modification to the test file, check it.
1826                if check_test_file {
1827                    info!("Checking test file for modification");
1828                    let test_file = ObjectStore::open_object(
1829                        &store,
1830                        test_file_object_id,
1831                        HandleOptions::default(),
1832                        None,
1833                    )
1834                    .await
1835                    .expect("open_object failed");
1836                    let mut buffer = test_file.allocate_buffer(4096).await;
1837                    assert_eq!(
1838                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1839                        4096
1840                    );
1841                    let expected = [0x37; 4096];
1842                    assert_eq!(buffer.as_slice(), &expected);
1843                }
1844            }
1845
1846            fs.close().await.expect("close failed");
1847        }
1848    }
1849
    #[fuchsia::test]
    async fn test_barrier_not_emitted_when_transaction_has_no_data() {
        // Shared counter incremented once per barrier issued to the fake device.
        let barrier_count = Arc::new(AtomicU32::new(0));

        // Device observer that records barrier operations into the shared counter.
        struct Observer(Arc<AtomicU32>);

        impl fake_device::Observer for Observer {
            fn barrier(&self) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }

        let mut fake_device = FakeDevice::new(8192, 4096);
        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
        let device = DeviceHolder::new(fake_device);
        // Format a fresh filesystem with barriers enabled.
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .format(true)
            .open(device)
            .await
            .expect("new filesystem failed");

        // Create an encrypted "test" volume, then close so setup work is flushed.
        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            root_vol
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("there is no test volume");
            fs.close().await.expect("close failed");
        }
        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
        // measure of the number of barriers issued during setup.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(true)
            .open(device)
            .await
            .expect("new filesystem failed");
        // Baseline barrier count after setup; the metadata-only workload below must not add
        // to it.
        let expected_barrier_count = barrier_count.load(Ordering::Relaxed);

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .volume(
                "test",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("there is no test volume");

        // Create a number of files with the goal of using up more than one journal block.
        let fs = store.filesystem();
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        for i in 0..100 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("A {i}"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        // Unmount the filesystem to ensure that the journal flushes.
        fs.close().await.expect("close failed");
        // Ensure that no barriers were emitted while creating files, as no data was written.
        assert_eq!(expected_barrier_count, barrier_count.load(Ordering::Relaxed));
    }
1940
1941    #[fuchsia::test]
1942    async fn test_barrier_emitted_when_transaction_includes_data() {
1943        let barrier_count = Arc::new(AtomicU32::new(0));
1944
1945        struct Observer(Arc<AtomicU32>);
1946
1947        impl fake_device::Observer for Observer {
1948            fn barrier(&self) {
1949                self.0.fetch_add(1, Ordering::Relaxed);
1950            }
1951        }
1952
1953        let mut fake_device = FakeDevice::new(8192, 4096);
1954        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1955        let device = DeviceHolder::new(fake_device);
1956        let fs = FxFilesystemBuilder::new()
1957            .barriers_enabled(true)
1958            .format(true)
1959            .open(device)
1960            .await
1961            .expect("new filesystem failed");
1962
1963        {
1964            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1965            root_vol
1966                .new_volume(
1967                    "test",
1968                    NewChildStoreOptions {
1969                        options: StoreOptions {
1970                            crypt: Some(Arc::new(new_insecure_crypt())),
1971                            ..StoreOptions::default()
1972                        },
1973                        ..NewChildStoreOptions::default()
1974                    },
1975                )
1976                .await
1977                .expect("there is no test volume");
1978            fs.close().await.expect("close failed");
1979        }
1980        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1981        // measure of the number of barriers issued during setup.
1982        let device = fs.take_device().await;
1983        device.reopen(false);
1984        let fs = FxFilesystemBuilder::new()
1985            .barriers_enabled(true)
1986            .open(device)
1987            .await
1988            .expect("new filesystem failed");
1989        let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1990
1991        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1992        let store = root_vol
1993            .volume(
1994                "test",
1995                StoreOptions {
1996                    crypt: Some(Arc::new(new_insecure_crypt())),
1997                    ..StoreOptions::default()
1998                },
1999            )
2000            .await
2001            .expect("there is no test volume");
2002
2003        // Create a file and write something to it. This should cause a barrier to be emitted.
2004        let fs: Arc<FxFilesystem> = store.filesystem();
2005        let root_directory =
2006            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2007
2008        let mut transaction = fs
2009            .clone()
2010            .new_transaction(
2011                lock_keys![LockKey::object(
2012                    store.store_object_id(),
2013                    store.root_directory_object_id()
2014                )],
2015                Options::default(),
2016            )
2017            .await
2018            .expect("new_transaction failed");
2019        let object = root_directory
2020            .create_child_file(&mut transaction, "test")
2021            .await
2022            .expect("create_child_file failed");
2023        transaction.commit().await.expect("commit failed");
2024
2025        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2026        let mut buffer = object.allocate_buffer(4096).await;
2027        buffer.as_mut_slice().fill(0xed);
2028        object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2029        transaction.commit().await.expect("commit failed");
2030
2031        // Unmount the filesystem to ensure that the journal flushes.
2032        fs.close().await.expect("close failed");
2033        // Ensure that a barrier was emitted while writing to the file.
2034        assert!(expected_barrier_count < barrier_count.load(Ordering::Relaxed));
2035    }
2036
    #[test_case(true; "fail when original filesystem has barriers enabled")]
    #[test_case(false; "fail when original filesystem has barriers disabled")]
    #[fuchsia::test]
    async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
        // Phase 1: format with `original_barrier_mode` and write 4KiB of 0xA7 to a file.
        // Phase 2: remount with the opposite mode and append 4KiB of 0xA8.
        // Phase 3: remount with the original mode again and verify both writes survived.
        let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
        let fake_device = FakeDevice::new(8192, 4096);
        let device = DeviceHolder::new(fake_device);
        let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
            .barriers_enabled(original_barrier_mode)
            .format(true)
            .open(device)
            .await
            .expect("new filesystem failed");

        // Create a volume named test with a file inside it called file.
        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
                        ..Default::default()
                    },
                )
                .await
                .expect("creating test volume");
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Default::default(),
                )
                .await
                .expect("new_transaction failed");
            let object = root_dir
                .create_child_file(&mut transaction, "file")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            // First 4KiB of data, written under the original barrier mode.
            let mut buffer = object.allocate_buffer(4096).await;
            buffer.as_mut_slice().fill(0xA7);
            let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
            assert_eq!(new_size, 4096);
        }

        // Remount the filesystem with the opposite barrier mode and write more data to our file.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(!original_barrier_mode)
            .open(device)
            .await
            .expect("new filesystem failed");
        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
                .await
                .expect("opening test volume");
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let (object_id, _, _) =
                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
                .await
                .expect("open failed");
            // Write some more data.
            let mut buffer = test_file.allocate_buffer(4096).await;
            buffer.as_mut_slice().fill(0xA8);
            // Appending (offset None) should grow the file from 4KiB to 8KiB.
            let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
            assert_eq!(new_size, 8192);
        }

        // Lastly, remount the filesystems with the original barrier mode and make sure everything
        // can be read from the file as expected.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .barriers_enabled(original_barrier_mode)
            .open(device)
            .await
            .expect("new filesystem failed");
        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
                .await
                .expect("opening test volume");
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let (object_id, _, _) =
                root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
            let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
                .await
                .expect("open failed");
            // Both 4KiB writes, made under different barrier modes, must be fully readable.
            let mut buffer = test_file.allocate_buffer(8192).await;
            assert_eq!(
                test_file.read(0, buffer.as_mut()).await.expect("read failed"),
                8192,
                "short read"
            );
            assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
            assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
        }
        fs.close().await.expect("close failed");
    }
2154
2155    #[fuchsia::test]
2156    async fn test_image_builder_mode_no_early_writes() {
2157        const BLOCK_SIZE: u32 = 4096;
2158        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2159        device.reopen(true);
2160        let fs = FxFilesystemBuilder::new()
2161            .format(true)
2162            .image_builder_mode(Some(SuperBlockInstance::A))
2163            .open(device)
2164            .await
2165            .expect("open failed");
2166        fs.enable_allocations();
2167        // fs.close() now performs compaction (writing superblock), so device must be writable.
2168        fs.device().reopen(false);
2169        fs.close().await.expect("closed");
2170    }
2171
    #[fuchsia::test]
    async fn test_image_builder_mode() {
        // End-to-end image-builder flow: pre-write raw data to the device, format in
        // image-builder mode, create a file whose extent points at that pre-existing
        // data, then remount normally and verify the data is visible through the file.
        const BLOCK_SIZE: u32 = 4096;
        // One block, placed well past where the filesystem metadata will live.
        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));

        // Write some fake file data at an offset in the image and confirm it as an fxfs file below.
        {
            let mut write_buf =
                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
            write_buf.as_mut_slice().fill(0xf0);
            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
        }

        // Read-only from here on; image-builder mode must not write until close().
        device.reopen(true);

        let device = {
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(Some(SuperBlockInstance::B))
                .open(device)
                .await
                .expect("open failed");
            fs.enable_allocations();
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                // Create a file referencing existing data on device.
                let handle;
                {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_directory.store().store_object_id(),
                                root_directory.object_id()
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new transaction");
                    handle = root_directory
                        .create_child_file(&mut transaction, "test")
                        .await
                        .expect("create file");
                    // `extend` maps the pre-written device range into the file without
                    // writing any data.
                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
                    transaction.commit().await.expect("commit");
                }
            }
            // Writable again so close() can flush metadata and the superblock.
            fs.device().reopen(false);
            fs.close().await.expect("close");
            fs.take_device().await
        };
        // Remount the resulting image as a regular read-write filesystem and check it.
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");

        // Confirm that the test file points at the correct data.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, descriptor, _) =
            root_directory.lookup("test").await.expect("lookup failed").unwrap();
        assert_eq!(descriptor, ObjectDescriptor::File);
        let test_file =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open failed");
        let mut read_buf =
            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
        // The file contents must be the 0xf0 pattern pre-written to the raw device.
        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
        fs.close().await.expect("closed");
    }
2250
2251    #[fuchsia::test]
2252    async fn test_read_only_mount_on_full_filesystem() {
2253        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2254        let fs =
2255            FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2256        let root_store = fs.root_store();
2257        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2258            .await
2259            .expect("open failed");
2260
2261        let mut transaction = fs
2262            .clone()
2263            .new_transaction(
2264                lock_keys![LockKey::object(
2265                    root_store.store_object_id(),
2266                    root_directory.object_id()
2267                )],
2268                Options::default(),
2269            )
2270            .await
2271            .expect("new_transaction failed");
2272        let handle = root_directory
2273            .create_child_file(&mut transaction, "test")
2274            .await
2275            .expect("create_child_file failed");
2276        transaction.commit().await.expect("commit failed");
2277
2278        let mut buf = handle.allocate_buffer(4096).await;
2279        buf.as_mut_slice().fill(0xaa);
2280        loop {
2281            if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2282                break;
2283            }
2284        }
2285
2286        let max_offset = fs.allocator().maximum_offset();
2287        fs.close().await.expect("Close failed");
2288
2289        let device = fs.take_device().await;
2290        device.reopen(false);
2291        let mut buffer = device
2292            .allocate_buffer(
2293                crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2294            )
2295            .await;
2296        device.read(0, buffer.as_mut()).await.expect("read failed");
2297
2298        let device = DeviceHolder::new(
2299            FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2300                .expect("from_image failed"),
2301        );
2302        let fs =
2303            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2304        fs.close().await.expect("Close failed");
2305    }
2306
2307    #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2308    #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2309    #[fuchsia::test]
2310    async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2311        const BLOCK_SIZE: u32 = 4096;
2312        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2313
2314        // 1. Initialize in image_builder_mode
2315        device.reopen(true);
2316        let fs = FxFilesystemBuilder::new()
2317            .format(true)
2318            .image_builder_mode(Some(target_sb))
2319            .open(device)
2320            .await
2321            .expect("open failed");
2322
2323        fs.enable_allocations();
2324
2325        // 2. Finalize logic (via close)
2326        fs.device().reopen(false);
2327
2328        // 3. Write data
2329        {
2330            let root_store = fs.root_store();
2331            let root_directory =
2332                Directory::open(&root_store, root_store.root_directory_object_id())
2333                    .await
2334                    .expect("open failed");
2335
2336            let mut transaction = fs
2337                .clone()
2338                .new_transaction(
2339                    lock_keys![LockKey::object(
2340                        root_directory.store().store_object_id(),
2341                        root_directory.object_id()
2342                    )],
2343                    Options::default(),
2344                )
2345                .await
2346                .expect("new transaction");
2347            let handle = root_directory
2348                .create_child_file(&mut transaction, "post_finalize_file")
2349                .await
2350                .expect("create file");
2351            transaction.commit().await.expect("commit");
2352
2353            let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2354            buf.as_mut_slice().fill(0xaa);
2355            handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2356        }
2357
2358        // 4. Close. Should flush to `target_sb` only.
2359        fs.close().await.expect("close failed");
2360
2361        let other_sb = target_sb.next();
2362
2363        // 5. Verify `target_sb` is valid and `other_sb` is empty.
2364        let device = fs.take_device().await;
2365        device.reopen(true); // Read-only is fine for verifying.
2366        let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2367
2368        device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2369        assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2370
2371        buf.as_mut_slice().fill(0); // Clear buffer
2372        device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2373        // Expecting all zeros for `other_sb`
2374        assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2375    }
2376
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_trim_with_power_manager() {
        // Verifies trim scheduling against power state: no trims while on battery,
        // trims begin (while holding a wake lease) after external power is connected
        // and the charger-wait elapses, and trims stop again when power is removed.
        use anyhow::Error;
        use async_trait::async_trait;
        use fuchsia_async::TestExecutor;
        use futures::StreamExt;
        use zx::HandleBased;

        TestExecutor::advance_to(fasync::MonotonicInstant::ZERO).await;

        // Fake power manager whose battery state the test can toggle, and which
        // records the wake lease handed out so lease-held checks are possible.
        #[derive(Default)]
        struct MockPowerManager {
            on_battery: Mutex<bool>,
            event: event_listener::Event,
            wake_lease: Mutex<Option<zx::EventPair>>,
        }

        impl MockPowerManager {
            // Change the battery state and wake all watch_battery streams.
            fn set_on_battery(&self, v: bool) {
                *self.on_battery.lock() = v;
                self.event.notify(usize::MAX);
            }

            // Returns true while the lease's peer handle is still open. A zero-timeout
            // wait for PEER_CLOSED errs (times out) when the peer is alive, so is_err()
            // means the lease is still held.
            fn is_lease_held(&self) -> bool {
                self.wake_lease.lock().as_ref().is_some_and(|handle| {
                    handle
                        .wait_one(
                            zx::Signals::EVENTPAIR_PEER_CLOSED,
                            zx::MonotonicInstant::INFINITE_PAST,
                        )
                        .is_err()
                })
            }
        }

        impl super::PowerManager for MockPowerManager {
            // Emits the current on-battery state immediately, then once per
            // set_on_battery notification.
            fn watch_battery(
                self: Arc<Self>,
            ) -> futures::stream::BoxStream<'static, (bool, super::WakeLease)> {
                futures::stream::unfold(true, move |first| {
                    let this = self.clone();
                    async move {
                        if !first {
                            this.event.listen().await;
                        }
                        let val = *this.on_battery.lock();
                        let handle = if val {
                            // On battery: no wake lease accompanies the update.
                            zx::NullableHandle::invalid()
                        } else {
                            let (h1, h2) = zx::EventPair::create();
                            // Keep one end so is_lease_held() can observe when the
                            // filesystem drops the other end.
                            *this.wake_lease.lock() = Some(h2);
                            h1.into_handle()
                        };
                        Some(((val, handle), false))
                    }
                })
                .boxed()
            }
        }

        let trim_count = Arc::new(AtomicU32::new(0));

        // Pass-through device wrapper that counts trims and asserts a wake lease is
        // held whenever one occurs.
        struct TrimTrackingDevice {
            inner: DeviceHolder,
            trim_count: Arc<AtomicU32>,
            power_manager: Arc<MockPowerManager>,
        }

        #[async_trait]
        impl storage_device::Device for TrimTrackingDevice {
            fn allocate_buffer(&self, size: usize) -> storage_device::buffer::BufferFuture<'_> {
                self.inner.allocate_buffer(size)
            }
            fn block_size(&self) -> u32 {
                self.inner.block_size()
            }
            fn block_count(&self) -> u64 {
                self.inner.block_count()
            }
            async fn read_with_opts(
                &self,
                offset: u64,
                buffer: storage_device::buffer::MutableBufferRef<'_>,
                opts: storage_device::ReadOptions,
            ) -> Result<(), Error> {
                self.inner.read_with_opts(offset, buffer, opts).await
            }
            async fn write_with_opts(
                &self,
                offset: u64,
                buffer: storage_device::buffer::BufferRef<'_>,
                opts: storage_device::WriteOptions,
            ) -> Result<(), Error> {
                self.inner.write_with_opts(offset, buffer, opts).await
            }
            // The interesting override: trims must only happen under a wake lease.
            async fn trim(&self, range: std::ops::Range<u64>) -> Result<(), Error> {
                assert!(self.power_manager.is_lease_held());
                self.trim_count.fetch_add(1, Ordering::SeqCst);
                self.inner.trim(range).await
            }
            async fn flush(&self) -> Result<(), Error> {
                self.inner.flush().await
            }
            async fn close(&self) -> Result<(), Error> {
                self.inner.close().await
            }
            fn barrier(&self) {
                self.inner.barrier()
            }
            // Force trim support regardless of the inner device.
            fn supports_trim(&self) -> bool {
                true
            }
            fn is_read_only(&self) -> bool {
                self.inner.is_read_only()
            }
            fn snapshot(&self) -> Result<DeviceHolder, Error> {
                Ok(DeviceHolder::new(TrimTrackingDevice {
                    inner: self.inner.snapshot()?,
                    trim_count: self.trim_count.clone(),
                    power_manager: self.power_manager.clone(),
                }))
            }
            fn reopen(&self, read_only: bool) {
                self.inner.reopen(read_only)
            }
        }

        let pm = Arc::new(MockPowerManager::default());

        // Start on battery.
        pm.set_on_battery(true);

        let fake_device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE);
        let device = DeviceHolder::new(TrimTrackingDevice {
            inner: DeviceHolder::new(fake_device),
            trim_count: trim_count.clone(),
            power_manager: pm.clone(),
        });

        // Trim immediately and then every 100ms; wait 10ms after charger connects.
        let fs = FxFilesystemBuilder::new()
            .format(true)
            .power_manager(pm.clone())
            .trim_config(Some((Duration::ZERO, Duration::from_millis(100))))
            .trim_charger_wait(Duration::from_millis(10))
            .open(device)
            .await
            .expect("open failed");

        // Initially on battery, so no trim should happen.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(
            Duration::from_millis(500).into(),
        ))
        .await;
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert_eq!(trim_count.load(Ordering::SeqCst), 0);

        // Make some things to trim.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            handle.allocate(0..4096).await.expect("allocate failed");
            // Now delete it to make it trimmable.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                        LockKey::object(root_store.store_object_id(), handle.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("delete failed");
            transaction.commit().await.expect("commit failed");
            fs.root_store()
                .tombstone_object(handle.object_id(), Options::default())
                .await
                .expect("tombstone failed");
        }

        // Put on external power source.
        pm.set_on_battery(false);

        // Trim should start after 10ms.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(Duration::from_millis(10).into()))
            .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert!(trim_count.load(Ordering::SeqCst) > 0);

        // Reset trim count and take off charger.
        trim_count.store(0, Ordering::SeqCst);
        pm.set_on_battery(true);

        // Wait and ensure no more trims.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(
            Duration::from_millis(500).into(),
        ))
        .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;

        assert_eq!(trim_count.load(Ordering::SeqCst), 0);

        fs.close().await.expect("close failed");
    }
2609}