fxfs/
filesystem.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
use crate::log::*;
use crate::metrics;
use crate::object_store::allocator::{Allocator, Hold, Reservation};
use crate::object_store::directory::Directory;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::transaction::{
    self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
    TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
use crate::range::RangeExt;
use crate::serialized_types::{LATEST_VERSION, Version};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use event_listener::Event;
use fuchsia_async as fasync;
use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_crypto::Crypt;
use once_cell::sync::OnceCell;
use static_assertions::const_assert;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::{Device, DeviceHolder};

pub const MIN_BLOCK_SIZE: u64 = 4096;
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

// Whilst Fxfs could support up to u64::MAX, off_t is i64 so allowing files larger than that becomes
// difficult to deal with via the POSIX APIs. Additionally, PagedObjectHandle only sees data get
// modified in page chunks so to prevent writes at i64::MAX the entire page containing i64::MAX
// needs to be excluded.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
const_assert!(9223372036854771712 == MAX_FILE_SIZE);
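// (i64::MAX is 9223372036854775807, and 9223372036854775807 - 4095 = 9223372036854771712, the
// largest 4 KiB page-aligned value below i64::MAX.)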

// The maximum number of transactions that can be in-flight at any time.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

// Start trimming 1 hour after boot.  The idea here is to wait until the initial flurry of
// activity during boot is finished.  This is a rough heuristic and may need to change later if
// performance is affected.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

// After the initial trim, perform another trim every 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);

/// Holds information about an Fxfs filesystem.
pub struct Info {
    pub total_bytes: u64,
    pub used_bytes: u64,
}

pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;

pub struct Options {
    /// True if the filesystem is read-only.
    pub read_only: bool,

    /// The metadata keys will be rolled after this many bytes.  This must be large enough such that
    /// we can't end up with more than two live keys (so it must be bigger than the maximum possible
    /// size of unflushed journal contents).  This is exposed for testing purposes.
    pub roll_metadata_key_byte_count: u64,

    /// A callback that runs before every transaction is committed.  If this callback returns an
    /// error then the transaction is failed with that error.
    pub pre_commit_hook: PreCommitHook,

    /// A callback that runs after every transaction has been committed.  This will be called whilst
    /// a lock is held which will block more transactions from being committed.
    pub post_commit_hook: PostCommitHook,

    /// If true, don't do an initial reap of the graveyard at mount time.  This is useful for
    /// testing.
    pub skip_initial_reap: bool,

    // The first duration is how long to wait after the filesystem has been mounted before
    // performing an initial trim.  The second is the interval at which to repeat trimming
    // thereafter.  If set to None, no trimming is done.
    // Defaults to (TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER), i.e. (1 hour, 24 hours).
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    // If set, the journal will not be used for writes. The user must call 'finalize' when
    // finished.  The provided superblock instance will be written upon finalize().
    pub image_builder_mode: Option<SuperBlockInstance>,

    // If true, the filesystem will use the hardware's inline crypto engine to write encrypted
    // data. Requires the block device to support inline encryption and for `barriers_enabled` to
    // be true.
    // TODO(https://fxbug.dev/393196849): For now, this flag only prevents the filesystem from
    // computing checksums. Update this comment when the filesystem actually uses inline
    // encryption.
    pub inline_crypto_enabled: bool,

    // Configures the filesystem to use barriers instead of checksums to ensure consistency.
    // Checksums may be computed and stored in extent records but will no longer be stored in the
    // journal. The journal will use barriers to enforce proper ordering between data and metadata
    // writes. Must be true if `inline_crypto_enabled` is true.
    pub barriers_enabled: bool,
}

impl Default for Options {
    fn default() -> Self {
        Options {
            roll_metadata_key_byte_count: 128 * 1024 * 1024,
            read_only: false,
            pre_commit_hook: None,
            post_commit_hook: None,
            skip_initial_reap: false,
            trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
            image_builder_mode: None,
            inline_crypto_enabled: false,
            barriers_enabled: false,
        }
    }
}
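
// Hedged usage sketch (comment only): overriding a couple of fields while keeping the defaults
// for the rest; the chosen values are illustrative.
//
//     let options = Options { read_only: true, skip_initial_reap: true, ..Default::default() };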

/// The context in which a transaction is being applied.
pub struct ApplyContext<'a, 'b> {
    /// The mode indicates whether the transaction is being replayed.
    pub mode: ApplyMode<'a, 'b>,

    /// The transaction checkpoint for this mutation.
    pub checkpoint: JournalCheckpoint,
}

/// A transaction can be applied during replay or on a live running system (in which case a
/// transaction object will be available).
pub enum ApplyMode<'a, 'b> {
    Replay,
    Live(&'a Transaction<'b>),
}

impl ApplyMode<'_, '_> {
    pub fn is_replay(&self) -> bool {
        matches!(self, ApplyMode::Replay)
    }

    pub fn is_live(&self) -> bool {
        matches!(self, ApplyMode::Live(_))
    }
}

/// Objects that use journaling to track mutations (`Allocator` and `ObjectStore`) implement this.
/// This is primarily used by `ObjectManager` and `SuperBlock` with flush calls used in a few tests.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// This method gets called when a transaction commits, which can happen either during live
    /// operation (see `ObjectManager::apply_mutation`) or during journal replay, in which case no
    /// transaction is available (see `super_block::read`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called when a transaction fails to commit.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes in-memory changes to the device (to allow journal space to be freed).
    ///
    /// Also returns the earliest version of a struct in the filesystem.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes a mutation to the journal.  This allows objects to encrypt or otherwise modify what
    /// gets written to the journal.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}

#[derive(Default)]
pub struct SyncOptions<'a> {
    /// If set, the journal will be flushed, as well as the underlying block device.  This is much
    /// more expensive, but ensures the contents of the journal are persisted (which also acts as a
    /// barrier, ensuring all previous journal writes are observable by future operations).
    /// Note that when this is not set, the journal is *not* synchronously flushed by the sync call,
    /// and it will return before the journal flush completes.  In other words, some journal
    /// mutations may still be buffered in memory after this call returns.
    pub flush_device: bool,

    /// A precondition that is evaluated whilst a lock is held that determines whether or not the
    /// sync needs to proceed.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}

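// Hedged usage sketch (comment only): requesting a durable sync, mirroring the call made in
// `FxFilesystem::close` below.
//
//     fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;
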
pub struct OpenFxFilesystem(Arc<FxFilesystem>);

impl OpenFxFilesystem {
    /// Waits for the filesystem to be dropped (so callers should ensure all direct and indirect
    /// references are dropped) and returns the device.  No attempt is made at a graceful shutdown.
    pub async fn take_device(self) -> DeviceHolder {
        let fut = self.device.take_when_dropped();
        std::mem::drop(self);
        debug_assert_not_too_long!(fut)
    }

    /// Used to finalize a filesystem image when in image_builder_mode.
    pub async fn finalize(&self) -> Result<(), Error> {
        ensure!(
            self.journal().image_builder_mode().is_some(),
            "finalize() only valid in image_builder_mode."
        );
        self.journal().allocate_journal().await?;
        self.journal().set_image_builder_mode(None);
        self.journal().compact().await?;
        Ok(())
    }
}

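// Hedged usage sketch (comment only): building an image and finalizing it.  The choice of
// `SuperBlockInstance::A` is an illustrative assumption.
//
//     let fs = FxFilesystemBuilder::new()
//         .format(true)
//         .image_builder_mode(Some(SuperBlockInstance::A))
//         .open(device)
//         .await?;
//     // ... populate the image ...
//     fs.finalize().await?;
//     fs.close().await?;
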
impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
    fn from(fs: Arc<FxFilesystem>) -> Self {
        Self(fs)
    }
}

impl Drop for OpenFxFilesystem {
    fn drop(&mut self) {
        if self.options.image_builder_mode.is_some()
            && self.journal().image_builder_mode().is_some()
        {
            error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
        }
        if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
            error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
        }
    }
}

impl std::ops::Deref for OpenFxFilesystem {
    type Target = Arc<FxFilesystem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

pub struct FxFilesystemBuilder {
    format: bool,
    trace: bool,
    options: Options,
    journal_options: JournalOptions,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    fsck_after_every_transaction: bool,
}

impl FxFilesystemBuilder {
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// Sets whether the block device should be formatted when opened. Defaults to `false`.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables trace level logging. Defaults to `false`.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Sets whether the filesystem will be opened in read-only mode. Defaults to `false`.
    /// Incompatible with `format`.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// For image building and in-place migration.
    ///
    /// This mode avoids the initial write of super blocks and skips the journal for all
    /// transactions. The user *must* call `finalize()` before closing the filesystem; doing so
    /// triggers a compaction of in-memory data structures, writes a minimal journal, and writes
    /// one superblock (as specified).
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets how often the metadata keys are rolled. See `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Sets a callback that runs before every transaction. See `Options::pre_commit_hook`.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets a callback that runs after every transaction has been committed. See
    /// `Options::post_commit_hook`.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets whether to do an initial reap of the graveyard at mount time. See
    /// `Options::skip_initial_reap`. Defaults to `false`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Sets the options for the journal.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Sets a method to be called immediately after creating the allocator.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Sets a method to be called each time a new store is registered with `ObjectManager`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// Enables or disables running fsck after every transaction. Defaults to `false`.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets the delay before the initial trim and the interval between subsequent trims. See
    /// `Options::trim_config`.
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Enables or disables inline encryption. Defaults to `false`.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Enables or disables barriers in both the filesystem and journal options.
    /// Defaults to `false`.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }

    /// Constructs an `FxFilesystem` object with the specified settings.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        // Inline encryption requires barriers to be enabled.
        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        if !read_only && !self.format {
            // See comment in JournalRecord::DidFlushDevice for why we need to flush the device
            // before replay.
            device.flush().await.context("Device flush failed")?;
        }

        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                trim_task: Mutex::new(None),
                closed: AtomicBool::new(true),
                shutdown_event: Event::new(),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                // The filesystem isn't valid until superblocks are written but we want to defer
                // that until last when migrating filesystems or building system images.
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping task.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the root volume directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Queue all purged entries for tombstoning.  Don't start the reaper yet because
                // that can trigger a flush which can add more entries to the graveyard which might
                // get caught in the initial reap and cause objects to be prematurely tombstoned.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // This must be after we've formatted the filesystem; it will fail during format otherwise.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}
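
// Hedged usage sketch (comment only): a typical builder invocation, mirroring
// `FxFilesystem::new_empty` below.
//
//     let fs = FxFilesystemBuilder::new()
//         .format(true)
//         .trace(true)
//         .open(device)
//         .await?;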

pub struct FxFilesystem {
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    flush_task: Mutex<Option<fasync::Task<()>>>,
    trim_task: Mutex<Option<fasync::Task<()>>>,
    closed: AtomicBool,
    // An event that is signalled when the filesystem starts to shut down.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    completed_transactions: UintProperty,
    options: Options,

    // The number of in-flight transactions which we will limit to MAX_IN_FLIGHT_TRANSACTIONS.
    in_flight_transactions: AtomicU64,

    // An event that is used to wake up tasks that are blocked due to the in-flight transaction
    // limit.
    transaction_limit_event: Event,

    // NOTE: This *must* go last so that when users take the device from a closed filesystem, the
    // filesystem has dropped all other members first (Rust drops members in declaration order).
    device: DeviceHolder,

    // The "stores" node in the Inspect tree.
    _stores_node: LazyNode,
}

#[fxfs_trace::trace]
impl FxFilesystem {
    pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().format(true).open(device).await
    }

    pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        FxFilesystemBuilder::new().open(device).await
    }

    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }

    pub async fn close(&self) -> Result<(), Error> {
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
        match &sync_status {
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Regardless of whether sync succeeds, we should close the device, since otherwise we will
        // crash instead of exiting gracefully.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }
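
    // Hedged usage sketch (comment only): a graceful shutdown that then reclaims the device,
    // mirroring the `mkfs` helper at the bottom of this file.
    //
    //     fs.close().await?;
    //     let device = fs.take_device().await;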

    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }

    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }

    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }

    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }

    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }

    pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
        self.journal.sync(options).await.map(|_| ())
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn get_info(&self) -> Info {
        Info {
            total_bytes: self.device.size(),
            used_bytes: self.object_manager().allocator().get_used_bytes().0,
        }
    }

    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }

    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }

    pub fn trace(&self) -> bool {
        self.trace
    }

    pub fn options(&self) -> &Options {
        &self.options
    }

    /// Returns a guard that must be taken before any transaction can commence.  This guard takes a
    /// shared lock on the filesystem.  `fsck` will take an exclusive lock so that it can get a
    /// consistent picture of the filesystem that it can verify.  It is important that this lock is
    /// acquired before *all* other locks.  It is also important that this lock is not taken twice
    /// by the same task since that can lead to deadlocks if another task tries to take a write
    /// lock.
    pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
        TxnGuard::Owned(
            self.lock_manager
                .read_lock(lock_keys!(LockKey::Filesystem))
                .await
                .into_owned(self.clone()),
        )
    }

    pub async fn new_transaction<'a>(
        self: Arc<Self>,
        locks: LockKeys,
        options: transaction::Options<'a>,
    ) -> Result<Transaction<'a>, Error> {
        let guard = if let Some(guard) = options.txn_guard.as_ref() {
            TxnGuard::Borrowed(guard)
        } else {
            self.txn_guard().await
        };
        Transaction::new(guard, options, locks).await
    }
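
    // Hedged usage sketch (comment only): creating and committing a transaction, following the
    // pattern used in `FxFilesystemBuilder::open` above; `store_id` and `object_id` are
    // placeholders.
    //
    //     let mut transaction = fs
    //         .clone()
    //         .new_transaction(
    //             lock_keys![LockKey::object(store_id, object_id)],
    //             transaction::Options::default(),
    //         )
    //         .await?;
    //     // ... stage mutations ...
    //     transaction.commit().await?;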

    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
                // These are Mutation::UpdateBorrowed which are normally used to track borrowing of
                // metadata reservations. As we are image-building and not using the journal,
                // we don't track this.
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        // For now, call the callback whilst holding the lock.  Technically, we don't need to do
        // that except if there's a post-commit-hook (which there usually won't be).  We can
        // consider changing this if we need to for performance, but we'd need to double check that
        // callers don't depend on this.
        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }

    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }

    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // If we placed a hold for metadata space, return it now.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }

    fn maybe_start_flush_task(&self) {
        let mut flush_task = self.flush_task.lock();
        if flush_task.is_none() {
            let journal = self.journal.clone();
            *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
        }
    }

    // Returns the number of bytes trimmed.
    async fn do_trim(&self) -> Result<usize, Error> {
        const MAX_EXTENTS_PER_BATCH: usize = 8;
        const MAX_EXTENT_SIZE: usize = 256 * 1024;
        let mut offset = 0;
        let mut bytes_trimmed = 0;
        loop {
            if self.closed.load(Ordering::Relaxed) {
                info!("Filesystem is closed, nothing to trim");
                return Ok(bytes_trimmed);
            }
            let allocator = self.allocator();
            let trimmable_extents =
                allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
            for device_range in trimmable_extents.extents() {
                self.device.trim(device_range.clone()).await?;
                bytes_trimmed += device_range.length()? as usize;
            }
            if let Some(device_range) = trimmable_extents.extents().last() {
                offset = device_range.end;
            } else {
                break;
            }
        }
        Ok(bytes_trimmed)
    }

    fn start_trim_task(
        self: &Arc<Self>,
        delay: std::time::Duration,
        interval: std::time::Duration,
    ) {
        if !self.device.supports_trim() {
            info!("Device does not support trim; not scheduling trimming");
            return;
        }
        let this = self.clone();
        let mut next_timer = delay;
        *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
            loop {
                let shutdown_listener = this.shutdown_event.listen();
                // Note that we need to check if the filesystem was closed after we start listening
                // to the shutdown event, but before we start waiting on `timer`, because otherwise
                // we might start listening on `shutdown_event` *after* the event was signaled, and
                // so `shutdown_listener` will never fire, and this task will get stuck until
                // `timer` expires.
                if this.closed.load(Ordering::SeqCst) {
                    return;
                }
                futures::select!(
                    () = fasync::Timer::new(next_timer.clone()).fuse() => {},
                    () = shutdown_listener.fuse() => return,
                );
                let start_time = std::time::Instant::now();
                let res = this.do_trim().await;
                let duration = std::time::Instant::now() - start_time;
                next_timer = interval.clone();
                match res {
                    Ok(bytes_trimmed) => info!(
                        "Trimmed {bytes_trimmed} bytes in {duration:?}.  Next trim in \
                        {next_timer:?}",
                    ),
                    Err(e) => error!(e:?; "Failed to trim"),
                }
            }
        }));
    }

    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        if self.options.image_builder_mode.is_some() {
            // Image builder mode avoids the journal so reservation tracking for metadata overheads
            // doesn't make sense and so we essentially have 'all or nothing' semantics instead.
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        // We support three options for metadata space reservation:
        //
        //   1. We can borrow from the filesystem's metadata reservation.  This should only be
        //      used on the understanding that eventually, potentially after a full compaction,
        //      there should be no net increase in space used.  For example, unlinking an object
        //      should eventually decrease the amount of space used and setting most attributes
        //      should not result in any change.
        //
        //   2. A reservation is provided, in which case we'll place a hold on some of it for
        //      metadata.
        //
        //   3. No reservation is supplied, so we try to reserve space with the allocator now,
        //      and will return NoSpace if that fails.
        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }
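
    // Hedged usage sketch (comment only): the three modes above map onto fields of
    // `transaction::Options` that this function reads:
    //
    //     // 1. Borrow from the filesystem's metadata reservation:
    //     transaction::Options { borrow_metadata_space: true, ..Default::default() }
    //     // 2. Provide a reservation for us to place a hold on:
    //     transaction::Options { allocator_reservation: Some(&reservation), ..Default::default() }
    //     // 3. Neither field set: space is reserved with the allocator now, failing with NoSpace
    //     //    if it can't be.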

    pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
        if skip_journal_checks {
            self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
        } else {
            let inc = || {
                let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
                while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
                    match self.in_flight_transactions.compare_exchange_weak(
                        in_flights,
                        in_flights + 1,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return true,
                        Err(x) => in_flights = x,
                    }
                }
                false
            };
            while !inc() {
                let listener = self.transaction_limit_event.listen();
                if inc() {
                    break;
                }
                listener.await;
            }
        }
    }

    pub(crate) fn sub_transaction(&self) {
        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
        assert!(old != 0);
        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
            self.transaction_limit_event.notify(usize::MAX);
        }
    }

    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
        let keys = lock_keys![LockKey::truncate(store_id, object_id)];
        TruncateGuard(self.lock_manager().write_lock(keys).await)
    }

    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
        let inspector = fuchsia_inspect::Inspector::default();
        let root = inspector.root();
        root.record_child("__root", |n| self.root_store().record_data(n));
        let object_manager = self.object_manager();
        let volume_directory = object_manager.volume_directory();
        let layer_set = volume_directory.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = volume_directory.iter(&mut merger).await?;
        while let Some((name, id, _)) = iter.get() {
            if let Some(store) = object_manager.store(id) {
                root.record_child(name.to_string(), |n| store.record_data(n));
            }
            iter.advance().await?;
        }
        Ok(inspector)
    }
}

pub enum TxnGuard<'a> {
    Borrowed(&'a TxnGuard<'a>),
    Owned(ReadGuard<'static>),
}

impl TxnGuard<'_> {
    pub fn fs(&self) -> &Arc<FxFilesystem> {
        match self {
            TxnGuard::Borrowed(b) => b.fs(),
            TxnGuard::Owned(o) => o.fs().unwrap(),
        }
    }
}

/// A wrapper around a guard that needs to be taken when truncating an object.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);

/// Helper method for making a new filesystem.
pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    fs.close().await?;
    Ok(fs.take_device().await)
}
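
// Hedged usage sketch (comment only): formatting an in-memory fake device, as the tests below do.
// The block count and block size are illustrative.
//
//     let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
//     let device = mkfs(device).await?;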

/// Helper method for making a new filesystem with a single named volume.
/// This shouldn't be used in production; instead volumes should be created with the Volumes
/// protocol.
pub async fn mkfs_with_volume(
    device: DeviceHolder,
    volume_name: &str,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<DeviceHolder, Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    {
        // expect instead of propagating errors here, since otherwise we could drop |fs| before
        // close is called, which leads to confusing and unrelated error messages.
        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
        root_volume
            .new_volume(
                volume_name,
                NewChildStoreOptions {
                    options: StoreOptions { crypt, ..StoreOptions::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("Create volume failed");
    }
    fs.close().await?;
    Ok(fs.take_device().await)
}
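
// Hedged usage sketch (comment only): creating a filesystem with one encrypted volume, using the
// insecure test crypt as the tests below do; "default" is an illustrative volume name.
//
//     let device =
//         mkfs_with_volume(device, "default", Some(Arc::new(InsecureCrypt::new()))).await?;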

struct FsckAfterEveryTransaction {
    fs: OnceCell<Weak<FxFilesystem>>,
    old_hook: PostCommitHook,
}

impl FsckAfterEveryTransaction {
    fn new(old_hook: PostCommitHook) -> Arc<Self> {
        Arc::new(Self { fs: OnceCell::new(), old_hook })
    }

    async fn run(self: Arc<Self>) {
        if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
            let options = FsckOptions {
                fail_on_warning: true,
                no_lock: true,
                quiet: true,
                ..Default::default()
            };
            fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
            let object_manager = fs.object_manager();
            for store in object_manager.unlocked_stores() {
                let store_id = store.store_object_id();
                if !object_manager.is_system_store(store_id) {
                    fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
                        .await
                        .expect("fsck_volume_with_options failed");
                }
            }
        }
        if let Some(old_hook) = self.old_hook.as_ref() {
            old_hook().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
    use crate::fsck::{fsck, fsck_volume};
    use crate::log::*;
    use crate::lsm_tree::Operation;
    use crate::lsm_tree::types::Item;
    use crate::object_handle::{
        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
    };
    use crate::object_store::directory::{Directory, replace_child};
    use crate::object_store::journal::JournalOptions;
    use crate::object_store::journal::super_block::SuperBlockInstance;
    use crate::object_store::transaction::{LockKey, Options, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
    };
    use crate::range::RangeExt;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use futures::stream::{FuturesUnordered, TryStreamExt};
    use fxfs_insecure_crypto::InsecureCrypt;
    use rustc_hash::FxHashMap as HashMap;
    use std::ops::Range;
    use std::sync::Arc;
    use std::sync::atomic::{self, AtomicU32};
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::{self, FakeDevice};
    use test_case::test_case;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        // If compaction is not working correctly, this test will run out of space.
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Reopen the store, but set reclaim size to a very large value which will effectively
        // stop the journal from flushing and allows us to track all the mutations to the store.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Append some data.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Overwrite some data.
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // Truncate.
        object.truncate(3000).await.expect("truncate failed");

        // Delete the object.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // Finally tombstone the object.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        // Now reopen and check that replay produces the same set of mutations.
        fs.close().await.expect("close failed");

        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }

    #[fuchsia::test]
    async fn test_max_in_flight_transactions() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let transactions = FuturesUnordered::new();
        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
        }
        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();

        // Trying to create another one should be blocked.
        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
        assert!(futures::poll!(&mut fut).is_pending());

        // Dropping one should allow it to proceed.
        transactions.pop();

        assert!(futures::poll!(&mut fut).is_ready());
    }
1291
1292    // If run on a single thread, the trim tasks starve out other work.
1293    #[fuchsia::test(threads = 10)]
1294    async fn test_continuously_trim() {
1295        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1296        let fs = FxFilesystemBuilder::new()
1297            .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1298            .format(true)
1299            .open(device)
1300            .await
1301            .expect("open failed");
1302        // Do a small sleep so trim has time to get going.
1303        fasync::Timer::new(Duration::from_millis(10)).await;
1304
1305        // Create and delete a bunch of files whilst trim is ongoing.  This just ensures that
1306        // regular usage isn't affected by trim.
1307        let root_store = fs.root_store();
1308        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1309            .await
1310            .expect("open failed");
1311        for _ in 0..100 {
1312            let mut transaction = fs
1313                .clone()
1314                .new_transaction(
1315                    lock_keys![LockKey::object(
1316                        root_store.store_object_id(),
1317                        root_directory.object_id()
1318                    )],
1319                    Options::default(),
1320                )
1321                .await
1322                .expect("new_transaction failed");
1323            let object = root_directory
1324                .create_child_file(&mut transaction, "test")
1325                .await
1326                .expect("create_child_file failed");
1327            transaction.commit().await.expect("commit failed");
1328
1329            {
1330                let buf = object.allocate_buffer(1024).await;
1331                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1332            }
1333            std::mem::drop(object);
1334
1335            let mut transaction = root_directory
1336                .acquire_context_for_replace(None, "test", true)
1337                .await
1338                .expect("acquire_context_for_replace failed")
1339                .transaction;
1340            replace_child(&mut transaction, None, (&root_directory, "test"))
1341                .await
1342                .expect("replace_child failed");
1343            transaction.commit().await.expect("commit failed");
1344        }
1345        fs.close().await.expect("close failed");
1346    }
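
    // A minimal sketch, for illustration only, of the (initial delay, repeat
    // interval) schedule that the `trim_config` pair above stands in for; the
    // `work` callback is hypothetical and the real trim task lives elsewhere.
    #[allow(dead_code)]
    async fn run_periodic(delay: Duration, interval: Duration, mut work: impl FnMut()) {
        // Wait out the initial delay, then run the work on every interval tick
        // until the future is dropped.
        fasync::Timer::new(delay).await;
        loop {
            work();
            fasync::Timer::new(interval).await;
        }
    }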
1347
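    // The crash-consistency recipe used below: mutate the filesystem, sync
    // without flushing the device, randomly drop unflushed blocks to simulate
    // power loss, then remount, fsck, mutate again, unmount cleanly, and verify.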
1348    #[test_case(true; "test power fail with barriers")]
1349    #[test_case(false; "test power fail with checksums")]
1350    #[fuchsia::test]
1351    async fn test_power_fail(barriers_enabled: bool) {
1352        // This test randomly discards blocks, so we run it a few times to increase the chances
1353        // of catching an issue in a single run.
1354        for _ in 0..10 {
1355            let (store_id, device, test_file_object_id) = {
1356                let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1357                let fs = if barriers_enabled {
1358                    FxFilesystemBuilder::new()
1359                        .barriers_enabled(true)
1360                        .format(true)
1361                        .open(device)
1362                        .await
1363                        .expect("new filesystem failed")
1364                } else {
1365                    FxFilesystem::new_empty(device).await.expect("new_empty failed")
1366                };
1367                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1368
1369                fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1370                    .await
1371                    .expect("sync failed");
1372
1373                let store = root_volume
1374                    .new_volume(
1375                        "test",
1376                        NewChildStoreOptions {
1377                            options: StoreOptions {
1378                                crypt: Some(Arc::new(InsecureCrypt::new())),
1379                                ..StoreOptions::default()
1380                            },
1381                            ..Default::default()
1382                        },
1383                    )
1384                    .await
1385                    .expect("new_volume failed");
1386                let root_directory = Directory::open(&store, store.root_directory_object_id())
1387                    .await
1388                    .expect("open failed");
1389
1390                // Create a number of files with the goal of using up more than one journal block.
1391                async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1392                    let fs = store.filesystem();
1393                    let root_directory = Directory::open(store, store.root_directory_object_id())
1394                        .await
1395                        .expect("open failed");
1396                    for i in 0..100 {
1397                        let mut transaction = fs
1398                            .clone()
1399                            .new_transaction(
1400                                lock_keys![LockKey::object(
1401                                    store.store_object_id(),
1402                                    store.root_directory_object_id()
1403                                )],
1404                                Options::default(),
1405                            )
1406                            .await
1407                            .expect("new_transaction failed");
1408                        root_directory
1409                            .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1410                            .await
1411                            .expect("create_child_file failed");
1412                        transaction.commit().await.expect("commit failed");
1413                    }
1414                }
1415
1416                // Create one batch of files.
1417                create_files(&store, "A").await;
1418
1419                // Create a file and write something to it.  This makes sure there's a
1420                // transaction present that carries data (checksummed, or ordered by a barrier).
1421                let mut transaction = fs
1422                    .clone()
1423                    .new_transaction(
1424                        lock_keys![LockKey::object(
1425                            store.store_object_id(),
1426                            store.root_directory_object_id()
1427                        )],
1428                        Options::default(),
1429                    )
1430                    .await
1431                    .expect("new_transaction failed");
1432                let object = root_directory
1433                    .create_child_file(&mut transaction, "test")
1434                    .await
1435                    .expect("create_child_file failed");
1436                transaction.commit().await.expect("commit failed");
1437
1438                let mut transaction =
1439                    object.new_transaction().await.expect("new_transaction failed");
1440                let mut buffer = object.allocate_buffer(4096).await;
1441                buffer.as_mut_slice().fill(0xed);
1442                object
1443                    .txn_write(&mut transaction, 0, buffer.as_ref())
1444                    .await
1445                    .expect("txn_write failed");
1446                transaction.commit().await.expect("commit failed");
1447
1448                // Create another batch of files.
1449                create_files(&store, "B").await;
1450
1451                // Sync the filesystem, but don't flush the device; this lets us randomly
1452                // discard blocks below.
1453                fs.sync(SyncOptions::default()).await.expect("sync failed");
1454
1455                // When we call `sync` above on the filesystem, it will pad the journal so that it
1456                // will get written, but it doesn't wait for the write to occur.  We wait for a
1457                // short time here to allow time for the journal to be written.  Adding timers
1458                // isn't great, but this test already isn't deterministic since we randomly discard
1459                // blocks.
1460                fasync::Timer::new(Duration::from_millis(10)).await;
1461
1462                (
1463                    store.store_object_id(),
1464                    fs.device().snapshot().expect("snapshot failed"),
1465                    object.object_id(),
1466                )
1467            };
1468
1469            // Randomly discard blocks since the last flush.  This simulates what might happen in
1470            // the case of power-loss.  This will be an uncontrolled unmount.
1471            device
1472                .discard_random_since_last_flush()
1473                .expect("discard_random_since_last_flush failed");
1474
1475            let fs = FxFilesystem::open(device).await.expect("open failed");
1476            fsck(fs.clone()).await.expect("fsck failed");
1477
1478            let mut check_test_file = false;
1479
1480            // If we replayed and the store exists (i.e. the transaction that created the store
1481            // made it out), start by running fsck on it.
1482            let object_id = if fs.object_manager().store(store_id).is_some() {
1483                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1484                    .await
1485                    .expect("fsck_volume failed");
1486
1487                // Now we want to create another file, unmount cleanly, and then finally check that
1488                // the new file exists.  This checks that we can continue to use the filesystem
1489                // after an unclean unmount.
1490                let store = root_volume(fs.clone())
1491                    .await
1492                    .expect("root_volume failed")
1493                    .volume(
1494                        "test",
1495                        StoreOptions {
1496                            crypt: Some(Arc::new(InsecureCrypt::new())),
1497                            ..StoreOptions::default()
1498                        },
1499                    )
1500                    .await
1501                    .expect("volume failed");
1502
1503                let root_directory = Directory::open(&store, store.root_directory_object_id())
1504                    .await
1505                    .expect("open failed");
1506
1507                let mut transaction = fs
1508                    .clone()
1509                    .new_transaction(
1510                        lock_keys![LockKey::object(
1511                            store.store_object_id(),
1512                            store.root_directory_object_id()
1513                        )],
1514                        Options::default(),
1515                    )
1516                    .await
1517                    .expect("new_transaction failed");
1518                let object = root_directory
1519                    .create_child_file(&mut transaction, &format!("C"))
1520                    .await
1521                    .expect("create_child_file failed");
1522                transaction.commit().await.expect("commit failed");
1523
1524                // Write again to the test file if it exists.
1525                if let Ok(test_file) = ObjectStore::open_object(
1526                    &store,
1527                    test_file_object_id,
1528                    HandleOptions::default(),
1529                    None,
1530                )
1531                .await
1532                {
1533                    // Check it has the contents we expect.
1534                    let mut buffer = test_file.allocate_buffer(4096).await;
1535                    let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1536                    if bytes == 4096 {
1537                        let expected = [0xed; 4096];
1538                        assert_eq!(buffer.as_slice(), &expected);
1539                    } else {
1540                        // If the write didn't make it, the file should have zero bytes.
1541                        assert_eq!(bytes, 0);
1542                    }
1543
1544                    // Modify the test file.
1545                    let mut transaction =
1546                        test_file.new_transaction().await.expect("new_transaction failed");
1547                    buffer.as_mut_slice().fill(0x37);
1548                    test_file
1549                        .txn_write(&mut transaction, 0, buffer.as_ref())
1550                        .await
1551                        .expect("txn_write failed");
1552                    transaction.commit().await.expect("commit failed");
1553                    check_test_file = true;
1554                }
1555
1556                object.object_id()
1557            } else {
1558                INVALID_OBJECT_ID
1559            };
1560
1561            // This will do a controlled unmount.
1562            fs.close().await.expect("close failed");
1563            let device = fs.take_device().await;
1564            device.reopen(false);
1565
1566            let fs = FxFilesystem::open(device).await.expect("open failed");
1567            fsck(fs.clone()).await.expect("fsck failed");
1568
1569            // As mentioned above, make sure that the object we created before the clean unmount
1570            // exists.
1571            if object_id != INVALID_OBJECT_ID {
1572                fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1573                    .await
1574                    .expect("fsck_volume failed");
1575
1576                let store = root_volume(fs.clone())
1577                    .await
1578                    .expect("root_volume failed")
1579                    .volume(
1580                        "test",
1581                        StoreOptions {
1582                            crypt: Some(Arc::new(InsecureCrypt::new())),
1583                            ..StoreOptions::default()
1584                        },
1585                    )
1586                    .await
1587                    .expect("volume failed");
1588                // We should be able to open the C object.
1589                ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1590                    .await
1591                    .expect("open_object failed");
1592
1593                // If we made the modification to the test file, check it.
1594                if check_test_file {
1595                    info!("Checking test file for modification");
1596                    let test_file = ObjectStore::open_object(
1597                        &store,
1598                        test_file_object_id,
1599                        HandleOptions::default(),
1600                        None,
1601                    )
1602                    .await
1603                    .expect("open_object failed");
1604                    let mut buffer = test_file.allocate_buffer(4096).await;
1605                    assert_eq!(
1606                        test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1607                        4096
1608                    );
1609                    let expected = [0x37; 4096];
1610                    assert_eq!(buffer.as_slice(), &expected);
1611                }
1612            }
1613
1614            fs.close().await.expect("close failed");
1615        }
1616    }
1617
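    // With barriers enabled, data writes are ordered against the journal by a
    // device barrier rather than by per-write checksums, so a metadata-only
    // transaction shouldn't need to issue one.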
1618    #[fuchsia::test]
1619    async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1620        let barrier_count = Arc::new(AtomicU32::new(0));
1621
1622        struct Observer(Arc<AtomicU32>);
1623
1624        impl fake_device::Observer for Observer {
1625            fn barrier(&self) {
1626                self.0.fetch_add(1, atomic::Ordering::Relaxed);
1627            }
1628        }
1629
1630        let mut fake_device = FakeDevice::new(8192, 4096);
1631        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1632        let device = DeviceHolder::new(fake_device);
1633        let fs = FxFilesystemBuilder::new()
1634            .barriers_enabled(true)
1635            .format(true)
1636            .open(device)
1637            .await
1638            .expect("new filesystem failed");
1639
1640        {
1641            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1642            root_vol
1643                .new_volume(
1644                    "test",
1645                    NewChildStoreOptions {
1646                        options: StoreOptions {
1647                            crypt: Some(Arc::new(InsecureCrypt::new())),
1648                            ..StoreOptions::default()
1649                        },
1650                        ..NewChildStoreOptions::default()
1651                    },
1652                )
1653                .await
1654                .expect("there is no test volume");
1655            fs.close().await.expect("close failed");
1656        }
1657        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1658        // measure of the number of barriers issued during setup.
1659        let device = fs.take_device().await;
1660        device.reopen(false);
1661        let fs = FxFilesystemBuilder::new()
1662            .barriers_enabled(true)
1663            .open(device)
1664            .await
1665            .expect("new filesystem failed");
1666        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1667
1668        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1669        let store = root_vol
1670            .volume(
1671                "test",
1672                StoreOptions {
1673                    crypt: Some(Arc::new(InsecureCrypt::new())),
1674                    ..StoreOptions::default()
1675                },
1676            )
1677            .await
1678            .expect("there is no test volume");
1679
1680        // Create a number of files with the goal of using up more than one journal block.
1681        let fs = store.filesystem();
1682        let root_directory =
1683            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1684        for i in 0..100 {
1685            let mut transaction = fs
1686                .clone()
1687                .new_transaction(
1688                    lock_keys![LockKey::object(
1689                        store.store_object_id(),
1690                        store.root_directory_object_id()
1691                    )],
1692                    Options::default(),
1693                )
1694                .await
1695                .expect("new_transaction failed");
1696            root_directory
1697                .create_child_file(&mut transaction, &format!("A {i}"))
1698                .await
1699                .expect("create_child_file failed");
1700            transaction.commit().await.expect("commit failed");
1701        }
1702
1703        // Unmount the filesystem to ensure that the journal flushes.
1704        fs.close().await.expect("close failed");
1705        // Ensure that no barriers were emitted while creating files, as no data was written.
1706        assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1707    }
1708
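    // Counterpart to the test above: a transaction that carries data must be
    // accompanied by at least one barrier.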
1709    #[fuchsia::test]
1710    async fn test_barrier_emitted_when_transaction_includes_data() {
1711        let barrier_count = Arc::new(AtomicU32::new(0));
1712
1713        struct Observer(Arc<AtomicU32>);
1714
1715        impl fake_device::Observer for Observer {
1716            fn barrier(&self) {
1717                self.0.fetch_add(1, atomic::Ordering::Relaxed);
1718            }
1719        }
1720
1721        let mut fake_device = FakeDevice::new(8192, 4096);
1722        fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1723        let device = DeviceHolder::new(fake_device);
1724        let fs = FxFilesystemBuilder::new()
1725            .barriers_enabled(true)
1726            .format(true)
1727            .open(device)
1728            .await
1729            .expect("new filesystem failed");
1730
1731        {
1732            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1733            root_vol
1734                .new_volume(
1735                    "test",
1736                    NewChildStoreOptions {
1737                        options: StoreOptions {
1738                            crypt: Some(Arc::new(InsecureCrypt::new())),
1739                            ..StoreOptions::default()
1740                        },
1741                        ..NewChildStoreOptions::default()
1742                    },
1743                )
1744                .await
1745                .expect("there is no test volume");
1746            fs.close().await.expect("close failed");
1747        }
1748        // Remount the filesystem to ensure that the journal flushes and we can get a reliable
1749        // measure of the number of barriers issued during setup.
1750        let device = fs.take_device().await;
1751        device.reopen(false);
1752        let fs = FxFilesystemBuilder::new()
1753            .barriers_enabled(true)
1754            .open(device)
1755            .await
1756            .expect("new filesystem failed");
1757        let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1758
1759        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1760        let store = root_vol
1761            .volume(
1762                "test",
1763                StoreOptions {
1764                    crypt: Some(Arc::new(InsecureCrypt::new())),
1765                    ..StoreOptions::default()
1766                },
1767            )
1768            .await
1769            .expect("there is no test volume");
1770
1771        // Create a file and write something to it. This should cause a barrier to be emitted.
1772        let fs = store.filesystem();
1773        let root_directory =
1774            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1775
1776        let mut transaction = fs
1777            .clone()
1778            .new_transaction(
1779                lock_keys![LockKey::object(
1780                    store.store_object_id(),
1781                    store.root_directory_object_id()
1782                )],
1783                Options::default(),
1784            )
1785            .await
1786            .expect("new_transaction failed");
1787        let object = root_directory
1788            .create_child_file(&mut transaction, "test")
1789            .await
1790            .expect("create_child_file failed");
1791        transaction.commit().await.expect("commit failed");
1792
1793        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1794        let mut buffer = object.allocate_buffer(4096).await;
1795        buffer.as_mut_slice().fill(0xed);
1796        object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1797        transaction.commit().await.expect("commit failed");
1798
1799        // Unmount the filesystem to ensure that the journal flushes.
1800        fs.close().await.expect("close failed");
1801        // Ensure that a barrier was emitted while writing to the file.
1802        assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1803    }
1804
1805    #[fuchsia::test]
1806    async fn test_image_builder_mode_no_early_writes() {
1807        const BLOCK_SIZE: u32 = 4096;
1808        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1809        device.reopen(true);
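        // The device is reopened read-only above, so any premature write would
        // surface as an error.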
1810        let fs = FxFilesystemBuilder::new()
1811            .format(true)
1812            .image_builder_mode(Some(SuperBlockInstance::A))
1813            .open(device)
1814            .await
1815            .expect("open failed");
1816        // Image builder mode defers device writes until file data is written or
1817        // fs.finalize() is called, so we shouldn't see any errors here.
1818        fs.close().await.expect("closed");
1819    }
1820
1821    #[fuchsia::test]
1822    async fn test_image_builder_mode() {
1823        const BLOCK_SIZE: u32 = 4096;
1824        const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
1825        let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1826
1827        // Write fake file data at an offset in the image; below, we confirm that it reads back as an fxfs file.
1828        {
1829            let mut write_buf =
1830                device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1831            write_buf.as_mut_slice().fill(0xf0);
1832            device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
1833        }
1834
1835        device.reopen(true);
1836
1837        let device = {
1838            let fs = FxFilesystemBuilder::new()
1839                .format(true)
1840                .image_builder_mode(Some(SuperBlockInstance::B))
1841                .open(device)
1842                .await
1843                .expect("open failed");
1844            {
1845                let root_store = fs.root_store();
1846                let root_directory =
1847                    Directory::open(&root_store, root_store.root_directory_object_id())
1848                        .await
1849                        .expect("open failed");
1850                // Create a file referencing existing data on device.
1851                let handle;
1852                {
1853                    let mut transaction = fs
1854                        .clone()
1855                        .new_transaction(
1856                            lock_keys![LockKey::object(
1857                                root_directory.store().store_object_id(),
1858                                root_directory.object_id()
1859                            )],
1860                            Options::default(),
1861                        )
1862                        .await
1863                        .expect("new transaction");
1864                    handle = root_directory
1865                        .create_child_file(&mut transaction, "test")
1866                        .await
1867                        .expect("create file");
1868                    handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
1869                    transaction.commit().await.expect("commit");
1870                }
1871            }
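            // Make the device writable again so that `finalize()` can write out
            // the filesystem metadata.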
1872            fs.device().reopen(false);
1873            fs.finalize().await.expect("finalize");
1874            fs.close().await.expect("close");
1875            fs.take_device().await
1876        };
1877        device.reopen(false);
1878        let fs = FxFilesystem::open(device).await.expect("open failed");
1879        fsck(fs.clone()).await.expect("fsck failed");
1880
1881        // Confirm that the test file points at the correct data.
1882        let root_store = fs.root_store();
1883        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1884            .await
1885            .expect("open failed");
1886        let (object_id, descriptor, _) =
1887            root_directory.lookup("test").await.expect("lookup failed").unwrap();
1888        assert_eq!(descriptor, ObjectDescriptor::File);
1889        let test_file =
1890            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
1891                .await
1892                .expect("open failed");
1893        let mut read_buf =
1894            test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1895        test_file.read(0, read_buf.as_mut()).await.expect("read failed");
1896        assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
1897        fs.close().await.expect("closed");
1898    }
1899}