fxfs/
filesystem.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::fsck::{fsck_volume_with_options, fsck_with_options, FsckOptions};
use crate::log::*;
use crate::object_store::allocator::{Allocator, Hold, Reservation};
use crate::object_store::directory::Directory;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::super_block::SuperBlockHeader;
use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::transaction::{
    self, lock_keys, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation,
    ReadGuard, Transaction, TRANSACTION_METADATA_MAX_AMOUNT,
};
use crate::object_store::volume::{root_volume, VOLUMES_DIRECTORY};
use crate::object_store::{ObjectStore, NO_OWNER};
use crate::range::RangeExt;
use crate::serialized_types::{Version, LATEST_VERSION};
use crate::{debug_assert_not_too_long, metrics};
use anyhow::{bail, ensure, Context, Error};
use async_trait::async_trait;
use event_listener::Event;
use fuchsia_async as fasync;
use fuchsia_inspect::{NumericProperty as _, UintProperty};
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_crypto::Crypt;
use once_cell::sync::OnceCell;
use static_assertions::const_assert;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::{Device, DeviceHolder};

pub const MIN_BLOCK_SIZE: u64 = 4096;
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support sizes up to u64::MAX, off_t is i64, so allowing files larger than
// that becomes difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only
// sees data get modified in page-sized chunks, so to prevent writes at i64::MAX, the entire page
// containing i64::MAX needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);

/// Holds information on an Fxfs Filesystem
pub struct Info {
    pub total_bytes: u64,
    pub used_bytes: u64,
}

pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;

pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,
    /// The first duration is how long to wait after the filesystem has been mounted before
    /// performing an initial trim.  The second is the interval at which to repeat trimming
    /// thereafter.  If set to None, no trimming is done.
    /// Default values are (1 hour, 24 hours).
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// If true, the journal will not be used for writes.
    pub image_builder_mode: bool,
}

impl Default for Options {
    fn default() -> Self {
        Options {
            roll_metadata_key_byte_count: 128 * 1024 * 1024,
            read_only: false,
            pre_commit_hook: None,
            post_commit_hook: None,
            skip_initial_reap: false,
            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
            image_builder_mode: false,
        }
    }
}
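
// A minimal usage sketch (hypothetical values): callers usually override a field or two and
// take the rest of the defaults, e.g.:
//
//     let options = Options { read_only: true, ..Default::default() };
//
// `FxFilesystemBuilder` below is the more common way to set these options.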

/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}

/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    Replay,
    Live(&'a Transaction<'b>),
}

impl ApplyMode<'_, '_> {
    pub fn is_replay(&self) -> bool {
        matches!(self, ApplyMode::Replay)
    }

    pub fn is_live(&self) -> bool {
        matches!(self, ApplyMode::Live(_))
    }
}

/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock`; flush calls are also used in a few
/// tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when the transaction commits, which can either be during live
    /// operation (see `ObjectManager::apply_mutation`) or during journal replay, in which case the
    /// apply mode will be `ApplyMode::Replay` (see `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}

#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
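
// A sketch of the common durable-sync case, mirroring what `FxFilesystem::close` does below
// (`fs` is assumed to be an `OpenFxFilesystem`):
//
//     fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;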

pub struct OpenFxFilesystem(Arc<FxFilesystem>);

impl OpenFxFilesystem {
    /// Waits for filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }

    /// Used to finalize a filesystem image when in image_builder_mode.
    /// Returns the actual number of bytes used to store data, which may be useful in truncating
    /// the image size down.
    pub async fn finalize(self) -> Result<(DeviceHolder, u64), Error> {
        ensure!(
            self.journal().image_builder_mode(),
            "finalize() only valid in image_builder_mode."
        );
        self.journal().allocate_journal().await?;
        self.journal().set_image_builder_mode(false);
        self.journal().compact().await?;
        let actual_size = self.allocator().maximum_offset();
        self.close().await?;
        Ok((self.take_device().await, actual_size))
    }
}

impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
    fn from(fs: Arc<FxFilesystem>) -> Self {
        Self(fs)
    }
}

impl Drop for OpenFxFilesystem {
    fn drop(&mut self) {
        if self.options.image_builder_mode && self.journal().image_builder_mode() {
            error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
        }
        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
        }
    }
}

impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

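/// Builder used to open (and optionally format) an [`FxFilesystem`].
///
/// # Example
///
/// A sketch of formatting a fresh filesystem; `device` is assumed to be a `DeviceHolder`
/// supplied by the caller (this mirrors `FxFilesystem::new_empty` below):
///
/// ```ignore
/// let fs = FxFilesystemBuilder::new().format(true).open(device).await?;
/// ```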
pub struct FxFilesystemBuilder {
    format: bool,
    trace: bool,
    options: Options,
    journal_options: JournalOptions,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    fsck_after_every_transaction: bool,
}

impl FxFilesystemBuilder {
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `finalize()` before closing the filesystem to trigger
    /// a compaction of in-memory data structures, a minimal journal and a write to SuperBlock A.
    pub fn image_builder_mode(mut self, enabled: bool) -> Self {
        self.options.image_builder_mode = enabled;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

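    /// Sets the delay before the initial trim and the interval between subsequent trims. See
    /// `Options::trim_config`.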
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Constructs an `FxFilesystem` object with the specified settings.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        let filesystem = Arc::new(FxFilesystem {
            device,
            block_size,
            objects: objects.clone(),
            journal,
            commit_mutex: futures::lock::Mutex::new(()),
            lock_manager: LockManager::new(),
            flush_task: Mutex::new(None),
            trim_task: Mutex::new(None),
            closed: AtomicBool::new(true),
            shutdown_event: Event::new(),
            trace: self.trace,
            graveyard: Graveyard::new(objects.clone()),
            completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
            options: filesystem_options,
            in_flight_transactions: AtomicU64::new(0),
            transaction_limit_event: Event::new(),
        });

        if image_builder_mode {
            filesystem.journal().set_image_builder_mode(true);
        }

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if !image_builder_mode {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && !image_builder_mode {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}

pub struct FxFilesystem {
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    flush_task: Mutex<Option<fasync::Task<()>>>,
    trim_task: Mutex<Option<fasync::Task<()>>>,
    closed: AtomicBool,
    // An event that is signalled when the filesystem starts to shut down.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    completed_transactions: UintProperty,
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    device: DeviceHolder,
}

#[fxfs_trace::trace]
impl FxFilesystem {
    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().format(true).open(device).await
    }

    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().open(device).await
    }

    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }

    pub async fn close(&self) -> Result<(), Error> {
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
        match &sync_status {
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
        // crash instead of exiting gracefully.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }
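
    // The expected shutdown sequence (used by the tests below) is to close the filesystem and
    // then reclaim the device, e.g.:
    //
    //     fs.close().await?;
    //     let device = fs.take_device().await;
    //
    // where `fs` is the `OpenFxFilesystem` wrapper, since `take_device` lives there.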

    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    pub fn trace(&self) -> bool {
        self.trace
    }

    pub fn options(&self) -> &Options {
        &self.options
    }

    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }

    pub async fn new_transaction<'a>(
        self: Arc<Self>,
        locks: LockKeys,
        options: transaction::Options<'a>,
    ) -> Result<Transaction<'a>, Error> {
        let guard = if let Some(guard) = options.txn_guard.as_ref() {
            TxnGuard::Borrowed(guard)
        } else {
            self.txn_guard().await
        };
        Transaction::new(guard, options, locks).await
    }
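
    // A sketch of typical transaction usage, mirroring the tests below (`store_id` and
    // `object_id` are placeholders for real identifiers):
    //
    //     let mut transaction = fs
    //         .clone()
    //         .new_transaction(
    //             lock_keys![LockKey::object(store_id, object_id)],
    //             transaction::Options::default(),
    //         )
    //         .await?;
    //     // ... add mutations ...
    //     transaction.commit().await?;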

    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode() {
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }

    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }

    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // If we placed a hold for metadata space, return it now.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }

    fn maybe_start_flush_task(&self) {
        let mut flush_task = self.flush_task.lock();
        if flush_task.is_none() {
            let journal = self.journal.clone();
            *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
        }
    }

    // Returns the number of bytes trimmed.
    async fn do_trim(&self) -> Result<usize, Error> {
        const MAX_EXTENTS_PER_BATCH: usize = 8;
        const MAX_EXTENT_SIZE: usize = 256 * 1024;
        let mut offset = 0;
        let mut bytes_trimmed = 0;
        loop {
            if self.closed.load(Ordering::Relaxed) {
                info!("Filesystem is closed, nothing to trim");
                return Ok(bytes_trimmed);
            }
            let allocator = self.allocator();
            let trimmable_extents =
                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
            for device_range in trimmable_extents.extents() {
                self.device.trim(device_range.clone()).await?;
                bytes_trimmed += device_range.length()? as usize;
            }
            if let Some(device_range) = trimmable_extents.extents().last() {
                offset = device_range.end;
            } else {
                break;
            }
        }
        Ok(bytes_trimmed)
    }

    fn start_trim_task(
        self: &Arc<Self>,
        delay: std::time::Duration,
        interval: std::time::Duration,
    ) {
        if !self.device.supports_trim() {
            info!("Device does not support trim; not scheduling trimming");
            return;
        }
        let this = self.clone();
        let mut next_timer = delay;
        *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
            loop {
                let shutdown_listener = this.shutdown_event.listen();
                // Note that we need to check if the filesystem was closed after we start listening
                // to the shutdown event, but before we start waiting on `timer`, because otherwise
                // we might start listening on `shutdown_event` *after* the event was signaled, and
                // so `shutdown_listener` will never fire, and this task will get stuck until
                // `timer` expires.
                if this.closed.load(Ordering::SeqCst) {
                    return;
                }
                futures::select!(
                    () = fasync::Timer::new(next_timer).fuse() => {},
                    () = shutdown_listener.fuse() => return,
                );
                let start_time = std::time::Instant::now();
                let res = this.do_trim().await;
                let duration = std::time::Instant::now() - start_time;
                next_timer = interval;
                match res {
                    Ok(bytes_trimmed) => info!(
                        "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
                        {next_timer:?}",
                    ),
                    Err(e) => error!(e:?; "Failed to trim"),
                }
            }
        }));
    }

    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        if self.options.image_builder_mode {
            // Image builder mode avoids the journal, so reservation tracking for metadata
            // overheads doesn't make sense, and we essentially have 'all or nothing' semantics
            // instead.
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        // We support three options for metadata space reservation:
        //
        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
        //      used on the understanding that eventually, potentially after a full compaction,
        //      there should be no net increase in space used.  For example, unlinking an object
        //      should eventually decrease the amount of space used and setting most attributes
        //      should not result in any change.
        //
        //   2. A reservation is provided in which case we'll place a hold on some of it for
        //      metadata.
        //
        //   3. No reservation is supplied, so we try and reserve space with the allocator now,
        //      and will return NoSpace if that fails.
        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }
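
    // To illustrate the first two options above (a sketch using the same `transaction::Options`
    // fields read by this function): option 1 corresponds to passing
    //
    //     transaction::Options { borrow_metadata_space: true, ..Default::default() }
    //
    // and option 2 to supplying `allocator_reservation: Some(&reservation)` instead.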

    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
        if skip_journal_checks {
            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
        } else {
            let inc = || {
                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
                    match self.in_flight_transactions.compare_exchange_weak(
                        in_flights,
                        in_flights + 1,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => in_flights = x,
                    }
                }
                false
            };
            while !inc() {
                let listener = self.transaction_limit_event.listen();
                if inc() {
                    break;
                }
                listener.await;
            }
        }
    }

    pub(crate) fn sub_transaction(&self) {
        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
        assert!(old != 0);
        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
            self.transaction_limit_event.notify(usize::MAX);
        }
    }
}

pub enum TxnGuard<'a> {
    Borrowed(&'a TxnGuard<'a>),
    Owned(ReadGuard<'static>),
}

impl TxnGuard<'_> {
    pub fn fs(&self) -> &Arc<FxFilesystem> {
        match self {
            TxnGuard::Borrowed(b) => b.fs(),
            TxnGuard::Owned(o) => o.fs().unwrap(),
        }
    }
}

/// Helper method for making a new filesystem.
pub async fn mkfs(device: DeviceHolder) -> Result<(), Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    fs.close().await
}

/// Helper method for making a new filesystem with a single named volume.
/// This shouldn't be used in production; instead, volumes should be created with the Volumes
/// protocol.
pub async fn mkfs_with_volume(
    device: DeviceHolder,
    volume_name: &str,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<(), Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    {
        // Expect, rather than propagate, errors here, since otherwise we could drop |fs| before
        // close is called, which leads to confusing and unrelated error messages.
        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
        root_volume.new_volume(volume_name, NO_OWNER, crypt).await.expect("Create volume failed");
    }
    fs.close().await?;
    Ok(())
}
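
// A usage sketch (`FakeDevice` and `InsecureCrypt` are the test doubles used in the tests
// below):
//
//     let device = DeviceHolder::new(FakeDevice::new(8192, 512));
//     mkfs_with_volume(device, "data", Some(Arc::new(InsecureCrypt::new()))).await?;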

struct FsckAfterEveryTransaction {
    fs: OnceCell<Weak<FxFilesystem>>,
    old_hook: PostCommitHook,
}

impl FsckAfterEveryTransaction {
    fn new(old_hook: PostCommitHook) -> Arc<Self> {
        Arc::new(Self { fs: OnceCell::new(), old_hook })
    }

    async fn run(self: Arc<Self>) {
        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
            let options = FsckOptions {
                fail_on_warning: true,
                no_lock: true,
                quiet: true,
                ..Default::default()
            };
            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
            let object_manager = fs.object_manager();
            for store in object_manager.unlocked_stores() {
                let store_id = store.store_object_id();
                if !object_manager.is_system_store(store_id) {
                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
                        .await
                        .expect("fsck_volume_with_options failed");
                }
            }
        }
        if let Some(old_hook) = self.old_hook.as_ref() {
            old_hook().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
    use crate::fsck::{fsck, fsck_volume};
    use crate::log::*;
    use crate::lsm_tree::types::Item;
    use crate::lsm_tree::Operation;
    use crate::object_handle::{
        ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
    };
    use crate::object_store::directory::{replace_child, Directory};
    use crate::object_store::journal::JournalOptions;
    use crate::object_store::transaction::{lock_keys, LockKey, Options};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, NO_OWNER};
    use crate::range::RangeExt;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use futures::stream::{FuturesUnordered, TryStreamExt};
    use fxfs_insecure_crypto::InsecureCrypt;
    use rustc_hash::FxHashMap as HashMap;
    use std::ops::Range;
    use std::sync::Arc;
    use std::time::Duration;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the store, but set reclaim size to a very large value which will effectively
        // stop the journal from flushing and allow us to track all the mutations to the store.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Append some data.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Overwrite some data.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Truncate.
        object.truncate(3000).await.expect("truncate failed");

        // Delete the object.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Finally tombstone the object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        // Now reopen and check that replay produces the same set of mutations.
        fs.close().await.expect("close failed");

        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }

    #[fuchsia::test]
    async fn test_max_in_flight_transactions() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let transactions = FuturesUnordered::new();
        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
        }
        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();

        // Trying to create another one should be blocked.
        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
        assert!(futures::poll!(&mut fut).is_pending());

        // Dropping one should allow it to proceed.
        transactions.pop();

        assert!(futures::poll!(&mut fut).is_ready());
    }

    // If run on a single thread, the trim tasks starve out other work.
    #[fuchsia::test(threads = 10)]
    async fn test_continuously_trim() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
            .format(true)
            .open(device)
            .await
            .expect("open failed");
        // Do a small sleep so trim has time to get going.
        fasync::Timer::new(Duration::from_millis(10)).await;

        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
        // regular usage isn't affected by trim.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        for _ in 0..100 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            {
                let buf = object.allocate_buffer(1024).await;
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            }
            std::mem::drop(object);

            let mut transaction = root_directory
                .acquire_context_for_replace(None, "test", true)
                .await
                .expect("acquire_context_for_replace failed")
                .transaction;
            replace_child(&mut transaction, None, (&root_directory, "test"))
                .await
                .expect("replace_child failed");
            transaction.commit().await.expect("commit failed");
        }
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_power_fail() {
        // This test randomly discards blocks, so we run it a few times to increase the chances
        // of catching an issue in a single run.
        for _ in 0..10 {
            let (store_id, device, test_file_object_id) = {
                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
                let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
                    .await
                    .expect("sync failed");

                let store = root_volume
                    .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("new_volume failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                // Create a number of files with the goal of using up more than one journal block.
                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
                    let fs = store.filesystem();
                    let root_directory = Directory::open(store, store.root_directory_object_id())
                        .await
                        .expect("open failed");
                    for i in 0..100 {
                        let mut transaction = fs
                            .clone()
                            .new_transaction(
                                lock_keys![LockKey::object(
                                    store.store_object_id(),
                                    store.root_directory_object_id()
                                )],
                                Options::default(),
                            )
                            .await
                            .expect("new_transaction failed");
                        root_directory
                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
                            .await
                            .expect("create_child_file failed");
                        transaction.commit().await.expect("commit failed");
                    }
                }

                // Create one batch of files.
                create_files(&store, "A").await;

                // Create a file and write something to it.  This will make sure there's a
                // transaction present that includes a checksum.
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut transaction =
                    object.new_transaction().await.expect("new_transaction failed");
                let mut buffer = object.allocate_buffer(4096).await;
                buffer.as_mut_slice().fill(0xed);
                object
                    .txn_write(&mut transaction, 0, buffer.as_ref())
                    .await
                    .expect("txn_write failed");
                transaction.commit().await.expect("commit failed");

                // Create another batch of files.
                create_files(&store, "B").await;
                // Sync the filesystem, but don't flush the device.  We do this so that we can
                // randomly discard blocks below.
                fs.sync(SyncOptions::default()).await.expect("sync failed");

                // When we call `sync` above on the filesystem, it will pad the journal so that it
                // will get written, but it doesn't wait for the write to occur.  We wait a short
                // time here to allow the journal write to complete.  Adding timers isn't great,
                // but this test already isn't deterministic since we randomly discard blocks.
                fasync::Timer::new(Duration::from_millis(10)).await;

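                // Take a snapshot of the device; the snapshot (not the live device) is what we
                // corrupt below to simulate power-loss.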
                (
                    store.store_object_id(),
                    fs.device().snapshot().expect("snapshot failed"),
                    object.object_id(),
                )
            };

            // Randomly discard blocks since the last flush.  This simulates what might happen in
            // the case of power-loss.  This will be an uncontrolled unmount.
            device
                .discard_random_since_last_flush()
                .expect("discard_random_since_last_flush failed");

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

            let mut check_test_file = false;

            // If we replayed and the store exists (i.e. the transaction that created the store
            // made it out to the device), start by running fsck on it.
            let object_id = if fs.object_manager().store(store_id).is_some() {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                // Now we want to create another file, unmount cleanly, and then finally check that
                // the new file exists.  This checks that we can continue to use the filesystem
                // after an unclean unmount.
                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("volume failed");

                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let object = root_directory
                    .create_child_file(&mut transaction, "C")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Write again to the test file if it exists.
                if let Ok(test_file) = ObjectStore::open_object(
                    &store,
                    test_file_object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                {
                    // Check it has the contents we expect.
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
                    if bytes == 4096 {
                        let expected = [0xed; 4096];
                        assert_eq!(buffer.as_slice(), &expected);
                    } else {
                        // If the write didn't make it, the file should have zero bytes.
                        assert_eq!(bytes, 0);
                    }

                    // Modify the test file.
                    let mut transaction =
                        test_file.new_transaction().await.expect("new_transaction failed");
                    buffer.as_mut_slice().fill(0x37);
                    test_file
                        .txn_write(&mut transaction, 0, buffer.as_ref())
                        .await
                        .expect("txn_write failed");
                    transaction.commit().await.expect("commit failed");
                    check_test_file = true;
                }

                object.object_id()
            } else {
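                // The transaction that created the store didn't make it out before the
                // simulated power-loss, so there's nothing further to check on this pass.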
                INVALID_OBJECT_ID
            };

            // This will do a controlled unmount.
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
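            // Reopen the device (presumably `reopen(read_only)`, so `false` here) so that it
            // can be mounted read-write again.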
            device.reopen(false);

            let fs = FxFilesystem::open(device).await.expect("open failed");
            fsck(fs.clone()).await.expect("fsck failed");

            // As mentioned above, make sure that the object we created before the clean unmount
            // exists.
            if object_id != INVALID_OBJECT_ID {
                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("fsck_volume failed");

                let store = root_volume(fs.clone())
                    .await
                    .expect("root_volume failed")
                    .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                    .await
                    .expect("volume failed");
                // We should be able to open the C object.
                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");

                // If we made the modification to the test file, check it.
                if check_test_file {
                    info!("Checking test file for modification");
                    let test_file = ObjectStore::open_object(
                        &store,
                        test_file_object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buffer = test_file.allocate_buffer(4096).await;
                    assert_eq!(
                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
                        4096
                    );
                    let expected = [0x37; 4096];
                    assert_eq!(buffer.as_slice(), &expected);
                }
            }

            fs.close().await.expect("close failed");
        }
    }

    #[fuchsia::test]
    async fn test_image_builder_mode_no_early_writes() {
        const BLOCK_SIZE: u32 = 4096;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
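        // Reopen the device read-only so that any write issued before finalize() would fail
        // the test.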
        device.reopen(true);
        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(true)
            .open(device)
            .await
            .expect("open failed");
        // Image builder mode should only touch the device when file data is written or when
        // fs.finalize() is called, so closing the filesystem on a read-only device shouldn't
        // produce any errors here.
        fs.close().await.expect("closed");
    }

    #[fuchsia::test]
    async fn test_image_builder_mode() {
        const BLOCK_SIZE: u32 = 4096;
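        // One 4 KiB block located 4 MiB into the image (4096 * 1024 = 4 MiB).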
        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));

        // Write some fake file data at an offset in the image; below, we confirm that it can
        // be read back through an fxfs file.
        {
            let mut write_buf =
                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
            write_buf.as_mut_slice().fill(0xf0);
            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
        }

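        // Keep the device read-only; nothing should be written until finalize().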
        device.reopen(true);

        let device = {
            let fs = FxFilesystemBuilder::new()
                .format(true)
                .image_builder_mode(true)
                .open(device)
                .await
                .expect("open failed");
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                // Create a file referencing existing data on the device.
                let handle;
                {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_directory.store().store_object_id(),
                                root_directory.object_id()
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new transaction");
                    handle = root_directory
                        .create_child_file(&mut transaction, "test")
                        .await
                        .expect("create file");
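                    // extend() should map the given device range into the file without writing
                    // any data, adopting the bytes already present on the device.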
                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
                    transaction.commit().await.expect("commit");
                }
            }
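            // Make the device writable so that finalize() can write the filesystem metadata.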
            fs.device().reopen(false);
            let (device, _size) = fs.finalize().await.expect("finalize");
            device
        };
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");

        // Confirm that the test file points at the correct data.
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let (object_id, descriptor) =
            root_directory.lookup("test").await.expect("lookup failed").unwrap();
        assert_eq!(descriptor, ObjectDescriptor::File);
        let test_file =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open failed");
        let mut read_buf =
            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
        fs.close().await.expect("closed");
    }
}